Google Cloud BigQuery Operators

BigQuery is Google's fully managed, petabyte-scale, low-cost analytics data warehouse. It is a serverless Software as a Service (SaaS) offering that doesn't need a database administrator, so users can focus on analyzing data to find meaningful insights using familiar SQL.

Airflow provides operators to manage datasets and tables, run queries and validate data.

Prerequisite Tasks

Manage datasets

Create dataset

To create an empty dataset in a BigQuery database you can use BigQueryCreateEmptyDatasetOperator.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

Get dataset details

To get the details of an existing dataset you can use BigQueryGetDatasetOperator.

This operator returns a Dataset Resource.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

List tables in dataset

To retrieve the list of tables in a given dataset use BigQueryGetDatasetTablesOperator.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

Update table

To update a table in BigQuery you can use BigQueryUpdateTableOperator.

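In the BigQuery API, the update method replaces the entire Table resource, whereas the patch method only replaces fields that are provided in the submitted Table resource. With this operator, the fields parameter lists which properties of table_resource should be changed.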

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)
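The update/patch distinction can be illustrated without calling BigQuery at all. The sketch below models a table resource as a plain Python dict; the functions update and patch and the sample resource are hypothetical and only mimic the replace-versus-merge semantics described above (at the top level only), not the actual BigQuery client.

```python
from typing import Any, Dict

Resource = Dict[str, Any]


def update(current: Resource, submitted: Resource) -> Resource:
    """'update' semantics: the submitted resource replaces the stored one entirely."""
    return dict(submitted)


def patch(current: Resource, submitted: Resource) -> Resource:
    """'patch' semantics: only the submitted top-level fields are replaced."""
    merged = dict(current)
    merged.update(submitted)
    return merged


# Hypothetical stored table resource and a partial change set.
stored = {"friendlyName": "Old Table", "description": "Old", "labels": {"team": "hr"}}
changes = {"friendlyName": "Updated Table", "description": "Updated Table"}

updated = update(stored, changes)  # "labels" is dropped
patched = patch(stored, changes)   # "labels" is kept
```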

Update dataset

To update a dataset in BigQuery you can use BigQueryUpdateDatasetOperator.

The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

Delete dataset

To delete an existing dataset from a BigQuery database you can use BigQueryDeleteDatasetOperator.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

Manage tables

Create native table

To create a new, empty table in a given BigQuery dataset, optionally with a schema, you can use BigQueryCreateEmptyTableOperator.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either pass the schema fields in directly (schema_fields), or point the operator to a Google Cloud Storage object (gcs_schema_object). The object in Google Cloud Storage must be a JSON file containing the schema fields.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)
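If you prefer the Google Cloud Storage route, the schema file is simply the same list of field objects serialized as a JSON array. A minimal sketch, assuming you generate the file locally and upload it yourself afterwards; the helper write_schema_file and the file name are hypothetical:

```python
import json
import tempfile
from pathlib import Path

# Same schema as the create_table example above.
SCHEMA_FIELDS = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
]


def write_schema_file(fields, path: Path) -> None:
    """Serialize schema fields as a JSON array of field objects."""
    path.write_text(json.dumps(fields, indent=2), encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    schema_path = Path(tmp) / "employee_schema.json"
    write_schema_file(SCHEMA_FIELDS, schema_path)
    # Reading the file back yields the same list of field dicts.
    loaded = json.loads(schema_path.read_text(encoding="utf-8"))
```

Once uploaded, the object's gs:// path would go into the operator's gcs_schema_object parameter in place of schema_fields.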

You can use this operator to create a view on top of an existing table.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

You can also use this operator to create a materialized view, which periodically caches the results of a query for increased performance and efficiency.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

create_materialized_view = BigQueryCreateEmptyTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    materialized_view={
        "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "enableRefresh": True,
        "refreshIntervalMs": 2000000,
    },
)
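refreshIntervalMs is expressed in milliseconds, so 2000000 in the example is about 33 minutes. A small sketch that derives the value from a timedelta instead of hand-computing it; refresh_interval_ms and the project and dataset names are hypothetical:

```python
from datetime import timedelta


def refresh_interval_ms(interval: timedelta) -> int:
    """Convert a timedelta to the integer milliseconds used by refreshIntervalMs."""
    return int(interval.total_seconds() * 1000)


# 2,000,000 ms from the example, expressed in minutes (about 33.3).
example_minutes = 2_000_000 / 60_000

materialized_view = {
    "query": "SELECT SUM(salary) AS sum_salary FROM `my-project.my_dataset.test_table`",
    "enableRefresh": True,
    "refreshIntervalMs": refresh_interval_ms(timedelta(minutes=30)),
}
```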

Create external table

To create a new external table in a dataset with the data in Google Cloud Storage you can use BigQueryCreateExternalTableOperator.

As with BigQueryCreateEmptyTableOperator, you may either pass the schema fields in directly or point the operator to a Google Cloud Storage object name.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": "external_table",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "CSV",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
            "sourceUris": [DATA_SAMPLE_GCS_URL],
        },
    },
)

Fetch data from table

To fetch data from a BigQuery table, you can use BigQueryGetDataOperator. To fetch only selected columns, pass a comma-separated list of column names to selected_fields.

This operator returns data as a Python list with one element per fetched row. Each element is itself a list holding the column values for that row.

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)
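Because the result is a list of lists with no column names attached, a downstream task that pulls it (for example from XCom) may want to pair values with names. A sketch under the assumption that the column order matches the names you supply; rows_to_dicts and the sample rows are hypothetical, so verify the actual column order returned in your environment:

```python
from typing import Any, Dict, List


def rows_to_dicts(rows: List[List[Any]], field_names: List[str]) -> List[Dict[str, Any]]:
    """Pair each row's values with the given column names."""
    return [dict(zip(field_names, row)) for row in rows]


# Hypothetical result for selected_fields="value,name".
rows = [[42, "monthy python"], [42, "fishy fish"]]
records = rows_to_dicts(rows, ["value", "name"])
```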

Upsert table

To upsert a table you can use BigQueryUpsertTableOperator.

This operator either updates the existing table or creates a new, empty table in the given dataset.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)
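The expirationTime field in the example is an absolute timestamp in milliseconds since the Unix epoch, here five minutes after the moment the expression is evaluated. A sketch of the same arithmetic with datetime, which makes the units explicit; expiration_ms is a hypothetical helper:

```python
import time
from datetime import datetime, timedelta, timezone


def expiration_ms(now: datetime, ttl: timedelta) -> int:
    """Milliseconds since the Unix epoch at which the table should expire."""
    return int((now + ttl).timestamp() * 1000)


# Deterministic example: five minutes after 2021-01-01 00:00 UTC.
fixed_now = datetime(2021, 1, 1, tzinfo=timezone.utc)
five_minutes_later = expiration_ms(fixed_now, timedelta(minutes=5))

# Based on the current time, comparable to the example's expression.
expires_at = expiration_ms(datetime.now(timezone.utc), timedelta(minutes=5))
legacy_style = (int(time.time()) + 300) * 1000
```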

Update table schema

To update the schema of a table you can use BigQueryUpdateTableSchemaOperator.

This operator updates the schema field values supplied, while leaving the rest unchanged. This is useful, for instance, for setting new field descriptions on an existing table schema.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)
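The "update the supplied fields, keep the rest" behavior can be pictured as a merge keyed on the field name. This is a simplified sketch, not the operator's implementation: it handles only top-level fields (no nested RECORD fields), and raising on unknown field names is a choice made here. merge_schema_updates is a hypothetical name.

```python
from typing import Any, Dict, List

Field = Dict[str, Any]


def merge_schema_updates(schema: List[Field], updates: List[Field]) -> List[Field]:
    """Apply per-field updates matched by name; unmentioned fields stay unchanged."""
    by_name = {update["name"]: update for update in updates}
    known = {field["name"] for field in schema}
    unknown = sorted(set(by_name) - known)
    if unknown:
        raise ValueError(f"Fields not in schema: {unknown}")
    # Later keys win, so supplied values overwrite existing ones.
    return [{**field, **by_name.get(field["name"], {})} for field in schema]


current_schema = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
]
new_schema = merge_schema_updates(
    current_schema,
    [{"name": "salary", "description": "Monthly salary in USD"}],
)
```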

Delete table

To delete an existing table you can use BigQueryDeleteTableOperator.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

You can also use this operator to delete a view.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view", deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view"
)

You can also use this operator to delete a materialized view.

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

Execute BigQuery jobs

Let's say you would like to execute the following query.

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

INSERT_ROWS_QUERY = (
    f"INSERT {DATASET}.{TABLE_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)

To execute the SQL query in BigQuery, you can use BigQueryInsertJobOperator with an appropriate query job configuration, which can be Jinja-templated.

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    },
    location=location,
)
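The configuration argument follows the shape of a BigQuery job configuration resource, so other query options sit next to query and useLegacySql. A sketch of a hypothetical helper, make_query_config, that can also direct the result to a destination table; the project, dataset, and table names are placeholders:

```python
from typing import Any, Dict, Optional


def make_query_config(
    sql: str,
    destination: Optional[Dict[str, str]] = None,
    write_disposition: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a query job configuration dict for BigQueryInsertJobOperator."""
    query: Dict[str, Any] = {"query": sql, "useLegacySql": False}
    if destination is not None:
        # Table reference with projectId / datasetId / tableId keys.
        query["destinationTable"] = destination
    if write_disposition is not None:
        # For example "WRITE_TRUNCATE" (overwrite) or "WRITE_APPEND" (append).
        query["writeDisposition"] = write_disposition
    return {"query": query}


config = make_query_config(
    "SELECT name, value FROM my_dataset.my_table",
    destination={"projectId": "my-project", "datasetId": "my_dataset", "tableId": "results"},
    write_disposition="WRITE_TRUNCATE",
)
# In a DAG you would then pass it on, e.g.:
# BigQueryInsertJobOperator(task_id="select_into_results", configuration=config, location=location)
```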

For more information on the types of BigQuery jobs, see the BigQuery documentation.

If you want to include files in your configuration, you can use the include statement of the Jinja template language as follows:

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include 'example_bigquery_query.sql' %}",
            "useLegacySql": False,
        }
    },
    location=location,
)

The included file can also use Jinja templates, which can be useful in the case of .sql files.

Additionally, you can use the job_id parameter of BigQueryInsertJobOperator to improve idempotency. If this parameter is not passed, a UUID is used as the job_id. If it is provided, the operator tries to submit a new job with this job_id; if a job with that job_id already exists, the operator reattaches to the existing job.
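One way to get an idempotent job_id is to derive it deterministically from the task's identity and its configuration, so a retry of the same run produces the same ID and reattaches instead of submitting a duplicate. A minimal sketch; make_job_id, its naming scheme, and the sample arguments are assumptions, not what the operator does internally:

```python
import hashlib
import json
import re
from typing import Any, Dict


def make_job_id(dag_id: str, task_id: str, logical_date: str, configuration: Dict[str, Any]) -> str:
    """Derive a stable job ID from the task identity and its configuration."""
    # sort_keys makes the hash independent of dict key order.
    config_hash = hashlib.md5(
        json.dumps(configuration, sort_keys=True).encode("utf-8")
    ).hexdigest()
    raw = f"airflow_{dag_id}_{task_id}_{logical_date}_{config_hash}"
    # BigQuery job IDs allow letters, digits, underscores and dashes (max 1,024 chars).
    return re.sub(r"[^0-9A-Za-z_-]", "_", raw)[:1024]


config = {"query": {"query": "SELECT 1", "useLegacySql": False}}
job_id = make_job_id(
    "example_bigquery_queries", "insert_query_job", "2021-01-01T00:00:00+00:00", config
)
```

Note the trade-off: with a fixed ID, deliberately re-running the task also reattaches to the earlier job, so you would change one of the inputs when you actually want a fresh job.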

Validate data

Check if query result has data

To perform checks against BigQuery you can use BigQueryCheckOperator.

This operator expects an SQL query that returns a single row. Each value in that row is evaluated using Python bool casting. If any of the values is False, the check fails and the task errors out.

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)
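The pass/fail rule can be written out in a few lines of plain Python. A sketch of the semantics described above, assuming the first row has already been fetched as a sequence; check_first_row is a hypothetical name and raises ValueError where the operator would fail the task:

```python
from typing import Any, Sequence


def check_first_row(row: Sequence[Any]) -> None:
    """Fail if the row is empty or any of its values is falsy."""
    if not row:
        raise ValueError("The query returned no rows")
    falsy = [value for value in row if not bool(value)]
    if falsy:
        raise ValueError(f"Check failed, falsy values: {falsy}")


# SELECT COUNT(*) on an empty table returns 0, which is falsy, so check_count
# in the example effectively asserts that the table is not empty.
check_first_row([4])
```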

Compare query result to pass value

To perform a simple value check using SQL, you can use BigQueryValueCheckOperator.

This operator expects an SQL query that returns a single row. Each value in that row is evaluated against pass_value, which can be either a string or a numeric value. If it is numeric, you can also specify a tolerance.

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)
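For numeric values, a tolerance is usually read as a relative band around pass_value: a value passes if it lies between pass_value * (1 - tolerance) and pass_value * (1 + tolerance). A sketch of that rule, assuming this relative interpretation (check the operator reference for your Airflow version); value_check is hypothetical:

```python
from typing import Any, Optional, Sequence


def value_check(row: Sequence[Any], pass_value: Any, tolerance: Optional[float] = None) -> bool:
    """Return True if every value in the row matches pass_value."""
    if not row:
        return False
    try:
        target = float(pass_value)
    except (TypeError, ValueError):
        # Non-numeric pass_value: compare values as strings.
        return all(str(value) == str(pass_value) for value in row)
    if tolerance is None:
        # No tolerance: require an exact numeric match.
        return all(float(value) == target for value in row)
    low, high = target * (1 - tolerance), target * (1 + tolerance)
    return all(low <= float(value) <= high for value in row)
```

With pass_value=4 as in the example, a count of 5 fails an exact check but passes with tolerance=0.25, since the accepted band is then 3 to 5.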

Compare metrics over time

To check that the values of metrics, given as SQL expressions, are within a certain tolerance of their values from days_back days before, you can use BigQueryIntervalCheckOperator.

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=location,
)
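The comparison behind metrics_thresholds can be sketched as a ratio test between the current metric values and those from days_back days earlier. This sketch assumes a max-over-min ratio that must stay below the threshold, so {"COUNT(*)": 1.5} accepts one value being less than 1.5 times the other, in either direction; the function interval_check and its handling of zeros are hypothetical:

```python
from typing import Dict


def interval_check(
    current: Dict[str, float],
    reference: Dict[str, float],
    thresholds: Dict[str, float],
) -> Dict[str, bool]:
    """Return a pass/fail flag per metric based on a max-over-min ratio."""
    results: Dict[str, bool] = {}
    for metric, threshold in thresholds.items():
        cur, ref = current[metric], reference[metric]
        if cur == 0 or ref == 0:
            # A ratio against zero is undefined; this sketch treats it as a failure.
            results[metric] = False
            continue
        ratio = max(cur, ref) / min(cur, ref)
        results[metric] = ratio < threshold
    return results
```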

Sensors

Check that a Table exists

To check that a table exists, you can define a sensor with BigQueryTableExistenceSensor. This delays the execution of downstream operators until the table exists. If the table is sharded on dates, you can, for instance, use the {{ ds_nodash }} macro as the table name suffix.

airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py

check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
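Conceptually, a sensor repeatedly "pokes" a condition until it is true or a timeout expires; Airflow sensors expose this through parameters such as poke_interval and timeout. The sketch below shows only that loop in plain Python, with a hypothetical wait_for function and injectable sleep and clock so it can be tested; in a real DAG the condition would be the table lookup performed by BigQueryTableExistenceSensor.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll condition until it returns True; return the number of pokes.

    Raises TimeoutError once `timeout` seconds have passed without success.
    """
    deadline = clock() + timeout
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() >= deadline:
            raise TimeoutError(f"Condition not met after {timeout} seconds")
        sleep(poke_interval)
```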

Check that a Table Partition exists

To check that a table exists and has a given partition, you can use BigQueryTablePartitionExistenceSensor.

airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py

check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

For DAY-partitioned tables, the partition_id parameter is a string in the "%Y%m%d" format.
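A sketch of producing such a partition ID from a date. The value has the same YYYYMMDD shape as the {{ ds_nodash }} macro, so for daily runs that macro is a natural source for it; day_partition_id is a hypothetical helper:

```python
from datetime import date


def day_partition_id(day: date) -> str:
    """Partition ID for a DAY-partitioned table: the date formatted as %Y%m%d."""
    return day.strftime("%Y%m%d")


partition_id = day_partition_id(date(2021, 3, 14))  # "20210314"
```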

Reference

For further information, look at:
