Google Cloud BigQuery Operators¶
BigQuery is Google’s fully managed, petabyte scale, low cost analytics data warehouse. It is a serverless Software as a Service (SaaS) that doesn’t need a database administrator. It allows users to focus on analyzing data to find meaningful insights using familiar SQL.
Airflow provides operators to manage datasets and tables, run queries and validate data.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'Detailed information is available for Installation.
Manage datasets¶
Create dataset¶
To create an empty dataset in a BigQuery database you can use
BigQueryCreateEmptyDatasetOperator
.
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
Get dataset details¶
To get the details of an existing dataset you can use
BigQueryGetDatasetOperator
.
This operator returns a Dataset Resource.
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
List tables in dataset¶
To retrieve the list of tables in a given dataset use
BigQueryGetDatasetTablesOperator
.
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
Update table¶
To update a table in BigQuery you can use
BigQueryUpdateTableOperator
.
The update method replaces the entire Table resource, whereas the patch method only replaces fields that are provided in the submitted Table resource.
update_table = BigQueryUpdateTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_id="test_table",
fields=["friendlyName", "description"],
table_resource={
"friendlyName": "Updated Table",
"description": "Updated Table",
},
)
Update dataset¶
To update a dataset in BigQuery you can use
BigQueryUpdateDatasetOperator
.
The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource.
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
Delete dataset¶
To delete an existing dataset from a BigQuery database you can use
BigQueryDeleteDatasetOperator
.
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
Manage tables¶
Create native table¶
To create a new, empty table in the given BigQuery dataset, optionally with
schema you can use
BigQueryCreateEmptyTableOperator
.
The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google Cloud Storage object name. The object in Google Cloud Storage must be a JSON file with the schema fields in it.
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
You can use this operator to create a view on top of an existing table.
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
)
You can also use this operator to create a materialized view that periodically cache results of a query for increased performance and efficiency.
create_materialized_view = BigQueryCreateEmptyTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
materialized_view={
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 2000000,
},
)
Create external table¶
To create a new external table in a dataset with the data in Google Cloud Storage
you can use
BigQueryCreateExternalTableOperator
.
Similarly to
BigQueryCreateEmptyTableOperator
you can directly pass the schema fields in.
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
destination_project_dataset_table=f"{DATASET_NAME}.external_table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
Or you may point the operator to a Google Cloud Storage object name where the schema is stored.
update_table_schema_json = BigQueryCreateEmptyTableOperator(
task_id="update_table_schema_json",
dataset_id=DATASET_NAME,
table_id="test_table",
gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)
To use BigQuery schema auto-detection,
set the autodetect
flag instead of providing explicit schema information.
Fetch data from table¶
To fetch data from a BigQuery table you can use
BigQueryGetDataOperator
.
Alternatively you can fetch data for selected columns if you pass fields to
selected_fields
.
The result of this operator can be retrieved in two different formats based on the value of the as_dict
parameter:
False
(default) - A Python list of lists, where the number of elements in the nesting list will be equal to the number of rows fetched. Each element in the
nesting will a nested list where elements would represent the column values for
that row.
True
- A Python list of dictionaries, where each dictionary represents a row. In each dictionary, the keys are the column names and the values are the corresponding values for those columns.
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
location=location,
)
The below example shows how to use
BigQueryGetDataOperator
in async (deferrable) mode. Note that a deferrable task requires the Triggerer to be
running on your Airflow deployment.
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME_1,
use_legacy_sql=False,
max_results=10,
selected_fields="value",
location=LOCATION,
deferrable=True,
)
Upsert table¶
To upsert a table you can use
BigQueryUpsertTableOperator
.
This operator either updates the existing table or creates a new, empty table in the given dataset.
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
Update table schema¶
To update the schema of a table you can use
BigQueryUpdateTableSchemaOperator
.
This operator updates the schema field values supplied, while leaving the rest unchanged. This is useful for instance to set new field descriptions on an existing table schema.
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
Delete table¶
To delete an existing table you can use
BigQueryDeleteTableOperator
.
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
You can also use this operator to delete a view.
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
You can also use this operator to delete a materialized view.
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
Execute BigQuery jobs¶
Let’s say you would like to execute the following query.
INSERT_ROWS_QUERY = (
f"INSERT {DATASET}.{TABLE_1} VALUES "
f"(42, 'monty python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
To execute the SQL query in a specific BigQuery database you can use
BigQueryInsertJobOperator
with proper query job configuration that can be Jinja templated.
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=location,
)
The below example shows how to use
BigQueryInsertJobOperator
in async (deferrable) mode. Note that a deferrable task requires the Triggerer to be
running on your Airflow deployment.
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
For more information on types of BigQuery job please check documentation.
If you want to include some files in your configuration you can use include
clause of Jinja template
language as follow:
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include QUERY_SQL_PATH %}",
"useLegacySql": False,
}
},
location=location,
)
The included file can also use Jinja templates which can be useful in case of .sql
files.
Additionally you can use job_id
parameter of
BigQueryInsertJobOperator
to improve
idempotency. If this parameter is not passed then uuid will be used as job_id
. If provided then
operator will try to submit a new job with this job_id`
. If there’s already a job with such job_id
then it will reattach to the existing job.
Also for all this action you can use operator in the deferrable mode:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
Validate data¶
Check if query result has data¶
To perform checks against BigQuery you can use
BigQueryCheckOperator
This operator expects a sql query that will return a single row. Each value on
that first row is evaluated using python bool
casting. If any of the values
return False
the check is failed and errors out.
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
use_legacy_sql=False,
location=location,
)
Also you can use deferrable mode in this operator
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
Compare query result to pass value¶
To perform a simple value check using sql code you can use
BigQueryValueCheckOperator
These operators expects a sql query that will return a single row. Each value on
that first row is evaluated against pass_value
which can be either a string
or numeric value. If numeric, you can also specify tolerance
.
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
location=location,
)
Also you can use deferrable mode in this operator
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
Compare metrics over time¶
To check that the values of metrics given as SQL expressions are within a certain
tolerance of the ones from days_back
before you can either use
BigQueryIntervalCheckOperator
or
BigQueryIntervalCheckAsyncOperator
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=location,
)
Also you can use deferrable mode in this operator
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
Check columns with predefined tests¶
To check that columns pass user-configurable tests you can use
BigQueryColumnCheckOperator
column_check = BigQueryColumnCheckOperator(
task_id="column_check",
table=f"{DATASET}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
Check table level data quality¶
To check that tables pass user-defined tests you can use
BigQueryTableCheckOperator
table_check = BigQueryTableCheckOperator(
task_id="table_check",
table=f"{DATASET}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
Sensors¶
Check that a Table exists¶
To check that a table exists you can define a sensor operator. This allows delaying execution
of downstream operators until a table exist. If the table is sharded on dates you can for instance
use the {{ ds_nodash }}
macro as the table name suffix.
check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
Also you can use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.
check_table_exists_def = BigQueryTableExistenceSensor(
task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
deferrable=True,
)
check_table_exists_async = BigQueryTableExistenceAsyncSensor(
task_id="check_table_exists_async",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
Check that a Table Partition exists¶
To check that a table exists and has a partition you can use.
BigQueryTablePartitionExistenceSensor
.
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
)
For DAY partitioned tables, the partition_id parameter is a string on the “%Y%m%d” format
Also you can use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.
check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
deferrable=True,
)
check_table_partition_exists_async = BigQueryTableExistencePartitionAsyncSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)