Google Cloud BigQuery Data Transfer Service Operators¶
The BigQuery Data Transfer Service automates data movement from SaaS applications to Google BigQuery on a scheduled, managed basis. Your analytics team can lay the foundation for a data warehouse without writing a single line of code. BigQuery Data Transfer Service initially supports Google application sources like Google Ads, Campaign Manager, Google Ad Manager and YouTube. It also provides data connectors for transferring data from Teradata and Amazon S3 to BigQuery.
Prerequisite Tasks¶
Creating transfer configuration¶
To create a DTS transfer configuration, you can use BigQueryCreateDataTransferOperator.
In Airflow, you need to create the transfer config with automatic scheduling disabled and then trigger a transfer run using a specialized Airflow operator that calls the StartManualTransferRuns API, for example BigQueryDataTransferServiceStartTransferRunsOperator.
BigQueryCreateDataTransferOperator checks whether the automatic scheduling option (disable_auto_scheduling) is present in the passed configuration. If it is present, the value is left unchanged; otherwise it is set to True.
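The defaulting rule described above can be pictured with a small plain-Python sketch. The helper name `with_auto_scheduling_disabled` is hypothetical and is not part of the Airflow provider; the sketch only assumes that the option is the `disable_auto_scheduling` key under `schedule_options`, as in the example configuration on this page.

```python
from copy import deepcopy


def with_auto_scheduling_disabled(transfer_config: dict) -> dict:
    """Hypothetical helper: return a copy whose auto-scheduling option has a value.

    An explicit value supplied by the caller is kept; a missing one defaults to True.
    """
    config = deepcopy(transfer_config)  # leave the caller's dict untouched
    schedule_options = config.setdefault("schedule_options", {})
    schedule_options.setdefault("disable_auto_scheduling", True)
    return config


# The option is absent, so it is filled in with True.
print(with_auto_scheduling_disabled({"display_name": "GCS Test Config"}))
# An explicit value is present, so it is left as-is.
print(with_auto_scheduling_disabled({"schedule_options": {"disable_auto_scheduling": False}}))
```

This is an illustration of the rule only, not the operator's actual code path.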
airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
schedule_options = {"disable_auto_scheduling": True}
PARAMS = {
    "field_delimiter": ",",
    "max_bad_records": "0",
    "skip_leading_rows": "1",
    "data_path_template": BUCKET_URI,
    "destination_table_name_template": GCP_DTS_BQ_TABLE,
    "file_format": "CSV",
}
TRANSFER_CONFIG = {
    "destination_dataset_id": GCP_DTS_BQ_DATASET,
    "display_name": "GCS Test Config",
    "data_source_id": "google_cloud_storage",
    "schedule_options": schedule_options,
    "params": PARAMS,
}
You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Basic usage of the operator:
airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
    transfer_config=TRANSFER_CONFIG,
    project_id=GCP_PROJECT_ID,
    task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = gcp_bigquery_create_transfer.output["transfer_config_id"]
You can use Jinja templating with the transfer_config, project_id, authorization_code, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically. The result is saved to XCom, which allows it to be used by other operators. Additionally, the ID of the new config is accessible in XCom under the transfer_config_id key.
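Because the config ID is stored in XCom under the `transfer_config_id` key, a templated field of a downstream task can read it with a Jinja expression. The sketch below only builds that expression as a string, so it runs without Airflow. `xcom_template` is a hypothetical helper, and it assumes the standard `task_instance.xcom_pull` call available in Airflow's template context.

```python
def xcom_template(task_id: str, key: str) -> str:
    """Hypothetical helper: build a Jinja expression that pulls one XCom value."""
    return "{{ task_instance.xcom_pull(task_ids='%s', key='%s') }}" % (task_id, key)


# Expression that a templated field such as transfer_config_id could receive.
template = xcom_template("gcp_bigquery_create_transfer", "transfer_config_id")
print(template)
```

Airflow renders such an expression when the task runs, so the value does not need to be known when the DAG file is parsed.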
Deleting transfer configuration¶
To delete a DTS transfer configuration, you can use BigQueryDeleteDataTransferConfigOperator.
Basic usage of the operator:
airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
    transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
You can use Jinja templating with the transfer_config_id, project_id, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Manually starting transfer runs¶
To start manual transfer runs that are executed now, with schedule_time equal to the current time, you can use BigQueryDataTransferServiceStartTransferRunsOperator.
Basic usage of the operator:
airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
    task_id="gcp_bigquery_start_transfer",
    transfer_config_id=transfer_config_id,
    requested_run_time={"seconds": int(time.time() + 60)},
)
You can use Jinja templating with the transfer_config_id, project_id, requested_time_range, requested_run_time, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
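The `requested_run_time` value in the example is a timestamp written as a dictionary whose `seconds` field holds Unix epoch seconds. A minimal sketch of building such a value for a run scheduled a given delay from now follows. `run_time_after` is a hypothetical helper, and its optional `now` argument exists only to make the result reproducible.

```python
import time
from typing import Optional


def run_time_after(delay_seconds: int, now: Optional[float] = None) -> dict:
    """Hypothetical helper: timestamp dict for `delay_seconds` after `now`."""
    base = time.time() if now is None else now
    return {"seconds": int(base + delay_seconds)}


# One minute from the current time, as in the operator example.
print(run_time_after(60))
```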
To check whether the operation succeeded, you can use BigQueryDataTransferServiceTransferRunSensor.
airflow/providers/google/cloud/example_dags/example_bigquery_dts.py
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
    task_id="gcp_run_sensor",
    transfer_config_id=transfer_config_id,
    run_id=gcp_bigquery_start_transfer.output["run_id"],
    expected_statuses={"SUCCEEDED"},
)
You can use Jinja templating with the run_id, transfer_config_id, expected_statuses, project_id, and impersonation_chain parameters, which allows you to determine values dynamically.
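Conceptually, the sensor compares the state of the transfer run with `expected_statuses` on each check and keeps waiting until one of them matches. The sketch below models only that comparison in plain Python. `run_is_done` is a hypothetical helper, not the sensor's implementation, and accepting a single status string is an assumption made for convenience.

```python
from typing import Iterable, Union


def run_is_done(state: str, expected_statuses: Union[str, Iterable[str]]) -> bool:
    """Hypothetical helper: True once the run state is one of the expected ones."""
    if isinstance(expected_statuses, str):
        expected_statuses = {expected_statuses}  # accept a single status too
    return state in set(expected_statuses)


print(run_is_done("RUNNING", {"SUCCEEDED"}))    # False
print(run_is_done("SUCCEEDED", {"SUCCEEDED"}))  # True
```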