Google Cloud Bigtable Operators

Prerequisite Tasks

BigtableCreateInstanceOperator

Use the BigtableCreateInstanceOperator to create a Google Cloud Bigtable instance.

If a Cloud Bigtable instance with the given ID already exists, the operator succeeds immediately without comparing its configuration. No changes are made to the existing instance.
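The create-if-absent behaviour described above can be pictured with a small sketch. This is illustrative only and is not the operator's code: a plain dictionary stands in for the Bigtable admin API, and the function name `create_instance_if_absent` is hypothetical.

```python
from typing import Dict


def create_instance_if_absent(
    existing: Dict[str, dict], instance_id: str, config: dict
) -> bool:
    """Return True if an instance was created, False if it already existed.

    `existing` is a hypothetical in-memory stand-in for the set of
    instances in a project, keyed by instance ID.
    """
    if instance_id in existing:
        # Existing instance: succeed without comparing or changing its configuration.
        return False
    existing[instance_id] = dict(config)
    return True


instances = {"my-instance": {"display_name": "Original"}}

# A second "create" with a different configuration leaves the instance untouched.
created = create_instance_if_absent(
    instances, "my-instance", {"display_name": "Different"}
)
```

One consequence worth keeping in mind: a rerun with a changed configuration does not reconcile the instance, so changes to an existing instance have to go through an update.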

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_bigtable.py

create_instance_task = BigtableCreateInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    main_cluster_id=CBT_CLUSTER_ID,
    main_cluster_zone=CBT_CLUSTER_ZONE,
    instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
    instance_type=int(CBT_INSTANCE_TYPE),
    instance_labels=json.loads(CBT_INSTANCE_LABELS),
    cluster_nodes=None,
    cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
    task_id='create_instance_task',
)
create_instance_task2 = BigtableCreateInstanceOperator(
    instance_id=CBT_INSTANCE_ID,
    main_cluster_id=CBT_CLUSTER_ID,
    main_cluster_zone=CBT_CLUSTER_ZONE,
    instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
    instance_type=int(CBT_INSTANCE_TYPE),
    instance_labels=json.loads(CBT_INSTANCE_LABELS),
    cluster_nodes=int(CBT_CLUSTER_NODES),
    cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
    task_id='create_instance_task2',
)
create_instance_task >> create_instance_task2
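The `int(...)` and `json.loads(...)` calls in the excerpt suggest that the `CBT_*` constants hold strings, for example values read from environment variables. A minimal sketch of that pattern follows; the function name `load_bigtable_settings`, the variable names, and the default values are assumptions for illustration and are not taken from the example DAG.

```python
import json
from typing import Mapping


def load_bigtable_settings(env: Mapping[str, str]) -> dict:
    """Convert string settings into the types the operator arguments expect.

    The keys and defaults below are hypothetical placeholders.
    """
    return {
        # Numeric settings arrive as strings and are converted to int.
        "instance_type": int(env.get("CBT_INSTANCE_TYPE", "1")),
        "cluster_nodes": int(env.get("CBT_CLUSTER_NODES", "3")),
        "cluster_storage_type": int(env.get("CBT_CLUSTER_STORAGE_TYPE", "1")),
        # Labels arrive as a JSON object and are parsed into a dict.
        "instance_labels": json.loads(env.get("CBT_INSTANCE_LABELS", "{}")),
    }


settings = load_bigtable_settings({"CBT_INSTANCE_LABELS": '{"env": "dev"}'})
```

In a DAG file, `os.environ` could be passed as `env`. Taking the mapping as a parameter keeps the conversion easy to test.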

BigtableUpdateInstanceOperator

Use the BigtableUpdateInstanceOperator to update an existing Google Cloud Bigtable instance.

Only the following configuration can be updated for an existing instance: instance_display_name, instance_type and instance_labels.
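To make the restriction concrete, the sketch below separates a set of requested changes into those the update can apply and those it cannot. It is a hypothetical helper for illustration, not part of the operator; only the three field names come from the text above.

```python
from typing import Dict, List, Tuple

# The three settings named above as updatable on an existing instance.
UPDATABLE_FIELDS = frozenset(
    {"instance_display_name", "instance_type", "instance_labels"}
)


def split_update_fields(requested: Dict[str, object]) -> Tuple[Dict[str, object], List[str]]:
    """Split requested changes into (updatable, rejected field names)."""
    updatable = {k: v for k, v in requested.items() if k in UPDATABLE_FIELDS}
    rejected = sorted(set(requested) - UPDATABLE_FIELDS)
    return updatable, rejected


updatable, rejected = split_update_fields(
    {"instance_display_name": "New name", "main_cluster_zone": "europe-west1-b"}
)
```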

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID:

airflow/providers/google/cloud/example_dags/example_bigtable.py

update_instance_task = BigtableUpdateInstanceOperator(
    instance_id=CBT_INSTANCE_ID,
    instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
    instance_type=int(CBT_INSTANCE_TYPE_PROD),
    instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
    task_id='update_instance_task',
)

BigtableDeleteInstanceOperator

Use the BigtableDeleteInstanceOperator to delete a Google Cloud Bigtable instance.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_bigtable.py

delete_instance_task = BigtableDeleteInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    task_id='delete_instance_task',
)
delete_instance_task2 = BigtableDeleteInstanceOperator(
    instance_id=CBT_INSTANCE_ID,
    task_id='delete_instance_task2',
)
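The project ID fallback used by the two variants can be sketched as a simple resolution rule. This is a simplified illustration rather than the provider's implementation; the function `resolve_project_id` and its parameters are hypothetical, and raising an error when neither source has a value is an assumption.

```python
from typing import Optional


def resolve_project_id(
    project_id: Optional[str], connection_project_id: Optional[str]
) -> str:
    """Prefer the explicit project ID, else the connection's default."""
    if project_id:
        return project_id
    if connection_project_id:
        return connection_project_id
    # Assumption for this sketch: with no project ID from either source,
    # the task cannot proceed.
    raise ValueError("No project ID given and none configured on the connection")


explicit = resolve_project_id("my-project", "default-project")
fallback = resolve_project_id(None, "default-project")
```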

BigtableUpdateClusterOperator

Use the BigtableUpdateClusterOperator to modify the number of nodes in a Cloud Bigtable cluster.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_bigtable.py

cluster_update_task = BigtableUpdateClusterOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    cluster_id=CBT_CLUSTER_ID,
    nodes=int(CBT_CLUSTER_NODES_UPDATED),
    task_id='update_cluster_task',
)
cluster_update_task2 = BigtableUpdateClusterOperator(
    instance_id=CBT_INSTANCE_ID,
    cluster_id=CBT_CLUSTER_ID,
    nodes=int(CBT_CLUSTER_NODES_UPDATED),
    task_id='update_cluster_task2',
)
cluster_update_task >> cluster_update_task2

BigtableCreateTableOperator

Use the BigtableCreateTableOperator to create a table in a Cloud Bigtable instance.

If a table with the given ID already exists in the Cloud Bigtable instance, the operator compares the Column Families. If the Column Families are identical, the operator succeeds. Otherwise, the operator fails with an appropriate error message.
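The comparison described above can be sketched as follows. This is an illustration under simplifying assumptions: Column Families are represented as a plain mapping from family name to a description of its garbage-collection rule, whereas the real operator works with client library objects. The function name and the error type are hypothetical.

```python
from typing import Dict


def check_existing_table(
    existing_families: Dict[str, str], requested_families: Dict[str, str]
) -> None:
    """Succeed silently if the Column Families match, otherwise raise."""
    if existing_families != requested_families:
        # Report which family names differ to make the failure actionable.
        differing = sorted(
            name
            for name in set(existing_families) | set(requested_families)
            if existing_families.get(name) != requested_families.get(name)
        )
        raise RuntimeError(
            f"Table exists with different Column Families: {differing}"
        )


# Identical definitions: the check passes and returns None.
result = check_existing_table({"cf1": "max_versions=1"}, {"cf1": "max_versions=1"})
```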

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_bigtable.py

create_table_task = BigtableCreateTableOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='create_table',
)
create_table_task2 = BigtableCreateTableOperator(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='create_table_task2',
)
create_table_task >> create_table_task2

Advanced

When creating a table, you can specify the optional initial_split_keys and column_families. Please refer to the Python Client for Google Cloud Bigtable documentation for Table and for Column Families.
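To give an intuition for initial_split_keys, the sketch below shows how a sorted list of split keys partitions the row-key space. It assumes that each split key marks the start of a new tablet, so a row key equal to a split key falls into the tablet that begins there. The helper `tablet_index` is hypothetical and only models the partitioning; it does not call Bigtable.

```python
from bisect import bisect_right
from typing import Sequence


def tablet_index(split_keys: Sequence[bytes], row_key: bytes) -> int:
    """Return the zero-based index of the tablet a row key falls into.

    Assumes `split_keys` is sorted and that tablet i covers the half-open
    range [split_keys[i - 1], split_keys[i]).
    """
    return bisect_right(split_keys, row_key)


split_keys = [b"apple", b"customer_1", b"customer_2", b"other"]

# Four split keys produce five tablets, indexed 0 to 4.
assignments = {
    key: tablet_index(split_keys, key)
    for key in [b"a", b"apple", b"custom", b"customer_1", b"zz"]
}
```

Choosing split keys that match the expected distribution of row keys spreads initial writes across tablets instead of concentrating them in one.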

BigtableDeleteTableOperator

Use the BigtableDeleteTableOperator to delete a table in Google Cloud Bigtable.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_bigtable.py

delete_table_task = BigtableDeleteTableOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='delete_table_task',
)
delete_table_task2 = BigtableDeleteTableOperator(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    task_id='delete_table_task2',
)

BigtableTableReplicationCompletedSensor

Use the BigtableTableReplicationCompletedSensor to wait for the table to replicate fully.

This sensor accepts the same arguments as the BigtableCreateTableOperator.

Note: If the table or the Cloud Bigtable instance does not exist, this sensor does not raise an exception; it keeps waiting for the table until the timeout is reached.

Using the operator

You can create the sensor with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_bigtable.py

wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
    project_id=GCP_PROJECT_ID,
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    poke_interval=int(CBT_POKE_INTERVAL),
    timeout=180,
    task_id='wait_for_table_replication_task',
)
wait_for_table_replication_task2 = BigtableTableReplicationCompletedSensor(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    poke_interval=int(CBT_POKE_INTERVAL),
    timeout=180,
    task_id='wait_for_table_replication_task2',
)
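The interplay of poke_interval and timeout can be illustrated with a simulated polling loop. This is a sketch, not Airflow's sensor implementation: time is simulated rather than slept, the helper names are hypothetical, and a missing table is modelled as a check that returns False instead of raising.

```python
from typing import Callable, Tuple


def wait_until(
    poke: Callable[[], bool], poke_interval: float, timeout: float
) -> Tuple[bool, float, int]:
    """Poll `poke` on a simulated clock.

    Returns (succeeded, elapsed_seconds, number_of_pokes).
    """
    elapsed = 0.0
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return True, elapsed, pokes
        if elapsed + poke_interval > timeout:
            # The next poke would fall after the timeout, so give up here.
            return False, elapsed, pokes
        elapsed += poke_interval


def ready_on_call(n: int) -> Callable[[], bool]:
    """Build a check that first returns True on its n-th call."""
    calls = {"count": 0}

    def poke() -> bool:
        calls["count"] += 1
        return calls["count"] >= n

    return poke


# Replication completes on the third check: success after 120 simulated seconds.
replicated = wait_until(ready_on_call(3), poke_interval=60, timeout=180)

# A table that never appears keeps returning False until the timeout.
missing = wait_until(lambda: False, poke_interval=60, timeout=180)
```

With these numbers the missing-table case is checked four times (at 0, 60, 120, and 180 simulated seconds) before the loop gives up, which is why a typo in a table ID shows up as a timeout rather than an immediate error.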

Reference

For further information, look at:
