Source code for airflow.providers.google.cloud.example_dags.example_spanner
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that creates, updates, queries and deletes a Cloud Spanner instance.

This DAG relies on the following environment variables. Each one is optional;
when it is unset, the default given in this module is used.

* GCP_PROJECT_ID - Google Cloud project for the Cloud Spanner instance.
* GCP_SPANNER_INSTANCE_ID - Cloud Spanner instance ID.
* GCP_SPANNER_DATABASE_ID - Cloud Spanner database ID.
* GCP_SPANNER_CONFIG_NAME - The name of the instance's configuration. Values are of the
    form ``projects/<gcp_project>/instanceConfigs/<configuration>``. See also:
    https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig
    https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs
* GCP_SPANNER_NODE_COUNT - Number of nodes allocated to the instance.
* GCP_SPANNER_DISPLAY_NAME - The descriptive name for this instance as it appears in UIs.
    Must be unique per project and between 4 and 30 characters in length.
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.spanner import (
    SpannerDeleteDatabaseInstanceOperator,
    SpannerDeleteInstanceOperator,
    SpannerDeployDatabaseInstanceOperator,
    SpannerDeployInstanceOperator,
    SpannerQueryDatabaseInstanceOperator,
    SpannerUpdateDatabaseInstanceOperator,
)
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get(
    'GCP_SPANNER_CONFIG_NAME', f'projects/{GCP_PROJECT_ID}/instanceConfigs/regional-europe-west3'
)
# Environment values are strings, so the node count is kept as a string here
# and converted with int() at the point of use.
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'

with models.DAG(
    'example_gcp_spanner',
    schedule_interval='@once',  # Override to match your needs
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    # Every operator below is instantiated twice: once with an explicit
    # ``project_id`` and once without it.
    # NOTE(review): the variant without ``project_id`` presumably falls back to
    # the project configured on the Google Cloud connection - confirm against
    # the operator documentation.

    # Create
    # [START howto_operator_spanner_deploy]
    spanner_instance_create_task = SpannerDeployInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=GCP_SPANNER_INSTANCE_ID,
        configuration_name=GCP_SPANNER_CONFIG_NAME,
        node_count=int(GCP_SPANNER_NODE_COUNT),
        display_name=GCP_SPANNER_DISPLAY_NAME,
        task_id='spanner_instance_create_task',
    )
    spanner_instance_update_task = SpannerDeployInstanceOperator(
        instance_id=GCP_SPANNER_INSTANCE_ID,
        configuration_name=GCP_SPANNER_CONFIG_NAME,
        node_count=int(GCP_SPANNER_NODE_COUNT) + 1,
        display_name=GCP_SPANNER_DISPLAY_NAME + '_updated',
        task_id='spanner_instance_update_task',
    )
    # [END howto_operator_spanner_deploy]

    # [START howto_operator_spanner_database_deploy]
    spanner_database_deploy_task = SpannerDeployDatabaseInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        ddl_statements=[
            "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
            "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        ],
        task_id='spanner_database_deploy_task',
    )
    spanner_database_deploy_task2 = SpannerDeployDatabaseInstanceOperator(
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        ddl_statements=[
            "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
            "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        ],
        task_id='spanner_database_deploy_task2',
    )
    # [END howto_operator_spanner_database_deploy]

    # [START howto_operator_spanner_database_update]
    spanner_database_update_task = SpannerUpdateDatabaseInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        ddl_statements=[
            "CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        ],
        task_id='spanner_database_update_task',
    )
    # [END howto_operator_spanner_database_update]

    # Both tasks in this section submit the same DDL under the same OPERATION_ID.
    # [START howto_operator_spanner_database_update_idempotent]
    spanner_database_update_idempotent1_task = SpannerUpdateDatabaseInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        operation_id=OPERATION_ID,
        ddl_statements=[
            "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        ],
        task_id='spanner_database_update_idempotent1_task',
    )
    spanner_database_update_idempotent2_task = SpannerUpdateDatabaseInstanceOperator(
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        operation_id=OPERATION_ID,
        ddl_statements=[
            "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        ],
        task_id='spanner_database_update_idempotent2_task',
    )
    # [END howto_operator_spanner_database_update_idempotent]

    # [START howto_operator_spanner_query]
    spanner_instance_query_task = SpannerQueryDatabaseInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        query=["DELETE FROM my_table2 WHERE true"],
        task_id='spanner_instance_query_task',
    )
    spanner_instance_query_task2 = SpannerQueryDatabaseInstanceOperator(
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        query=["DELETE FROM my_table2 WHERE true"],
        task_id='spanner_instance_query_task2',
    )
    # [END howto_operator_spanner_query]

    # [START howto_operator_spanner_database_delete]
    spanner_database_delete_task = SpannerDeleteDatabaseInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        task_id='spanner_database_delete_task',
    )
    spanner_database_delete_task2 = SpannerDeleteDatabaseInstanceOperator(
        instance_id=GCP_SPANNER_INSTANCE_ID,
        database_id=GCP_SPANNER_DATABASE_ID,
        task_id='spanner_database_delete_task2',
    )
    # [END howto_operator_spanner_database_delete]

    # [START howto_operator_spanner_delete]
    spanner_instance_delete_task = SpannerDeleteInstanceOperator(
        project_id=GCP_PROJECT_ID, instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task'
    )
    spanner_instance_delete_task2 = SpannerDeleteInstanceOperator(
        instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task2'
    )
    # [END howto_operator_spanner_delete]

    # Run strictly in sequence: instance -> database -> schema updates ->
    # query -> database deletion -> instance deletion.
    (
        spanner_instance_create_task
        >> spanner_instance_update_task
        >> spanner_database_deploy_task
        >> spanner_database_deploy_task2
        >> spanner_database_update_task
        >> spanner_database_update_idempotent1_task
        >> spanner_database_update_idempotent2_task
        >> spanner_instance_query_task
        >> spanner_instance_query_task2
        >> spanner_database_delete_task
        >> spanner_database_delete_task2
        >> spanner_instance_delete_task
        >> spanner_instance_delete_task2
    )