# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that creates and performs following operations on Cloud Bigtable:
- creates an Instance
- creates a Table
- updates Cluster
- waits for Table replication completeness
- deletes the Table
- deletes the Instance
This DAG relies on the following environment variables:
* GCP_PROJECT_ID - Google Cloud project
* CBT_INSTANCE_ID - desired ID of a Cloud Bigtable instance
* CBT_INSTANCE_DISPLAY_NAME - desired human-readable display name of the Instance
* CBT_INSTANCE_TYPE - type of the Instance, e.g. 1 for DEVELOPMENT
    See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance
* CBT_INSTANCE_LABELS - labels to add for the Instance
* CBT_CLUSTER_ID - desired ID of the main Cluster created for the Instance
* CBT_CLUSTER_ZONE - zone in which main Cluster will be created. e.g. europe-west1-b
    See available zones: https://cloud.google.com/bigtable/docs/locations
* CBT_CLUSTER_NODES - initial amount of nodes of the Cluster
* CBT_CLUSTER_NODES_UPDATED - amount of nodes for BigtableClusterUpdateOperator
* CBT_CLUSTER_STORAGE_TYPE - storage for the Cluster, e.g. 1 for SSD
    See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster
* CBT_TABLE_ID - desired ID of the Table
* CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check
"""
from __future__ import annotations
import os
from datetime import datetime
from google.cloud.bigtable import enums
from airflow.decorators import task_group
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigtable import (
    BigtableCreateInstanceOperator,
    BigtableCreateTableOperator,
    BigtableDeleteInstanceOperator,
    BigtableDeleteTableOperator,
    BigtableUpdateClusterOperator,
    BigtableUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
from airflow.utils.trigger_rule import TriggerRule
from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
[docs]ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") 
[docs]PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID 
# CBT instance id str full length should be between [6,33], lowercase only, "_" symbol  is forbidden
[docs]CBT_INSTANCE_ID_BASE = f"inst-id-{str(ENV_ID)[:20]}".lower().replace("_", "-") 
[docs]CBT_INSTANCE_ID_1 = f"{CBT_INSTANCE_ID_BASE}-1" 
[docs]CBT_INSTANCE_ID_2 = f"{CBT_INSTANCE_ID_BASE}-2" 
[docs]CBT_INSTANCE_DISPLAY_NAME = "instance-name" 
[docs]CBT_INSTANCE_DISPLAY_NAME_UPDATED = f"{CBT_INSTANCE_DISPLAY_NAME}-updated" 
[docs]CBT_INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT 
[docs]CBT_INSTANCE_TYPE_PROD = enums.Instance.Type.PRODUCTION 
[docs]CBT_INSTANCE_LABELS: dict[str, str] = {} 
[docs]CBT_INSTANCE_LABELS_UPDATED = {"env": "prod"} 
[docs]CBT_CLUSTER_ID = "bigtable-cluster-id" 
[docs]CBT_CLUSTER_ZONE = "europe-west1-b" 
[docs]CBT_CLUSTER_NODES_UPDATED = 5 
[docs]CBT_CLUSTER_STORAGE_TYPE = enums.StorageType.HDD 
[docs]CBT_TABLE_ID = "bigtable-table-id" 
with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["bigtable", "example"],
) as dag:
    # [START howto_operator_gcp_bigtable_instance_create]
[docs]    create_instance_task = BigtableCreateInstanceOperator(
        project_id=PROJECT_ID,
        instance_id=CBT_INSTANCE_ID_1,
        main_cluster_id=CBT_CLUSTER_ID,
        main_cluster_zone=CBT_CLUSTER_ZONE,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
        instance_type=CBT_INSTANCE_TYPE,  # type: ignore[arg-type]
        instance_labels=CBT_INSTANCE_LABELS,
        cluster_nodes=None,
        cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE,  # type: ignore[arg-type]
        task_id="create_instance_task",
    ) 
    # [END howto_operator_gcp_bigtable_instance_create]
    create_instance_task2 = BigtableCreateInstanceOperator(
        instance_id=CBT_INSTANCE_ID_2,
        main_cluster_id=CBT_CLUSTER_ID,
        main_cluster_zone=CBT_CLUSTER_ZONE,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
        instance_type=CBT_INSTANCE_TYPE,  # type: ignore[arg-type]
        instance_labels=CBT_INSTANCE_LABELS,
        cluster_nodes=CBT_CLUSTER_NODES,
        cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE,  # type: ignore[arg-type]
        task_id="create_instance_task2",
    )
    @task_group()
    def create_tables():
        # [START howto_operator_gcp_bigtable_table_create]
        create_table_task = BigtableCreateTableOperator(
            project_id=PROJECT_ID,
            instance_id=CBT_INSTANCE_ID_1,
            table_id=CBT_TABLE_ID,
            task_id="create_table",
        )
        # [END howto_operator_gcp_bigtable_table_create]
        create_table_task2 = BigtableCreateTableOperator(
            instance_id=CBT_INSTANCE_ID_2,
            table_id=CBT_TABLE_ID,
            task_id="create_table_task2",
        )
        create_table_task >> create_table_task2
    @task_group()
    def update_clusters_and_instance():
        # [START howto_operator_gcp_bigtable_cluster_update]
        cluster_update_task = BigtableUpdateClusterOperator(
            project_id=PROJECT_ID,
            instance_id=CBT_INSTANCE_ID_1,
            cluster_id=CBT_CLUSTER_ID,
            nodes=CBT_CLUSTER_NODES_UPDATED,
            task_id="update_cluster_task",
        )
        # [END howto_operator_gcp_bigtable_cluster_update]
        cluster_update_task2 = BigtableUpdateClusterOperator(
            instance_id=CBT_INSTANCE_ID_2,
            cluster_id=CBT_CLUSTER_ID,
            nodes=CBT_CLUSTER_NODES_UPDATED,
            task_id="update_cluster_task2",
        )
        # [START howto_operator_gcp_bigtable_instance_update]
        update_instance_task = BigtableUpdateInstanceOperator(
            instance_id=CBT_INSTANCE_ID_1,
            instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
            instance_type=CBT_INSTANCE_TYPE_PROD,
            instance_labels=CBT_INSTANCE_LABELS_UPDATED,
            task_id="update_instance_task",
        )
        # [END howto_operator_gcp_bigtable_instance_update]
        [cluster_update_task, cluster_update_task2] >> update_instance_task
    # [START howto_operator_gcp_bigtable_table_wait_for_replication]
    wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
        instance_id=CBT_INSTANCE_ID_2,
        table_id=CBT_TABLE_ID,
        poke_interval=CBT_POKE_INTERVAL,
        timeout=180,
        task_id="wait_for_table_replication_task2",
    )
    # [END howto_operator_gcp_bigtable_table_wait_for_replication]
    # [START howto_operator_gcp_bigtable_table_delete]
    delete_table_task = BigtableDeleteTableOperator(
        project_id=PROJECT_ID,
        instance_id=CBT_INSTANCE_ID_1,
        table_id=CBT_TABLE_ID,
        task_id="delete_table_task",
    )
    # [END howto_operator_gcp_bigtable_table_delete]
    delete_table_task2 = BigtableDeleteTableOperator(
        instance_id=CBT_INSTANCE_ID_2,
        table_id=CBT_TABLE_ID,
        task_id="delete_table_task2",
    )
    delete_table_task.trigger_rule = TriggerRule.ALL_DONE
    delete_table_task2.trigger_rule = TriggerRule.ALL_DONE
    # [START howto_operator_gcp_bigtable_instance_delete]
    delete_instance_task = BigtableDeleteInstanceOperator(
        project_id=PROJECT_ID,
        instance_id=CBT_INSTANCE_ID_1,
        task_id="delete_instance_task",
    )
    # [END howto_operator_gcp_bigtable_instance_delete]
    delete_instance_task2 = BigtableDeleteInstanceOperator(
        instance_id=CBT_INSTANCE_ID_2,
        task_id="delete_instance_task2",
    )
    delete_instance_task.trigger_rule = TriggerRule.ALL_DONE
    delete_instance_task2.trigger_rule = TriggerRule.ALL_DONE
    (
        # TEST SETUP
        [create_instance_task, create_instance_task2]
        # TEST BODY
        >> create_tables()
        >> wait_for_table_replication_task
        >> update_clusters_and_instance()
        # TEST TEARDOWN
        >> delete_table_task
        >> delete_table_task2
        >> [delete_instance_task, delete_instance_task2]
    )
    from dev.tests_common.test_utils.watcher import watcher
    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()
from dev.tests_common.test_utils.system_tests import get_test_run  # noqa: E402
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
[docs]test_run = get_test_run(dag)