Google Cloud BigQuery Operators¶
BigQuery is Google's fully managed, petabyte-scale, low-cost analytics data warehouse. It is a serverless Software as a Service (SaaS) offering that doesn't need a database administrator, so users can focus on analyzing data to find meaningful insights using familiar SQL.
Airflow provides operators to manage datasets and tables, run queries and validate data.
Manage datasets¶
Create dataset¶
To create an empty dataset in a BigQuery database, you can use BigQueryCreateEmptyDatasetOperator.
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
Get dataset details¶
To get the details of an existing dataset, you can use BigQueryGetDatasetOperator.
This operator returns a Dataset Resource.
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
List tables in dataset¶
To retrieve the list of tables in a given dataset, use BigQueryGetDatasetTablesOperator.
get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
Update table¶
To update a table in BigQuery, you can use BigQueryUpdateTableOperator.
The update method replaces the entire Table resource, whereas the patch method only replaces fields that are provided in the submitted Table resource.
update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    # Names of the Table properties to change; they match the keys of table_resource.
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)
Update dataset¶
To update a dataset in BigQuery, you can use BigQueryUpdateDatasetOperator.
The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource.
update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)
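The difference between the update and patch methods can be illustrated with plain dictionaries. This is only a minimal sketch of the semantics described above, not the BigQuery client or the operator's implementation; the update_resource and patch_resource helpers and the sample resource are hypothetical.

```python
def update_resource(existing: dict, submitted: dict) -> dict:
    """Update semantics: the submitted resource replaces the existing one entirely."""
    return dict(submitted)


def patch_resource(existing: dict, submitted: dict) -> dict:
    """Patch semantics: only the fields present in the submitted resource change."""
    merged = dict(existing)
    merged.update(submitted)
    return merged


# Hypothetical dataset resource and a partial resource submitted by the caller.
existing = {"friendlyName": "Sales", "description": "Original dataset"}
submitted = {"description": "Updated dataset"}

updated = update_resource(existing, submitted)  # friendlyName is dropped
patched = patch_resource(existing, submitted)   # friendlyName is preserved
```

With update semantics, any field you leave out of the submitted resource is lost, so a partial resource is safer to send as a patch.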
Delete dataset¶
To delete an existing dataset from a BigQuery database, you can use BigQueryDeleteDatasetOperator.
delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
Manage tables¶
Create native table¶
To create a new, empty table in the given BigQuery dataset, optionally with a schema, you can use BigQueryCreateEmptyTableOperator.
The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google Cloud Storage object name. The object in Google Cloud Storage must be a JSON file with the schema fields in it.
create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)
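If you keep the schema in Google Cloud Storage instead, the object holds the same field definitions serialized as JSON. The sketch below shows what such a file could contain, assuming it is a JSON array of field objects that mirrors schema_fields; the file name in the comment is hypothetical, and uploading the file to a bucket is left out.

```python
import json

# Same field definitions as in the operator example above.
schema_fields = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
]

# Serialize the schema; this text is what a schema file (for example a
# hypothetical "schemas/test_table.json") would contain.
schema_json = json.dumps(schema_fields, indent=2)

# Parsing the file content yields the original list of field definitions.
loaded_fields = json.loads(schema_json)
```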
You can use this operator to create a view on top of an existing table.
create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)
You can also use this operator to create a materialized view that periodically caches the results of a query for increased performance and efficiency.
create_materialized_view = BigQueryCreateEmptyTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    materialized_view={
        "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "enableRefresh": True,
        "refreshIntervalMs": 2000000,
    },
)
Create external table¶
To create a new external table in a dataset with the data in Google Cloud Storage, you can use BigQueryCreateExternalTableOperator.
As with BigQueryCreateEmptyTableOperator, you may either pass the schema fields in directly or point the operator to a Google Cloud Storage object name.
create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": "external_table",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "CSV",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
            "sourceUris": [DATA_SAMPLE_GCS_URL],
        },
    },
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
)
Fetch data from table¶
To fetch data from a BigQuery table, you can use BigQueryGetDataOperator.
Alternatively, you can fetch data for selected columns only by passing their names to selected_fields.
This operator returns the data as a Python list with one element per fetched row. Each element is itself a list that holds the column values for that row.
get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)
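Because each row arrives as a plain list, it is often convenient to pair the values with column names before using them downstream. The sketch below assumes that the values in each row follow the order of the names passed in selected_fields; the sample rows are made up for illustration, and rows_to_dicts is a hypothetical helper rather than part of Airflow.

```python
def rows_to_dicts(rows: list, selected_fields: str) -> list:
    """Pair each row's values with the comma-separated column names."""
    names = [name.strip() for name in selected_fields.split(",")]
    return [dict(zip(names, row)) for row in rows]


# Made-up sample shaped like the operator's return value: a list of rows,
# each row being a list of column values.
sample_rows = [[42, "monthy python"], [42, "fishy fish"]]

records = rows_to_dicts(sample_rows, "value,name")
```

Each entry of records is then a dictionary keyed by column name, for example records[0]["name"].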
Upsert table¶
To upsert a table, you can use BigQueryUpsertTableOperator.
This operator either updates the existing table or creates a new, empty table in the given dataset.
import time  # needed for the expirationTime computation below

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)
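The expirationTime field of a table resource is expressed in milliseconds since the Unix epoch, which is why the example multiplies by 1000. A small helper makes the unit conversion explicit; expiration_time_ms is a hypothetical name used for illustration, not part of Airflow.

```python
import time
from typing import Optional


def expiration_time_ms(ttl_seconds: int, now: Optional[float] = None) -> int:
    """Return an expiration timestamp in milliseconds since the epoch."""
    current = time.time() if now is None else now
    return (int(current) + ttl_seconds) * 1000


# Five minutes after a fixed reference time, so the result is reproducible.
expires_at = expiration_time_ms(300, now=1_700_000_000)
```

In a DAG you would call the helper without the now argument so that the current time is used.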
Update table schema¶
To update the schema of a table, you can use BigQueryUpdateTableSchemaOperator.
This operator updates the schema field values that you supply and leaves the rest unchanged. This is useful, for instance, to set new field descriptions on an existing table schema.
update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)
Delete table¶
To delete an existing table, you can use BigQueryDeleteTableOperator.
delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
You can also use this operator to delete a view.
delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
You can also use this operator to delete a materialized view.
delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
Execute BigQuery jobs¶
Let's say you would like to execute the following query.
INSERT_ROWS_QUERY = (
    f"INSERT {DATASET}.{TABLE_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)
To execute the SQL query in a specific BigQuery database, you can use BigQueryInsertJobOperator with a proper query job configuration, which can be Jinja templated.
insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    },
    location=location,
)
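The configuration argument is a plain dictionary that follows the job configuration of the BigQuery REST API, so it can be assembled by ordinary Python code. The sketch below uses a hypothetical helper, build_query_configuration, to produce the same structure as the example above and, optionally, to direct the results of a SELECT statement to a destination table. The destinationTable and writeDisposition keys come from the REST API's query job configuration; the project, dataset, and table names used here are made up.

```python
from typing import Optional


def build_query_configuration(sql: str, destination: Optional[str] = None) -> dict:
    """Return a query job configuration dict for the given standard SQL text."""
    query: dict = {"query": sql, "useLegacySql": False}
    if destination is not None:
        # Expect "project.dataset.table"; split it into the three reference parts.
        project_id, dataset_id, table_id = destination.split(".")
        query["destinationTable"] = {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        }
        # Overwrite the destination table on every run.
        query["writeDisposition"] = "WRITE_TRUNCATE"
    return {"query": query}


simple = build_query_configuration("SELECT 1")
with_destination = build_query_configuration(
    "SELECT 1 AS value", destination="my-project.my_dataset.results"
)
```

The returned dictionary can be passed as the configuration argument of the operator.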
For more information on the types of BigQuery jobs, please check the BigQuery documentation.
If you want to include some files in your configuration, you can use the include clause of the Jinja template language as follows:
select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include 'example_bigquery_query.sql' %}",
            "useLegacySql": False,
        }
    },
    location=location,
)
The included file can also use Jinja templates, which can be useful for .sql files.
Additionally, you can use the job_id parameter of BigQueryInsertJobOperator to improve idempotency. If this parameter is not passed, a uuid is used as the job_id. If it is provided, the operator tries to submit a new job with this job_id. If a job with that job_id already exists, the operator reattaches to the existing job.
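One way to benefit from the reattach behaviour is to derive the job_id deterministically from values that identify the run, so that a retried task asks for the same job. The sketch below is an assumption about how you might build such an ID, not a description of what the operator does internally; make_job_id and its arguments are hypothetical. It keeps only letters, digits, underscores, and dashes, which are the characters BigQuery accepts in job IDs.

```python
import hashlib
import re


def make_job_id(dag_id: str, task_id: str, logical_date: str, sql: str) -> str:
    """Build a stable job ID from the run identity and the query text."""
    # Hash the SQL so that a changed query produces a different job ID.
    digest = hashlib.sha256(sql.encode("utf-8")).hexdigest()[:12]
    raw = f"airflow_{dag_id}_{task_id}_{logical_date}_{digest}"
    # Replace characters outside the allowed set with underscores.
    return re.sub(r"[^A-Za-z0-9_-]", "_", raw)


run_date = "2024-01-01T00:00:00+00:00"
first = make_job_id("example_dag", "insert_query_job", run_date, "SELECT 1")
second = make_job_id("example_dag", "insert_query_job", run_date, "SELECT 1")
other = make_job_id("example_dag", "insert_query_job", run_date, "SELECT 2")
```

The same inputs always give the same ID, while a different query text gives a different one.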
Validate data¶
Check if query result has data¶
To perform checks against BigQuery, you can use BigQueryCheckOperator.
This operator expects a SQL query that returns a single row. Each value in that first row is evaluated using Python bool casting. If any of the values evaluates to False, the check fails and the task errors out.
check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)
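The bool casting rule can be reproduced in a few lines of plain Python. This is a minimal sketch of the rule as described above, with a hypothetical check_passes helper and made-up rows; it does not run a query.

```python
def check_passes(row) -> bool:
    """Return True only if every value in the row is truthy."""
    return all(bool(value) for value in row)


non_empty = check_passes([4])      # COUNT(*) returned 4
empty = check_passes([0])          # COUNT(*) returned 0, which is falsy
mixed = check_passes([10, "", 3])  # an empty string is falsy as well
```

A COUNT(*) query therefore passes as soon as the table holds at least one row, since any non-zero count is truthy.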
Compare query result to pass value¶
To perform a simple value check using SQL code, you can use BigQueryValueCheckOperator.
This operator expects a SQL query that returns a single row. Each value in that first row is evaluated against pass_value, which can be either a string or a numeric value. If it is numeric, you can also specify tolerance.
check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)
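For numeric values, the comparison with a tolerance can be sketched as follows. The sketch assumes that tolerance is a fraction of pass_value, so that 0.1 accepts values within plus or minus 10 percent; value_matches is a hypothetical helper and not the operator's code.

```python
from typing import Optional


def value_matches(value: float, pass_value: float, tolerance: Optional[float] = None) -> bool:
    """Compare a numeric query result with pass_value, optionally with a tolerance."""
    if tolerance is None:
        # Without a tolerance the value has to match exactly.
        return value == pass_value
    lower = pass_value * (1 - tolerance)
    upper = pass_value * (1 + tolerance)
    return lower <= value <= upper


exact = value_matches(4, 4)
off_by_one = value_matches(5, 4)
within_ten_percent = value_matches(4.3, 4, tolerance=0.1)
outside_ten_percent = value_matches(5, 4, tolerance=0.1)
```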
Compare metrics over time¶
To check that the values of metrics given as SQL expressions are within a certain tolerance of the values from days_back days earlier, you can use BigQueryIntervalCheckOperator.
check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=location,
)
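A threshold such as 1.5 can be read as a limit on how far the current metric may drift from the earlier one. The sketch below assumes the comparison uses the ratio of the larger value to the smaller one and passes when that ratio stays below the threshold; within_threshold is a hypothetical helper. Zero values are simply rejected here, which may differ from how the operator handles them.

```python
def within_threshold(current: float, reference: float, threshold: float) -> bool:
    """Compare the current metric with the earlier one using a max/min ratio."""
    if current == 0 or reference == 0:
        # The ratio is undefined with a zero; this sketch treats it as a failure.
        return False
    ratio = max(current, reference) / min(current, reference)
    return ratio < threshold


small_growth = within_threshold(120, 100, 1.5)  # ratio 1.2
at_limit = within_threshold(150, 100, 1.5)      # ratio 1.5, not below the limit
large_drop = within_threshold(100, 160, 1.5)    # ratio 1.6
```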
Sensors¶
Check that a Table exists¶
To check that a table exists, you can define a sensor operator. This allows delaying the execution of downstream operators until the table exists. If the table is sharded on dates, you can, for instance, use the {{ ds_nodash }} macro as the table name suffix.
check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)
Check that a Table Partition exists¶
To check that a table exists and has a given partition, you can use BigQueryTablePartitionExistenceSensor.
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)
For DAY partitioned tables, the partition_id parameter is a string in the "%Y%m%d" format.
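A partition_id in this format can be produced from a date with strftime. The sketch below uses a hypothetical day_partition_id helper and a fixed sample date; inside a templated field, the {{ ds_nodash }} macro mentioned earlier renders a date in the same layout.

```python
from datetime import date


def day_partition_id(day: date) -> str:
    """Format a date as a DAY partition identifier, for example 20240307."""
    return day.strftime("%Y%m%d")


partition_id = day_partition_id(date(2024, 3, 7))
```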