Google Cloud Transfer Service Operators

Prerequisite Tasks

To use these operators, you must do a few things:

GcpTransferServiceJobCreateOperator

Creates a transfer job.

The function accepts dates in two formats:

  • consistent with Google API

    { "day": 1, "month": 1, "year": 2015 }

  • as a datetime.date object

The function accepts time in two formats:

  • consistent with Google API

    { "hours": 12, "minutes": 30, "seconds": 0 }

  • as a datetime.time object

Both styles are sketched below.
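A minimal sketch of the two equivalent styles (not taken from the example DAG; the camelCase keys are assumed to follow the Google API field names):

from datetime import date, time

# Dict form, consistent with the Google API:
schedule_api_style = {
    'scheduleStartDate': {'day': 1, 'month': 1, 'year': 2015},
    'startTimeOfDay': {'hours': 12, 'minutes': 30, 'seconds': 0},
}

# Object form; the operator converts these to the dict form above:
schedule_object_style = {
    'scheduleStartDate': date(2015, 1, 1),
    'startTimeOfDay': time(12, 30, 0),
}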

If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. Information about configuring AWS is available in the Amazon Web Services Connection documentation. The connection to use for AWS can be indicated by the aws_conn_id parameter.
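The choice of connection can be made explicit on the operator; a minimal sketch, assuming the usual default connection id aws_default:

create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws",
    body=aws_to_gcs_transfer_body,
    aws_conn_id="aws_default",  # Airflow connection that stores the AWS credentials
)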

Arguments

Some arguments in the example DAG are taken from the OS environment variables:

airflow/contrib/example_dags/example_gcp_transfer.py

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))

GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

gcs_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}  # type: Dict[str, Any]
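The example DAG instantiates the operator only for the AWS body shown later; a job for the GCS-to-GCS body above would be created the same way. A minimal sketch with an illustrative task id:

create_transfer_job_from_gcs = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_gcs", body=gcs_to_gcs_transfer_body
)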

airflow/contrib/example_dags/example_gcp_transfer.py

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

airflow/contrib/example_dags/example_gcp_transfer.py

create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
)

Templating

template_fields = ('body', 'gcp_conn_id', 'aws_conn_id')

GcpTransferServiceJobDeleteOperator

Deletes a transfer job.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

delete_transfer_from_aws_job = GcpTransferServiceJobDeleteOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)

Templating

template_fields = ('job_name', 'project_id', 'gcp_conn_id', 'api_version')

GcpTransferServiceJobUpdateOperator

Updates a transfer job.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION)},
    TRANSFER_JOB_FIELD_MASK: "description",
}
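The field mask names the fields of the transfer job that the update overwrites. Assuming the API accepts a comma-separated mask, several fields could be updated in one call; a hypothetical sketch:

update_body_multi = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {
        DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION),
        STATUS: GcpTransferJobsStatus.DISABLED,
    },
    TRANSFER_JOB_FIELD_MASK: "description,status",  # assumed comma-separated mask syntax
}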

airflow/contrib/example_dags/example_gcp_transfer.py

update_job = GcpTransferServiceJobUpdateOperator(
    task_id="update_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    body=update_body,
)

Templating

template_fields = ('job_name', 'body', 'gcp_conn_id', 'aws_conn_id')

GcpTransferServiceOperationCancelOperator

Cancels a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

cancel_operation = GcpTransferServiceOperationCancelOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

GcpTransferServiceOperationGetOperator

Gets a transfer operation. The result is returned to XCom.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

get_operation = GcpTransferServiceOperationGetOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)

Templating

template_fields = ('operation_name', 'gcp_conn_id')

GcpTransferServiceOperationsListOperator

Lists transfer operations. The result is returned to XCom.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

list_operations = GcpTransferServiceOperationsListOperator(
    task_id="list_operations",
    filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)
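Assuming the job-names filter is optional, every operation in the project could be listed with a project-only filter; a hypothetical sketch:

list_all_operations = GcpTransferServiceOperationsListOperator(
    task_id="list_all_operations",
    filter={FILTER_PROJECT_ID: GCP_PROJECT_ID},
)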

Templating

template_fields = ('filter', 'gcp_conn_id')

GcpTransferServiceOperationPauseOperator

Pauses a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

pause_operation = GcpTransferServiceOperationPauseOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

GcpTransferServiceOperationResumeOperator

Resumes a transfer operation.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

resume_operation = GcpTransferServiceOperationResumeOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)

Templating

template_fields = ('operation_name', 'gcp_conn_id', 'api_version')

GCPTransferServiceWaitForJobStatusSensor

Waits for at least one operation belonging to the job to have the expected status.

Arguments

Some arguments in the example DAG are taken from the OS environment variables; the full listing is shown under GcpTransferServiceJobCreateOperator above.

Using the operator

airflow/contrib/example_dags/example_gcp_transfer.py

wait_for_operation_to_end = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
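The pause and cancel examples above pull from tasks such as wait_for_operation_to_start; a sketch of such a sensor, assuming it waits for the IN_PROGRESS status and that the matched operations are pushed to XCom under the sensed_operations key:

wait_for_operation_to_start = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_operation_to_start",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)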

Templating

template_fields = ('job_name',)
