Google Cloud BigQuery Data Transfer Service Operators

The BigQuery Data Transfer Service automates data movement from SaaS applications to Google BigQuery on a scheduled, managed basis. Your analytics team can lay the foundation for a data warehouse without writing a single line of code. BigQuery Data Transfer Service initially supports Google application sources like Google Ads, Campaign Manager, Google Ad Manager and YouTube. Through BigQuery Data Transfer Service, you also gain access to data connectors that allow you to easily transfer data from Teradata and Amazon S3 to BigQuery.

Creating transfer configuration

To create a DTS transfer configuration, you can use BigQueryCreateDataTransferOperator.

In the case of Airflow, the customer needs to create a transfer config with automatic scheduling disabled and then trigger a transfer run using a specialized Airflow operator that calls the StartManualTransferRuns API, for example BigQueryDataTransferServiceStartTransferRunsOperator. BigQueryCreateDataTransferOperator checks whether the disable_auto_scheduling option is present in the passed configuration. If it is present, nothing is done; otherwise its value is set to True (a sketch of this check follows the example configuration below).

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py


# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
schedule_options = {"disable_auto_scheduling": True}

PARAMS = {
    "field_delimiter": ",",
    "max_bad_records": "0",
    "skip_leading_rows": "1",
    "data_path_template": BUCKET_URI,
    "destination_table_name_template": GCP_DTS_BQ_TABLE,
    "file_format": "CSV",
}

TRANSFER_CONFIG = {
    "destination_dataset_id": GCP_DTS_BQ_DATASET,
    "display_name": "GCS Test Config",
    "data_source_id": "google_cloud_storage",
    "schedule_options": schedule_options,
    "params": PARAMS,
}
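
For illustration, the check described above can be pictured roughly as follows. This is a sketch only, not the operator's actual implementation, and the helper name ensure_auto_scheduling_disabled is hypothetical:

# Illustrative sketch only, not the operator's source code: if the
# disable_auto_scheduling option is not already present in the passed
# configuration, it is set to True before the config is created.
def ensure_auto_scheduling_disabled(transfer_config: dict) -> dict:
    config = dict(transfer_config)
    schedule_options = dict(config.get("schedule_options") or {})
    if "disable_auto_scheduling" not in schedule_options:
        schedule_options["disable_auto_scheduling"] = True
    config["schedule_options"] = schedule_options
    return config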

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Basic usage of the operator:

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py

gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
    transfer_config=TRANSFER_CONFIG,
    project_id=GCP_PROJECT_ID,
    task_id="gcp_bigquery_create_transfer",
)

transfer_config_id = (
    "{{ task_instance.xcom_pull('gcp_bigquery_create_transfer', key='transfer_config_id') }}"
)
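
If you omit project_id, the operator falls back to the project configured in the Google Cloud connection referenced by gcp_conn_id. A minimal sketch, not part of the example DAG; the task id below is illustrative and "google_cloud_default" is the default connection id:

create_transfer_default_project = BigQueryCreateDataTransferOperator(
    task_id="create_transfer_default_project",
    transfer_config=TRANSFER_CONFIG,
    gcp_conn_id="google_cloud_default",  # project id is taken from this connection
)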

You can use Jinja templating with the transfer_config, project_id, authorization_code, gcp_conn_id and impersonation_chain parameters, which allows you to dynamically determine values. The result is saved to XCom, which allows it to be used by other operators. Additionally, the id of the new config is accessible in XCom under the transfer_config_id key.

Deleting transfer configuration

To delete a DTS transfer configuration, you can use BigQueryDeleteDataTransferConfigOperator.

Basic usage of the operator:

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py

gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
    transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)

You can use Jinja templating with the transfer_config_id, project_id, gcp_conn_id and impersonation_chain parameters, which allows you to dynamically determine values.
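
The delete operator is often used as a cleanup step at the end of a DAG. One common pattern, not taken from the example DAG and with an illustrative task id, is to give it a trigger rule so it runs even if an upstream task fails:

from airflow.utils.trigger_rule import TriggerRule

# Sketch only: run the cleanup even if an upstream task failed.
gcp_bigquery_delete_transfer_cleanup = BigQueryDeleteDataTransferConfigOperator(
    task_id="gcp_bigquery_delete_transfer_cleanup",
    transfer_config_id=transfer_config_id,
    trigger_rule=TriggerRule.ALL_DONE,
)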

Manually starting transfer runs

To start manual transfer runs to be executed now, with schedule_time equal to the current time, use BigQueryDataTransferServiceStartTransferRunsOperator.

Basic usage of the operator:

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py

gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
    task_id="gcp_bigquery_start_transfer",
    transfer_config_id=transfer_config_id,
    requested_run_time={"seconds": int(time.time() + 60)},
)
run_id = "{{ task_instance.xcom_pull('gcp_bigquery_start_transfer', key='run_id') }}"

You can use Jinja templating with the transfer_config_id, project_id, requested_time_range, requested_run_time, gcp_conn_id and impersonation_chain parameters, which allows you to dynamically determine values.
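
For example, instead of requested_run_time you can pass requested_time_range to request runs for a historical window. A minimal sketch, not part of the example DAG; the field names follow the StartManualTransferRuns API's TimeRange message and should be verified against the API reference:

import time

# Sketch only: ask the service to run transfers covering the last 7 days.
gcp_bigquery_backfill_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
    task_id="gcp_bigquery_backfill_transfer",
    transfer_config_id=transfer_config_id,
    requested_time_range={
        "start_time": {"seconds": int(time.time()) - 7 * 24 * 3600},
        "end_time": {"seconds": int(time.time())},
    },
)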

To check if the operation succeeded, you can use BigQueryDataTransferServiceTransferRunSensor.

airflow/providers/google/cloud/example_dags/example_bigquery_dts.py

gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
    task_id="gcp_run_sensor",
    transfer_config_id=transfer_config_id,
    run_id=run_id,
    expected_statuses={"SUCCEEDED"},
)

You can use Jinja templating with the run_id, transfer_config_id, expected_statuses, project_id and impersonation_chain parameters, which allows you to dynamically determine values.
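
Putting it together, the tasks shown above can be wired into a single DAG. The ordering below is a sketch inferred from the XCom dependencies between the tasks, not quoted from the example file:

# Sketch only: create the config, start a run, wait for it to succeed,
# then delete the config.
(
    gcp_bigquery_create_transfer
    >> gcp_bigquery_start_transfer
    >> gcp_run_sensor
    >> gcp_bigquery_delete_transfer
)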
