Google Cloud Transfer Service Operators

Prerequisite Tasks
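To use these operators you need a Google Cloud project with the Storage Transfer API enabled, a configured Google Cloud connection, and the Google provider package installed (for example via pip install apache-airflow-providers-google). Transfers from AWS S3 additionally require AWS credentials, as described below.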

CloudDataTransferServiceCreateJobOperator

Creates a transfer job.

The operator accepts dates in two formats:

  • consistent with the Google API

    { "year": 2019, "month": 2, "day": 11 }

  • as a datetime object

The operator accepts time in two formats:

  • consistent with the Google API

    { "hours": 12, "minutes": 30, "seconds": 0 }

  • as a time object (both forms are sketched below)
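As a minimal sketch of the two equivalent forms (the field names scheduleStartDate and startTimeOfDay follow the Storage Transfer API schedule; the values are illustrative):

from datetime import date, time

# Schedule fields in the Google API dict representation ...
schedule_as_dicts = {
    "scheduleStartDate": {"year": 2019, "month": 2, "day": 11},
    "startTimeOfDay": {"hours": 12, "minutes": 30, "seconds": 0},
}

# ... or, equivalently, as native date/time objects; the operator converts
# these to the API representation before submitting the job.
schedule_as_objects = {
    "scheduleStartDate": date(2019, 2, 11),
    "startTimeOfDay": time(12, 30, 0),
}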

If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. Information about configuring an AWS connection is available at: Amazon Web Services Connection. The connection to be used can be indicated by the aws_conn_id parameter.
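As a minimal sketch of pointing the operator at a non-default AWS connection (the connection id my_aws_conn is hypothetical; the parameter defaults to "aws_default"):

create_transfer_from_aws = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_from_aws",
    body=aws_to_gcs_transfer_body,  # defined in the example below
    aws_conn_id="my_aws_conn",  # hypothetical connection id; defaults to "aws_default"
)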

For parameter definition, take a look at CloudDataTransferServiceCreateJobOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py

gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
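The body above can then be passed to the create operator. A minimal sketch; the task id create_transfer matches the xcom_pull reference used by the update example later on this page:

create_transfer = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer", body=gcs_to_gcs_transfer_body
)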

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
)

Templating

template_fields = (
    'body',
    'gcp_conn_id',
    'aws_conn_id',
    'google_impersonation_chain',
)

CloudDataTransferServiceDeleteJobOperator

Deletes a transfer job.

For parameter definition, take a look at CloudDataTransferServiceDeleteJobOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

delete_transfer_from_aws_job = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)

Templating

template_fields = (
    'job_name',
    'project_id',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)

CloudDataTransferServiceUpdateJobOperator

Updates a transfer job.

For parameter definition, take a look at CloudDataTransferServiceUpdateJobOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py

update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
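The TRANSFER_JOB_FIELD_MASK entry is a field mask naming which fields of the transfer job the update overwrites; here only description is changed, so the rest of the job is left untouched.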

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py

update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)

Templating

template_fields = (
    'job_name',
    'body',
    'gcp_conn_id',
    'aws_conn_id',
    'google_impersonation_chain',
)

CloudDataTransferServiceCancelOperationOperator

Cancels a transfer operation.

For parameter definition, take a look at CloudDataTransferServiceCancelOperationOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = (
    'operation_name',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)

CloudDataTransferServiceGetOperationOperator

Gets a transfer operation. The result is returned to XCom.

For parameter definition, take a look at CloudDataTransferServiceGetOperationOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)

Templating

template_fields = (
    'operation_name',
    'gcp_conn_id',
    'google_impersonation_chain',
)

CloudDataTransferServiceListOperationsOperator

Lists transfer operations. The result is returned to XCom.

For parameter definition, take a look at CloudDataTransferServiceListOperationsOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)

Templating

template_fields = (
    'filter',
    'gcp_conn_id',
    'google_impersonation_chain',
)

CloudDataTransferServicePauseOperationOperator

Pauses a transfer operation.

For parameter definition, take a look at CloudDataTransferServicePauseOperationOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)

Templating

template_fields = (
    'operation_name',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)

CloudDataTransferServiceResumeOperationOperator

Resumes a transfer operation.

For parameter definition, take a look at CloudDataTransferServiceResumeOperationOperator.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)

Templating

template_fields = (
    'operation_name',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)

CloudDataTransferServiceJobStatusSensor

Waits for at least one operation belonging to the job to have the expected status.

For parameter definition, take a look at CloudDataTransferServiceJobStatusSensor.

Using the operator

airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py

wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)

Templating

template_fields = (
    'job_name',
    'impersonation_chain',
)

Reference

For further information, look at the Google Cloud Storage Transfer Service product documentation.
