# Source code for airflow.providers.google.cloud.example_dags.example_bigtable
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that creates and performs following operations on Cloud Bigtable:
- creates an Instance
- creates a Table
- updates Cluster
- waits for Table replication completeness
- deletes the Table
- deletes the Instance

This DAG relies on the following environment variables:

* GCP_PROJECT_ID - Google Cloud project
* CBT_INSTANCE_ID - desired ID of a Cloud Bigtable instance
* CBT_INSTANCE_DISPLAY_NAME - desired human-readable display name of the Instance
* CBT_INSTANCE_TYPE - type of the Instance, e.g. 1 for DEVELOPMENT
    See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance # noqa E501
* CBT_INSTANCE_LABELS - labels to add for the Instance
* CBT_CLUSTER_ID - desired ID of the main Cluster created for the Instance
* CBT_CLUSTER_ZONE - zone in which main Cluster will be created. e.g. europe-west1-b
    See available zones: https://cloud.google.com/bigtable/docs/locations
* CBT_CLUSTER_NODES - initial amount of nodes of the Cluster
* CBT_CLUSTER_NODES_UPDATED - amount of nodes for BigtableClusterUpdateOperator
* CBT_CLUSTER_STORAGE_TYPE - storage for the Cluster, e.g. 1 for SSD
    See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster # noqa E501
* CBT_TABLE_ID - desired ID of the Table
* CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check
"""
import json
from os import getenv

from airflow import models
from airflow.providers.google.cloud.operators.bigtable import (
    BigtableCreateInstanceOperator,
    BigtableCreateTableOperator,
    BigtableDeleteInstanceOperator,
    BigtableDeleteTableOperator,
    BigtableUpdateClusterOperator,
    BigtableUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
from airflow.utils.dates import days_ago

# Configuration is read from the environment; the second argument of getenv()
# is the fallback used when the variable is not set. Numeric/JSON values are
# kept as strings here and converted (int()/json.loads()) at the point of use.
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
CBT_INSTANCE_ID = getenv('GCP_BIG_TABLE_INSTANCE_ID', 'some-instance-id')
CBT_INSTANCE_DISPLAY_NAME = getenv('GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME', 'Human-readable name')
CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
    "GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME_UPDATED", f"{CBT_INSTANCE_DISPLAY_NAME} - updated"
)
CBT_INSTANCE_TYPE = getenv('GCP_BIG_TABLE_INSTANCE_TYPE', '2')
CBT_INSTANCE_TYPE_PROD = getenv('GCP_BIG_TABLE_INSTANCE_TYPE_PROD', '1')
CBT_INSTANCE_LABELS = getenv('GCP_BIG_TABLE_INSTANCE_LABELS', '{}')
CBT_INSTANCE_LABELS_UPDATED = getenv('GCP_BIG_TABLE_INSTANCE_LABELS_UPDATED', '{"env": "prod"}')
CBT_CLUSTER_ID = getenv('GCP_BIG_TABLE_CLUSTER_ID', 'some-cluster-id')
CBT_CLUSTER_ZONE = getenv('GCP_BIG_TABLE_CLUSTER_ZONE', 'europe-west1-b')
CBT_CLUSTER_NODES = getenv('GCP_BIG_TABLE_CLUSTER_NODES', '3')
CBT_CLUSTER_NODES_UPDATED = getenv('GCP_BIG_TABLE_CLUSTER_NODES_UPDATED', '5')
CBT_CLUSTER_STORAGE_TYPE = getenv('GCP_BIG_TABLE_CLUSTER_STORAGE_TYPE', '2')
CBT_TABLE_ID = getenv('GCP_BIG_TABLE_TABLE_ID', 'some-table-id')
CBT_POKE_INTERVAL = getenv('GCP_BIG_TABLE_POKE_INTERVAL', '60')

with models.DAG(
    'example_gcp_bigtable_operators',
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    # Each operator is shown twice: once with an explicit project_id and once
    # without, relying on the default project from the Google Cloud connection.
    # [START howto_operator_gcp_bigtable_instance_create]
    create_instance_task = BigtableCreateInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        main_cluster_id=CBT_CLUSTER_ID,
        main_cluster_zone=CBT_CLUSTER_ZONE,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
        instance_type=int(CBT_INSTANCE_TYPE),
        instance_labels=json.loads(CBT_INSTANCE_LABELS),
        cluster_nodes=None,
        cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
        task_id='create_instance_task',
    )
    create_instance_task2 = BigtableCreateInstanceOperator(
        instance_id=CBT_INSTANCE_ID,
        main_cluster_id=CBT_CLUSTER_ID,
        main_cluster_zone=CBT_CLUSTER_ZONE,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
        instance_type=int(CBT_INSTANCE_TYPE),
        instance_labels=json.loads(CBT_INSTANCE_LABELS),
        cluster_nodes=int(CBT_CLUSTER_NODES),
        cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
        task_id='create_instance_task2',
    )
    create_instance_task >> create_instance_task2
    # [END howto_operator_gcp_bigtable_instance_create]

    # [START howto_operator_gcp_bigtable_instance_update]
    update_instance_task = BigtableUpdateInstanceOperator(
        instance_id=CBT_INSTANCE_ID,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
        instance_type=int(CBT_INSTANCE_TYPE_PROD),
        instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
        task_id='update_instance_task',
    )
    # [END howto_operator_gcp_bigtable_instance_update]

    # [START howto_operator_gcp_bigtable_cluster_update]
    cluster_update_task = BigtableUpdateClusterOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        cluster_id=CBT_CLUSTER_ID,
        nodes=int(CBT_CLUSTER_NODES_UPDATED),
        task_id='update_cluster_task',
    )
    cluster_update_task2 = BigtableUpdateClusterOperator(
        instance_id=CBT_INSTANCE_ID,
        cluster_id=CBT_CLUSTER_ID,
        nodes=int(CBT_CLUSTER_NODES_UPDATED),
        task_id='update_cluster_task2',
    )
    cluster_update_task >> cluster_update_task2
    # [END howto_operator_gcp_bigtable_cluster_update]

    # [START howto_operator_gcp_bigtable_instance_delete]
    delete_instance_task = BigtableDeleteInstanceOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        task_id='delete_instance_task',
    )
    delete_instance_task2 = BigtableDeleteInstanceOperator(
        instance_id=CBT_INSTANCE_ID,
        task_id='delete_instance_task2',
    )
    # [END howto_operator_gcp_bigtable_instance_delete]

    # [START howto_operator_gcp_bigtable_table_create]
    create_table_task = BigtableCreateTableOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        task_id='create_table',
    )
    create_table_task2 = BigtableCreateTableOperator(
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        task_id='create_table_task2',
    )
    create_table_task >> create_table_task2
    # [END howto_operator_gcp_bigtable_table_create]

    # [START howto_operator_gcp_bigtable_table_wait_for_replication]
    wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        poke_interval=int(CBT_POKE_INTERVAL),
        timeout=180,
        task_id='wait_for_table_replication_task',
    )
    wait_for_table_replication_task2 = BigtableTableReplicationCompletedSensor(
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        poke_interval=int(CBT_POKE_INTERVAL),
        timeout=180,
        task_id='wait_for_table_replication_task2',
    )
    # [END howto_operator_gcp_bigtable_table_wait_for_replication]

    # [START howto_operator_gcp_bigtable_table_delete]
    delete_table_task = BigtableDeleteTableOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        task_id='delete_table_task',
    )
    delete_table_task2 = BigtableDeleteTableOperator(
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        task_id='delete_table_task2',
    )
    # [END howto_operator_gcp_bigtable_table_delete]

    # Tables are deleted only after replication has been confirmed.
    wait_for_table_replication_task >> delete_table_task
    wait_for_table_replication_task2 >> delete_table_task
    wait_for_table_replication_task >> delete_table_task2
    wait_for_table_replication_task2 >> delete_table_task2

    # Main pipeline ordering: create instance -> create table -> scale/update.
    create_instance_task >> create_table_task >> cluster_update_task
    cluster_update_task >> update_instance_task >> delete_table_task
    create_instance_task2 >> create_table_task2 >> cluster_update_task2 >> delete_table_task2

    # Only delete instances after all tables are deleted
    [delete_table_task, delete_table_task2] >> delete_instance_task >> delete_instance_task2