Google Cloud Transfer Service Operators
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
For details, see Installation.
CloudDataTransferServiceCreateJobOperator
Creates a transfer job.
The operator accepts dates in two formats:
- consistent with the Google API: { "year": 2019, "month": 2, "day": 11 }
- as a Python datetime (or date) object
The operator accepts times in two formats:
- consistent with the Google API: { "hours": 12, "minutes": 30, "seconds": 0 }
- as a Python time object
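Both accepted formats carry the same information. The following minimal sketch normalizes either form into the Google API dictionaries. The helper names `to_api_date` and `to_api_time` are hypothetical and are not part of the Airflow provider.

```python
from datetime import date, datetime, time

# Hypothetical helpers that mirror the two accepted input formats described
# above; they are not part of the Airflow provider's public API.


def to_api_date(value):
    """Return a Google API style date dict from a dict or a date/datetime."""
    if isinstance(value, dict):
        return value  # already in {"year", "month", "day"} form
    if isinstance(value, date):  # datetime is a subclass of date
        return {"year": value.year, "month": value.month, "day": value.day}
    raise TypeError(f"unsupported date value: {value!r}")


def to_api_time(value):
    """Return a Google API style time dict from a dict or a time object."""
    if isinstance(value, dict):
        return value  # already in {"hours", "minutes", "seconds"} form
    if isinstance(value, time):
        return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}
    raise TypeError(f"unsupported time value: {value!r}")


print(to_api_date(datetime(2019, 2, 11)))  # {'year': 2019, 'month': 2, 'day': 11}
print(to_api_time(time(12, 30)))  # {'hours': 12, 'minutes': 30, 'seconds': 0}
```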
To create a transfer job that copies data from AWS S3, you must have an AWS connection configured. For configuration details, see Amazon Web Services Connection.
Use the aws_conn_id parameter to specify which AWS connection the operator uses.
For parameter definitions, see CloudDataTransferServiceCreateJobOperator.
Using the operator
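from datetime import datetime, timedelta

from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    ALREADY_EXISTING_IN_SINK, AWS_S3_DATA_SOURCE, BUCKET_NAME, DESCRIPTION,
    GCS_DATA_SINK, GCS_DATA_SOURCE, JOB_NAME, PROJECT_ID, SCHEDULE,
    SCHEDULE_END_DATE, SCHEDULE_START_DATE, START_TIME_OF_DAY, STATUS,
    TRANSFER_OPTIONS, TRANSFER_SPEC, GcpTransferJobsStatus,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
)

# Hypothetical placeholder values; replace them with your own.
PROJECT_ID_TRANSFER = GCP_PROJECT_ID = "my-gcp-project"
BUCKET_NAME_SRC, BUCKET_NAME_DST = "my-source-bucket", "my-destination-bucket"
BUCKET_SOURCE_AWS, BUCKET_TARGET_GCS = "my-s3-bucket", "my-gcs-bucket"
GCP_DESCRIPTION = "description"
GCP_TRANSFER_JOB_NAME = "transferJobs/my-transfer-job"

# Job body for a transfer between two GCS buckets.
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        # Evaluated when the DAG file is parsed: about two minutes from then.
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
# Job body for a transfer from an AWS S3 bucket to a GCS bucket.
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
# Copying from S3 requires an AWS connection, selected with aws_conn_id.
create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)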
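The constants in the bodies above (DESCRIPTION, SCHEDULE, TRANSFER_SPEC, and so on) stand for field names of the transferJobs resource in the Storage Transfer Service REST API. For orientation, the following sketch writes out the GCS-to-GCS body using those REST field names. It assumes that the provider constants resolve to the camelCase names shown. The builder function and the project and bucket values are hypothetical.

```python
import json
from datetime import date, time


def _api_date(d):
    """Convert a date into the REST API's {"year", "month", "day"} form."""
    return {"year": d.year, "month": d.month, "day": d.day}


def build_gcs_to_gcs_body(project_id, src_bucket, dst_bucket, start, end, start_time):
    """Hypothetical builder for a transferJobs.create body using REST field names."""
    return {
        "description": "description",
        "status": "ENABLED",
        "projectId": project_id,
        "schedule": {
            "scheduleStartDate": _api_date(start),
            "scheduleEndDate": _api_date(end),
            "startTimeOfDay": {
                "hours": start_time.hour,
                "minutes": start_time.minute,
                "seconds": start_time.second,
            },
        },
        "transferSpec": {
            "gcsDataSource": {"bucketName": src_bucket},
            "gcsDataSink": {"bucketName": dst_bucket},
            # Overwrite objects that already exist in the destination bucket.
            "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
        },
    }


body = build_gcs_to_gcs_body(
    "my-project", "my-source-bucket", "my-destination-bucket",
    date(2015, 1, 1), date(2030, 1, 1), time(9, 0),
)
print(json.dumps(body, indent=2))
```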
Templating
template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferJobs.create.
CloudDataTransferServiceDeleteJobOperator
Deletes a transfer job.
For parameter definitions, see CloudDataTransferServiceDeleteJobOperator.
Using the operator
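from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceDeleteJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,  # run this cleanup even if upstream tasks failed
)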
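The More information link for this operator points to the transferJobs status field. In the REST API, a job can be removed by updating its status to DELETED, which is a soft-delete state. The following sketch builds such an update request body. It assumes the UpdateTransferJobRequest field names projectId, transferJob, and updateTransferJobFieldMask. build_delete_patch_body is a hypothetical helper, not the provider's own code.

```python
import json


def build_delete_patch_body(project_id):
    """Hypothetical helper: an update request body that marks a job DELETED."""
    return {
        "projectId": project_id,
        "transferJob": {"status": "DELETED"},
        # Only the status field is updated; other job fields stay as they are.
        "updateTransferJobFieldMask": "status",
    }


print(json.dumps(build_delete_patch_body("my-project")))
```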
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - REST Resource: transferJobs - Status.
CloudDataTransferServiceUpdateJobOperator
Updates a transfer job.
For parameter definitions, see CloudDataTransferServiceUpdateJobOperator.
Using the operator
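from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    DESCRIPTION, PROJECT_ID, TRANSFER_JOB, TRANSFER_JOB_FIELD_MASK,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceUpdateJobOperator,
)

# Only the fields named in the field mask (here "description") are changed.
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)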
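The update body pairs the new field values (TRANSFER_JOB) with a field mask (TRANSFER_JOB_FIELD_MASK) that names the fields to change. The following sketch derives the mask from the changed fields. It assumes the REST field names projectId, transferJob, and updateTransferJobFieldMask, and a comma-separated mask. build_update_body is a hypothetical helper.

```python
def build_update_body(project_id, changes):
    """Hypothetical helper: build a transferJobs.patch body from changed fields.

    The mask lists every TransferJob field being changed, comma-separated,
    so fields not named in the mask are not touched by the update.
    """
    if not changes:
        raise ValueError("at least one field must be updated")
    return {
        "projectId": project_id,
        "transferJob": dict(changes),
        "updateTransferJobFieldMask": ",".join(sorted(changes)),
    }


body = build_update_body("my-project", {"description": "description_updated", "status": "DISABLED"})
print(body["updateTransferJobFieldMask"])  # description,status
```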
Templating
template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferJobs.patch.
CloudDataTransferServiceCancelOperationOperator
Cancels a transfer operation.
For parameter definitions, see CloudDataTransferServiceCancelOperationOperator.
Using the operator
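from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCancelOperationOperator,
)

cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("  # first operation found by the sensor
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)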
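The operation_name template above reads the list that the sensor task pushed under the sensed_operations XCom key and takes the name of its first element. The following sketch performs the same lookup in plain Python. The function name and the transferOperations/... sample names are hypothetical.

```python
def first_operation_name(sensed_operations):
    """Return the 'name' of the first operation, like the [0]['name'] lookup."""
    if not sensed_operations:
        # An empty list has no first element; fail loudly instead of
        # passing an empty operation name on to the cancel call.
        raise LookupError("no sensed operations to act on")
    return sensed_operations[0]["name"]


sensed = [
    {"name": "transferOperations/example-op-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/example-op-2", "metadata": {"status": "IN_PROGRESS"}},
]
print(first_operation_name(sensed))  # transferOperations/example-op-1
```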
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.cancel.
CloudDataTransferServiceGetOperationOperator
Gets a transfer operation. The result is returned to XCom.
For parameter definitions, see CloudDataTransferServiceGetOperationOperator.
Using the operator
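from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceGetOperationOperator

get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)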
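The value that the get operator pushes to XCom is a transfer operation resource. The following sketch reads its state. It assumes that the transfer status is stored under metadata.status and that a long-running operation sets done to true once it finishes. The helper names and sample values are hypothetical.

```python
def operation_status(operation):
    """Return the transfer status stored in the operation's metadata, or None."""
    return operation.get("metadata", {}).get("status")


def is_done(operation):
    """Long-running operations set "done" to true once they have finished."""
    return bool(operation.get("done", False))


sample_operation = {
    "name": "transferOperations/example-op",
    "metadata": {"status": "SUCCESS"},
    "done": True,
}
print(operation_status(sample_operation), is_done(sample_operation))  # SUCCESS True
```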
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.get.
CloudDataTransferServiceListOperationsOperator
Lists transfer operations. The result is returned to XCom.
For parameter definitions, see CloudDataTransferServiceListOperationsOperator.
Using the operator
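from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    FILTER_JOB_NAMES, FILTER_PROJECT_ID,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceListOperationsOperator,
)

list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={  # restrict results to this project and the job created earlier
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)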
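The REST transferOperations.list method accepts its filter as JSON-encoded text. The following sketch produces that string. It assumes the camelCase keys projectId and jobNames from the REST reference. The provider's FILTER_PROJECT_ID and FILTER_JOB_NAMES constants may be spelled differently. The helper and the project and job names are hypothetical.

```python
import json


def build_list_filter(project_id, job_names):
    """Hypothetical helper: serialize a transferOperations.list filter as JSON text."""
    return json.dumps({"projectId": project_id, "jobNames": list(job_names)})


flt = build_list_filter("my-project", ["transferJobs/123"])
print(flt)  # {"projectId": "my-project", "jobNames": ["transferJobs/123"]}
```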
Templating
template_fields: Sequence[str] = (
    "filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.list.
CloudDataTransferServicePauseOperationOperator
Pauses a transfer operation.
For parameter definitions, see CloudDataTransferServicePauseOperationOperator.
Using the operator
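from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServicePauseOperationOperator,
)

pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "  # first sensed operation
    "key='sensed_operations')[0]['name']}}",
)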
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.pause.
CloudDataTransferServiceResumeOperationOperator
Resumes a paused transfer operation.
For parameter definitions, see CloudDataTransferServiceResumeOperationOperator.
Using the operator
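from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceResumeOperationOperator

resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)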
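The pause and resume operators each act on a single operation, identified by name. Before calling either one, a DAG could check the operation's current status. The following sketch is a hypothetical pre-check that assumes the status is read from metadata.status. Treating IN_PROGRESS as pausable and PAUSED as resumable is an assumption made for illustration, not a complete statement of the service's rules.

```python
# Hypothetical guards; the status sets are assumptions for illustration.
PAUSABLE_STATUSES = {"IN_PROGRESS"}
RESUMABLE_STATUSES = {"PAUSED"}


def _status(operation):
    """Read the transfer status from an operation dict (None if absent)."""
    return operation.get("metadata", {}).get("status")


def can_pause(operation):
    return _status(operation) in PAUSABLE_STATUSES


def can_resume(operation):
    return _status(operation) in RESUMABLE_STATUSES


running = {"name": "transferOperations/example-op", "metadata": {"status": "IN_PROGRESS"}}
paused = {"name": "transferOperations/example-op", "metadata": {"status": "PAUSED"}}
print(can_pause(running), can_resume(running))  # True False
print(can_pause(paused), can_resume(paused))  # False True
```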
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.resume.
CloudDataTransferServiceJobStatusSensor
Waits until at least one operation belonging to the transfer job has one of the expected statuses.
For parameter definitions, see CloudDataTransferServiceJobStatusSensor.
Using the sensor
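from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    GcpTransferOperationStatus,
)
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
    CloudDataTransferServiceJobStatusSensor,
)

wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,  # seconds between checks
)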
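The "at least one operation" rule can be expressed as a set intersection over the statuses found in each operation's metadata. This simplified sketch assumes the metadata.status layout and that GcpTransferOperationStatus.SUCCESS corresponds to the string "SUCCESS". It does not model any additional checks the real sensor may perform. The function name is hypothetical.

```python
def contains_expected_status(operations, expected_statuses):
    """True if at least one operation reports one of the expected statuses."""
    if isinstance(expected_statuses, str):
        expected_statuses = {expected_statuses}
    current = {op.get("metadata", {}).get("status") for op in operations}
    return bool(current & set(expected_statuses))


ops = [
    {"name": "transferOperations/op-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/op-2", "metadata": {"status": "SUCCESS"}},
]
print(contains_expected_status(ops, {"SUCCESS"}))  # True
print(contains_expected_status(ops, "FAILED"))  # False
```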
Templating
template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator
Copies data from one GCS bucket to another.
For parameter definitions, see CloudDataTransferServiceGCSToGCSOperator.
Using the operator
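from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceGCSToGCSOperator,
)

transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,  # block until the transfer has finished
)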
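In the Storage Transfer Service REST API, a job whose scheduleStartDate and scheduleEndDate are the same date is a one-time transfer. The following sketch builds a one-time GCS-to-GCS job body in that way, using the bucketName and path fields of gcsDataSource and gcsDataSink. This document does not describe how CloudDataTransferServiceGCSToGCSOperator maps source_path and destination_path onto these fields. The builder and all values are hypothetical.

```python
from datetime import date


def _api_date(d):
    return {"year": d.year, "month": d.month, "day": d.day}


def one_time_gcs_to_gcs_body(project_id, src_bucket, src_path, dst_bucket, dst_path, run_on):
    """Hypothetical builder for a one-time GCS-to-GCS transferJobs.create body."""
    for path in (src_path, dst_path):
        # GcsData.path is an object prefix: empty, or a value ending in "/".
        if path and not path.endswith("/"):
            raise ValueError(f"path must be empty or end with '/': {path!r}")
    return {
        "projectId": project_id,
        "status": "ENABLED",
        # Equal start and end dates make this a one-time transfer.
        "schedule": {"scheduleStartDate": _api_date(run_on), "scheduleEndDate": _api_date(run_on)},
        "transferSpec": {
            "gcsDataSource": {"bucketName": src_bucket, "path": src_path},
            "gcsDataSink": {"bucketName": dst_bucket, "path": dst_path},
        },
    }


body = one_time_gcs_to_gcs_body(
    "my-project", "my-source-bucket", "data/", "my-destination-bucket", "backup/", date(2030, 1, 1)
)
```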
Templating
template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)
