Google Cloud Transfer Service Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using Cloud Console.
Enable billing for your project, as described in Google Cloud documentation.
Enable the API, as described in Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[gcp]'
Detailed information is available in Installation.
GcpTransferServiceJobCreateOperator¶
Creates a transfer job.
The operator accepts dates in two formats:
consistent with the Google API
{ "year": 2019, "month": 2, "day": 11 }
as a datetime object
The operator accepts times in two formats:
consistent with the Google API
{ "hours": 12, "minutes": 30, "seconds": 0 }
as a time object
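The two formats carry the same information. The following is a minimal sketch of how a Python date or time value maps onto the Google API form. The helper names date_to_api_dict and time_to_api_dict are hypothetical and are not part of Airflow; the operator performs its own conversion.

```python
from datetime import date, time


def date_to_api_dict(value):
    """Map a date (or datetime) onto the year/month/day form shown above."""
    return {"year": value.year, "month": value.month, "day": value.day}


def time_to_api_dict(value):
    """Map a time onto the hours/minutes/seconds form shown above."""
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


print(date_to_api_dict(date(2019, 2, 11)))
# {'year': 2019, 'month': 2, 'day': 11}
print(time_to_api_dict(time(12, 30, 0)))
# {'hours': 12, 'minutes': 30, 'seconds': 0}
```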
To create a transfer job that copies data from AWS S3, you must have an AWS connection configured. Configuration details are available in Amazon Web Services Connection.
Select the AWS connection with the aws_conn_id parameter.
Arguments¶
Some arguments in the example DAG are taken from the OS environment variables:
import os

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5))
GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
)
GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
    'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
)
Using the operator¶
# Request body for a transfer between two GCS buckets.
gcs_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}  # type: Dict[str, Any]

# Request body for a transfer from an AWS S3 bucket to a GCS bucket.
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

# Create the transfer job from the AWS-to-GCS body.
create_transfer_job_from_aws = GcpTransferServiceJobCreateOperator(
    task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
)
Templating¶
template_fields = ('body', 'gcp_conn_id', 'aws_conn_id')
More information¶
See Google Cloud Transfer Service - Method: transferJobs.create.
GcpTransferServiceJobDeleteOperator¶
Deletes a transfer job.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
delete_transfer_from_aws_job = GcpTransferServiceJobDeleteOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)
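The job_name template pulls the value returned by the create task and reads its name key. The following sketch evaluates the same expression against a stand-in object. FakeTaskInstance, its store_result method, and the sample job name are hypothetical; in a real DAG run, Airflow supplies task_instance and renders the template itself.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance; it only stores task results."""

    def __init__(self):
        self._results = {}

    def store_result(self, task_id, value):
        # Record what a task returned, keyed by its task_id.
        self._results[task_id] = value

    def xcom_pull(self, task_ids):
        # Return the stored result of the named task.
        return self._results[task_ids]


task_instance = FakeTaskInstance()
# Assumed shape of the create task's result: a dict that contains a "name" key.
task_instance.store_result(
    "create_transfer_job_from_aws", {"name": "transferJobs/example-job"}
)

# The same expression that appears between {{ and }} in the template.
job_name = task_instance.xcom_pull("create_transfer_job_from_aws")["name"]
print(job_name)  # transferJobs/example-job
```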
Templating¶
template_fields = ('job_name', 'project_id', 'gcp_conn_id', 'api_version')
More information¶
See Google Cloud Transfer Service - REST Resource: transferJobs - Status
GcpTransferServiceJobUpdateOperator¶
Updates a transfer job.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {DESCRIPTION: "{}_updated".format(GCP_DESCRIPTION)},
    TRANSFER_JOB_FIELD_MASK: "description",
}
update_job = GcpTransferServiceJobUpdateOperator(
    task_id="update_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    body=update_body,
)
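The field mask in the update body names the fields of the transfer job that the request is meant to change. The following sketch illustrates that general idea in plain Python. The apply_field_mask function and the sample job dictionaries are hypothetical, and the sketch assumes a comma-separated mask of top-level field names; the actual update is applied by the service, not by this code.

```python
def apply_field_mask(current, update, field_mask):
    """Return a copy of `current` in which only the masked fields come from `update`."""
    result = dict(current)
    for field in field_mask.split(","):
        field = field.strip()
        if field in update:
            result[field] = update[field]
    return result


job = {"description": "description", "status": "ENABLED"}
requested = {"description": "description_updated", "status": "DISABLED"}

# Only "description" is named in the mask, so "status" keeps its current value.
print(apply_field_mask(job, requested, "description"))
# {'description': 'description_updated', 'status': 'ENABLED'}
```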
Templating¶
template_fields = ('job_name', 'body', 'gcp_conn_id', 'aws_conn_id')
More information¶
See Google Cloud Transfer Service - Method: transferJobs.patch
GcpTransferServiceOperationCancelOperator¶
Cancels a transfer operation.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
cancel_operation = GcpTransferServiceOperationCancelOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
More information¶
See Google Cloud Transfer Service - Method: transferOperations.cancel
GcpTransferServiceOperationGetOperator¶
Gets a transfer operation. The result is returned to XCom.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
get_operation = GcpTransferServiceOperationGetOperator(
    task_id="get_operation",
    operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
Templating¶
template_fields = ('operation_name', 'gcp_conn_id')
More information¶
See Google Cloud Transfer Service - Method: transferOperations.get
GcpTransferServiceOperationsListOperator¶
Lists transfer operations. The result is returned to XCom.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
list_operations = GcpTransferServiceOperationsListOperator(
    task_id="list_operations",
    filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)
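The filter is passed as a dictionary, while the underlying list method takes its filter as JSON text. The following sketch shows one way such a dictionary could be serialized. The build_operations_filter helper is hypothetical, and the key spellings project_id and job_names are assumptions standing in for the values of FILTER_PROJECT_ID and FILTER_JOB_NAMES; check the hook constants and the API reference before relying on them.

```python
import json


def build_operations_filter(project_id, job_names):
    """Serialize a filter dictionary to JSON text (key names are assumed, see above)."""
    return json.dumps({"project_id": project_id, "job_names": list(job_names)})


filter_text = build_operations_filter("example-project", ["transferJobs/example-job"])
print(filter_text)
```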
Templating¶
template_fields = ('filter', 'gcp_conn_id')
More information¶
See Google Cloud Transfer Service - Method: transferOperations.list
GcpTransferServiceOperationPauseOperator¶
Pauses a transfer operation.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
pause_operation = GcpTransferServiceOperationPauseOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
More information¶
See Google Cloud Transfer Service - Method: transferOperations.pause
GcpTransferServiceOperationResumeOperator¶
Resumes a transfer operation.
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
resume_operation = GcpTransferServiceOperationResumeOperator(
    task_id="resume_operation",
    operation_name="{{task_instance.xcom_pull('get_operation')['name']}}",
)
Templating¶
template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
More information¶
See Google Cloud Transfer Service - Method: transferOperations.resume
GCPTransferServiceWaitForJobStatusSensor¶
Waits for at least one operation belonging to the job to have the expected status.
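The condition the sensor waits for can be pictured as an "any" check over the job's operations. The following is a minimal sketch of that check, not the sensor's implementation. The any_operation_has_status function is hypothetical, and the operation layout (a status stored under a metadata key) and the status strings are assumptions made for illustration.

```python
def any_operation_has_status(operations, expected_statuses):
    """Return True if at least one operation reports one of the expected statuses."""
    return any(op["metadata"]["status"] in expected_statuses for op in operations)


# Assumed shape of the operations that belong to one transfer job.
operations = [
    {"name": "transferOperations/op-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/op-2", "metadata": {"status": "SUCCESS"}},
]

print(any_operation_has_status(operations, {"SUCCESS"}))  # True
print(any_operation_has_status(operations, {"FAILED"}))   # False
```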
Arguments¶
The example DAG reads the same environment variables listed in the Arguments section of GcpTransferServiceJobCreateOperator.
Using the operator¶
wait_for_operation_to_end = GCPTransferServiceWaitForJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
Templating¶
template_fields = ('job_name',)