
Google Cloud BigQuery Transfer Operator to Postgres

Google Cloud BigQuery is Google Cloud’s serverless data warehouse offering. PostgreSQL is an open-source relational database management system. This operator can be used to copy data from a BigQuery table to PostgreSQL.

Prerequisite Tasks

To use these operators, you must do a few things:

Operator

Use the BigQueryToPostgresOperator to copy data from a BigQuery table to a Postgres table.

Use Jinja templating with target_table_name, impersonation_chain, dataset_id, and table_id to define their values dynamically.

You can use the selected_fields parameter to limit which fields are copied (all fields by default), and the replace parameter to overwrite existing rows in the destination table instead of only appending to it. If replace is used, both selected_fields and replace_index must also be specified, because of constraints of PostgreSQL’s ON CONFLICT clause in the underlying INSERT command.
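To illustrate why replace needs both lists, here is a minimal sketch of how an upsert statement can be assembled. It is not the provider's actual implementation: build_upsert_sql is a hypothetical helper, and the table and column names are placeholders. The point is that ON CONFLICT ... DO UPDATE requires an explicit conflict target (taken here from replace_index), and the columns to overwrite can only be derived when the inserted columns (selected_fields) are named.

```python
from typing import Sequence


def build_upsert_sql(
    table: str,
    selected_fields: Sequence[str],
    replace_index: Sequence[str],
) -> str:
    """Build an illustrative INSERT ... ON CONFLICT statement (hypothetical helper)."""
    if not selected_fields or not replace_index:
        # Without named columns and a conflict target there is nothing
        # to build the ON CONFLICT clause from.
        raise ValueError("replace requires selected_fields and replace_index")

    columns = ", ".join(selected_fields)
    placeholders = ", ".join(["%s"] * len(selected_fields))
    conflict_target = ", ".join(replace_index)

    # Only columns outside the conflict target are overwritten.
    to_update = [f for f in selected_fields if f not in replace_index]
    if to_update:
        assignments = ", ".join(f"{f} = excluded.{f}" for f in to_update)
        action = f"DO UPDATE SET {assignments}"
    else:
        # Every inserted column is part of the conflict target.
        action = "DO NOTHING"

    return (
        f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_target}) {action}"
    )


print(build_upsert_sql("employees", ["emp_name", "salary"], ["emp_name"]))
```

Note that PostgreSQL also expects a unique index or constraint on the conflict target columns, so the destination table has to be created accordingly.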

For more information, please refer to the links above.

Transferring data

The following Operator copies data from a BigQuery table to PostgreSQL.

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py

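from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator

# CONNECTION_ID, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE, SQL_TABLE and BATCH_SIZE
# are defined elsewhere in the example file.
bigquery_to_postgres = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=False,  # append rows instead of replacing existing ones
)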

The Operator can also replace data in a PostgreSQL table with matching data from a BigQuery table.

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py

bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres_upsert",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=True,
    selected_fields=["emp_name", "salary"],
    replace_index=["emp_name", "salary"],
)

Reference

For further information, look at:
