Google Cloud BigQuery Operators

BigQuery is Google’s fully managed, petabyte scale, low cost analytics data warehouse. It is a serverless Software as a Service (SaaS) that doesn’t need a database administrator. It allows users to focus on analyzing data to find meaningful insights using familiar SQL.

Airflow provides operators to manage datasets and tables, run queries and validate data.

Prerequisite Tasks

To use these operators, you must do a few things:

Manage datasets

Create dataset

To create an empty dataset in a BigQuery database you can use BigQueryCreateEmptyDatasetOperator.

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[source]

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

Get dataset details

To get the details of an existing dataset you can use BigQueryGetDatasetOperator.

This operator returns a Dataset Resource.

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[source]

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

List tables in dataset

To retrieve the list of tables in a given dataset use BigQueryGetDatasetTablesOperator.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

Update table

To update a table in BigQuery you can use BigQueryUpdateTableOperator.

The update method replaces the entire Table resource, whereas the patch method only replaces fields that are provided in the submitted Table resource.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)

Update dataset

To update a dataset in BigQuery you can use BigQueryUpdateDatasetOperator.

The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource.

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[source]

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

Delete dataset

To delete an existing dataset from a BigQuery database you can use BigQueryDeleteDatasetOperator.

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[source]

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

Manage tables

Create native table

To create a new, empty table in the given BigQuery dataset, optionally with schema you can use BigQueryCreateEmptyTableOperator.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google Cloud Storage object name. The object in Google Cloud Storage must be a JSON file with the schema fields in it.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

You can use this operator to create a view on top of an existing table.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

You can also use this operator to create a materialized view that periodically cache results of a query for increased performance and efficiency.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

create_materialized_view = BigQueryCreateEmptyTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    materialized_view={
        "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "enableRefresh": True,
        "refreshIntervalMs": 2000000,
    },
)

Create external table

To create a new external table in a dataset with the data in Google Cloud Storage you can use BigQueryCreateExternalTableOperator.

Similarly to BigQueryCreateEmptyTableOperator you can directly pass the schema fields in.

tests/system/providers/google/cloud/bigquery/example_bigquery_operations.py[source]

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table=f"{DATASET_NAME}.external_table",
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

Or you may point the operator to a Google Cloud Storage object name where the schema is stored.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

update_table_schema_json = BigQueryCreateEmptyTableOperator(
    task_id="update_table_schema_json",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)

To use BigQuery schema auto-detection, set the autodetect flag instead of providing explicit schema information.

Fetch data from table

To fetch data from a BigQuery table you can use BigQueryGetDataOperator . Alternatively you can fetch data for selected columns if you pass fields to selected_fields.

The result of this operator can be retrieved in two different formats based on the value of the as_dict parameter: False (default) - A Python list of lists, where the number of elements in the nesting list will be equal to the number of rows fetched. Each element in the nesting will a nested list where elements would represent the column values for that row. True - A Python list of dictionaries, where each dictionary represents a row. In each dictionary, the keys are the column names and the values are the corresponding values for those columns.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)

The below example shows how to use BigQueryGetDataOperator in async (deferrable) mode. Note that a deferrable task requires the Triggerer to be running on your Airflow deployment.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[source]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=LOCATION,
    deferrable=True,
)

Upsert table

To upsert a table you can use BigQueryUpsertTableOperator.

This operator either updates the existing table or creates a new, empty table in the given dataset.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

Update table schema

To update the schema of a table you can use BigQueryUpdateTableSchemaOperator.

This operator updates the schema field values supplied, while leaving the rest unchanged. This is useful for instance to set new field descriptions on an existing table schema.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

Delete table

To delete an existing table you can use BigQueryDeleteTableOperator.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

You can also use this operator to delete a view.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)

You can also use this operator to delete a materialized view.

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[source]

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

Execute BigQuery jobs

Let’s say you would like to execute the following query.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

    INSERT_ROWS_QUERY = (
        f"INSERT {DATASET}.{TABLE_1} VALUES "
        f"(42, 'monty python', '{INSERT_DATE}'), "
        f"(42, 'fishy fish', '{INSERT_DATE}');"
    )

To execute the SQL query in a specific BigQuery database you can use BigQueryInsertJobOperator with proper query job configuration that can be Jinja templated.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=location,
)

The below example shows how to use BigQueryInsertJobOperator in async (deferrable) mode. Note that a deferrable task requires the Triggerer to be running on your Airflow deployment.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[source]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

For more information on types of BigQuery job please check documentation.

If you want to include some files in your configuration you can use include clause of Jinja template language as follow:

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
    location=location,
)

The included file can also use Jinja templates which can be useful in case of .sql files.

Additionally you can use job_id parameter of BigQueryInsertJobOperator to improve idempotency. If this parameter is not passed then uuid will be used as job_id. If provided then operator will try to submit a new job with this job_id`. If there’s already a job with such job_id then it will reattach to the existing job.

Also for all this action you can use operator in the deferrable mode:

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[source]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

Validate data

Check if query result has data

To perform checks against BigQuery you can use BigQueryCheckOperator

This operator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)

Also you can use deferrable mode in this operator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[source]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

Compare query result to pass value

To perform a simple value check using sql code you can use BigQueryValueCheckOperator

These operators expects a sql query that will return a single row. Each value on that first row is evaluated against pass_value which can be either a string or numeric value. If numeric, you can also specify tolerance.

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)

Also you can use deferrable mode in this operator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[source]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=2,
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

Compare metrics over time

To check that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before you can either use BigQueryIntervalCheckOperator or BigQueryIntervalCheckAsyncOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=location,
)

Also you can use deferrable mode in this operator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[source]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

Check columns with predefined tests

To check that columns pass user-configurable tests you can use BigQueryColumnCheckOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

    column_check = BigQueryColumnCheckOperator(
        task_id="column_check",
        table=f"{DATASET}.{TABLE_1}",
        column_mapping={"value": {"null_check": {"equal_to": 0}}},
    )

Check table level data quality

To check that tables pass user-defined tests you can use BigQueryTableCheckOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[source]

    table_check = BigQueryTableCheckOperator(
        task_id="table_check",
        table=f"{DATASET}.{TABLE_1}",
        checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
    )

Sensors

Check that a Table exists

To check that a table exists you can define a sensor operator. This allows delaying execution of downstream operators until a table exist. If the table is sharded on dates you can for instance use the {{ ds_nodash }} macro as the table name suffix.

BigQueryTableExistenceSensor.

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[source]

check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)

Also you can use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[source]

check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
    task_id="check_table_exists_defered",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    deferrable=True,
)

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[source]

check_table_exists_async = BigQueryTableExistenceAsyncSensor(
    task_id="check_table_exists_async",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

Check that a Table Partition exists

To check that a table exists and has a partition you can use. BigQueryTablePartitionExistenceSensor.

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[source]

check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

For DAY partitioned tables, the partition_id parameter is a string on the “%Y%m%d” format

Also you can use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[source]

check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_defered",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
    deferrable=True,
)

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[source]

check_table_partition_exists_async: BaseSensorOperator = BigQueryTableExistencePartitionAsyncSensor(
    task_id="check_table_partition_exists_async",
    partition_id=PARTITION_NAME,
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

Reference

For further information, look at:

Was this entry helpful?