Source code for tests.system.google.cloud.bigquery.example_bigquery_queries
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Example Airflow DAG for Google BigQuery service."""from__future__importannotationsimportosfromdatetimeimportdatetimefrompathlibimportPathfromairflow.models.dagimportDAGfromairflow.providers.google.cloud.operators.bigqueryimport(BigQueryCheckOperator,BigQueryColumnCheckOperator,BigQueryCreateEmptyDatasetOperator,BigQueryCreateTableOperator,BigQueryDeleteDatasetOperator,BigQueryGetDataOperator,BigQueryInsertJobOperator,BigQueryIntervalCheckOperator,BigQueryTableCheckOperator,BigQueryValueCheckOperator,)fromairflow.providers.standard.operators.bashimportBashOperatorfromairflow.utils.trigger_ruleimportTriggerRulefromsystem.googleimportDEFAULT_GCP_SYSTEM_TEST_PROJECT_IDfromsystem.openlineage.operatorimportOpenLineageTestOperator
# NOTE(review): this chunk was recovered from a whitespace-stripped scrape. The
# task definitions below reference names (DATASET_NAME, TABLE_1, TABLE_2, SCHEMA,
# INSERT_ROWS_QUERY, PROJECT_ID, QUERY_SQL_PATH, create_dataset, dag) that are
# defined in a portion of the file not visible here; in the upstream example they
# live inside a ``with DAG(...) as dag:`` context — confirm indentation/placement
# against the full file when merging.

# Create the two working tables with a shared schema; both are seeded/queried by
# the tasks below.
create_table_1 = BigQueryCreateTableOperator(
    task_id="create_table_1",
    dataset_id=DATASET_NAME,
    table_id=TABLE_1,
    table_resource={
        "schema": {"fields": SCHEMA},
    },
)

create_table_2 = BigQueryCreateTableOperator(
    task_id="create_table_2",
    dataset_id=DATASET_NAME,
    table_id=TABLE_2,
    table_resource={
        "schema": {"fields": SCHEMA},
    },
)

# [START howto_operator_bigquery_insert_job]
insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
)
# [END howto_operator_bigquery_insert_job]

# [START howto_operator_bigquery_select_job]
select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            # Jinja ``include`` pulls the SQL text from QUERY_SQL_PATH at render time.
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
)
# [END howto_operator_bigquery_select_job]

execute_insert_query = BigQueryInsertJobOperator(
    task_id="execute_insert_query",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    },
)

# Copies TABLE_1 into TABLE_2 via a query job with an explicit destination table.
execute_query_save = BigQueryInsertJobOperator(
    task_id="execute_query_save",
    configuration={
        "query": {
            "query": f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
            "useLegacySql": False,
            "destinationTable": {
                "projectId": PROJECT_ID,
                "datasetId": DATASET_NAME,
                "tableId": TABLE_2,
            },
        }
    },
)

# A single job configuration may carry a list of statements.
bigquery_execute_multi_query = BigQueryInsertJobOperator(
    task_id="execute_multi_query",
    configuration={
        "query": {
            "query": [
                f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
                f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
            ],
            "useLegacySql": False,
        }
    },
)

# [START howto_operator_bigquery_get_data]
get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
)
# [END howto_operator_bigquery_get_data]

# Echo the XCom result of get_data so it shows up in the task log.
get_data_result = BashOperator(
    task_id="get_data_result",
    bash_command=f"echo {get_data.output}",
)

# [START howto_operator_bigquery_check]
check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    use_legacy_sql=False,
)
# [END howto_operator_bigquery_check]

# [START howto_operator_bigquery_value_check]
check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
)
# [END howto_operator_bigquery_value_check]

# [START howto_operator_bigquery_interval_check]
check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
)
# [END howto_operator_bigquery_interval_check]

# [START howto_operator_bigquery_column_check]
column_check = BigQueryColumnCheckOperator(
    task_id="column_check",
    table=f"{DATASET_NAME}.{TABLE_1}",
    column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
# [END howto_operator_bigquery_column_check]

# [START howto_operator_bigquery_table_check]
table_check = BigQueryTableCheckOperator(
    task_id="table_check",
    table=f"{DATASET_NAME}.{TABLE_1}",
    checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
# [END howto_operator_bigquery_table_check]

# Teardown: ALL_DONE so the dataset is removed even when upstream tasks fail.
delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=DATASET_NAME,
    delete_contents=True,
    trigger_rule=TriggerRule.ALL_DONE,
)

check_openlineage_events = OpenLineageTestOperator(
    task_id="check_openlineage_events",
    file_path=str(Path(__file__).parent / "resources" / "openlineage" / "bigquery_queries.json"),
)

# TEST SETUP
create_dataset >> [create_table_1, create_table_2]
# TEST BODY
[create_table_1, create_table_2] >> insert_query_job >> [select_query_job, execute_insert_query]
execute_insert_query >> get_data >> get_data_result >> delete_dataset
execute_insert_query >> execute_query_save >> bigquery_execute_multi_query >> delete_dataset
execute_insert_query >> [check_count, check_value, check_interval] >> delete_dataset
execute_insert_query >> [column_check, table_check] >> delete_dataset
delete_dataset >> check_openlineage_events

from tests_common.test_utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)