Source code for airflow.providers.google.cloud.hooks.bigtable
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains a Google Cloud Bigtable Hook."""from__future__importannotationsfromtypingimportTYPE_CHECKING,Sequencefromgoogle.cloud.bigtableimportClient,enumsfromgoogle.cloud.bigtable.clusterimportClusterfromgoogle.cloud.bigtable.instanceimportInstancefromgoogle.cloud.bigtable.tableimportClusterState,Tablefromairflow.providers.google.common.constsimportCLIENT_INFOfromairflow.providers.google.common.hooks.base_googleimportGoogleBaseHookifTYPE_CHECKING:importenumfromgoogle.cloud.bigtable.column_familyimportColumnFamily,GarbageCollectionRule
class BigtableHook(GoogleBaseHook):
    """
    Hook for Google Cloud Bigtable APIs.

    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        if kwargs.get("delegate_to") is not None:
            raise RuntimeError(
                "The `delegate_to` parameter has been deprecated before and finally removed in this version"
                " of Google Provider. You MUST convert it to `impersonation_chain`"
            )
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            impersonation_chain=impersonation_chain,
        )
        self._client: Client | None = None
        # Project the cached client was built for by ``_get_client``; stays None
        # until ``_get_client`` has built a client itself.
        self._client_project_id: str | None = None

    def _get_client(self, project_id: str) -> Client:
        """
        Return an admin Bigtable client bound to ``project_id``.

        The client is cached on the hook. It is rebuilt when a later call asks
        for a different project than the one the cached client was built for,
        so that requests are never sent to the wrong project.

        :param project_id: Google Cloud project ID the client should target.
        """
        # A client placed in ``_client`` without going through this method has no
        # recorded project; it is reused as-is rather than replaced.
        built_for_other_project = (
            self._client_project_id is not None and self._client_project_id != project_id
        )
        if not self._client or built_for_other_project:
            self._client = Client(
                project=project_id,
                credentials=self.get_credentials(),
                client_info=CLIENT_INFO,
                admin=True,
            )
            self._client_project_id = project_id
        return self._client

    @GoogleBaseHook.fallback_to_default_project_id
    def get_instance(self, instance_id: str, project_id: str) -> Instance | None:
        """
        Retrieve and return the specified Cloud Bigtable instance if it exists, otherwise return None.

        :param instance_id: The ID of the Cloud Bigtable instance.
        :param project_id: Optional, Google Cloud project ID where the
            BigTable exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        """
        instance = self._get_client(project_id=project_id).instance(instance_id)
        if not instance.exists():
            return None
        return instance

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_instance(self, instance_id: str, project_id: str) -> None:
        """
        Delete the specified Cloud Bigtable instance.

        If the instance does not exist, a warning is logged and nothing is deleted.

        :param project_id: Optional, Google Cloud project ID where the
            BigTable exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :param instance_id: The ID of the Cloud Bigtable instance.
        """
        instance = self.get_instance(instance_id=instance_id, project_id=project_id)
        if instance:
            instance.delete()
        else:
            self.log.warning(
                "The instance '%s' does not exist in project '%s'. Exiting", instance_id, project_id
            )

    @GoogleBaseHook.fallback_to_default_project_id
    def create_instance(
        self,
        instance_id: str,
        main_cluster_id: str,
        main_cluster_zone: str,
        project_id: str,
        replica_clusters: list[dict[str, str]] | None = None,
        instance_display_name: str | None = None,
        instance_type: enums.Instance.Type = enums.Instance.Type.UNSPECIFIED,  # type: ignore[assignment]
        instance_labels: dict | None = None,
        cluster_nodes: int | None = None,
        cluster_storage_type: enums.StorageType = enums.StorageType.UNSPECIFIED,  # type: ignore[assignment]
        timeout: float | None = None,
    ) -> Instance:
        """
        Create new instance.

        :param instance_id: The ID for the new instance.
        :param main_cluster_id: The ID for main cluster for the new instance.
        :param main_cluster_zone: The zone for main cluster.
            See https://cloud.google.com/bigtable/docs/locations for more details.
        :param project_id: Optional, Google Cloud project ID where the
            BigTable exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :param replica_clusters: (optional) A list of replica clusters for the new
            instance. Each cluster dictionary contains an id and a zone.
            Example: [{"id": "replica-1", "zone": "us-west1-a"}]
            Entries missing either key are skipped with a warning.
        :param instance_type: (optional) The type of the instance.
        :param instance_display_name: (optional) Human-readable name of the instance.
            Defaults to ``instance_id``.
        :param instance_labels: (optional) Dictionary of labels to associate with the
            instance.
        :param cluster_nodes: (optional) Number of nodes for cluster.
        :param cluster_storage_type: (optional) The type of storage.
        :param timeout: (optional) timeout (in seconds) for instance creation.
            If None or not specified, the call waits indefinitely.
        :return: The created instance.
        """
        instance = Instance(
            instance_id,
            self._get_client(project_id=project_id),
            instance_display_name,
            instance_type,
            instance_labels,
        )

        cluster_kwargs = {
            "cluster_id": main_cluster_id,
            "location_id": main_cluster_zone,
            "default_storage_type": cluster_storage_type,
        }
        # The node count is only forwarded for non-DEVELOPMENT instances
        # (presumably because development instances do not accept one -- TODO confirm).
        if instance_type != enums.Instance.Type.DEVELOPMENT and cluster_nodes:
            cluster_kwargs["serve_nodes"] = cluster_nodes
        clusters = [instance.cluster(**cluster_kwargs)]

        for replica_cluster in replica_clusters or []:
            if "id" in replica_cluster and "zone" in replica_cluster:
                clusters.append(
                    instance.cluster(
                        replica_cluster["id"],
                        replica_cluster["zone"],
                        cluster_nodes,
                        cluster_storage_type,
                    )
                )
            else:
                # Malformed entries are still skipped, but no longer silently.
                self.log.warning(
                    "Skipping replica cluster %s: both 'id' and 'zone' keys are required",
                    replica_cluster,
                )

        operation = instance.create(clusters=clusters)
        # Block until the long-running operation finishes (or the timeout elapses).
        operation.result(timeout)
        return instance

    @GoogleBaseHook.fallback_to_default_project_id
    def update_instance(
        self,
        instance_id: str,
        project_id: str,
        instance_display_name: str | None = None,
        instance_type: enums.Instance.Type | enum.IntEnum | None = None,
        instance_labels: dict | None = None,
        timeout: float | None = None,
    ) -> Instance:
        """
        Update an existing instance.

        :param instance_id: The ID for the existing instance.
        :param project_id: Optional, Google Cloud project ID where the
            BigTable exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :param instance_display_name: (optional) Human-readable name of the instance.
        :param instance_type: (optional) The type of the instance.
        :param instance_labels: (optional) Dictionary of labels to associate with the
            instance.
        :param timeout: (optional) timeout (in seconds) for instance update.
            If None or not specified, the call waits indefinitely.
        :return: The updated instance.
        """
        instance = Instance(
            instance_id=instance_id,
            client=self._get_client(project_id=project_id),
            display_name=instance_display_name,
            instance_type=instance_type,
            labels=instance_labels,
        )

        operation = instance.update()
        # Block until the long-running operation finishes (or the timeout elapses).
        operation.result(timeout)
        return instance

    @staticmethod
    def create_table(
        instance: Instance,
        table_id: str,
        initial_split_keys: list | None = None,
        column_families: dict[str, GarbageCollectionRule] | None = None,
    ) -> None:
        """
        Create the specified Cloud Bigtable table.

        Raises ``google.api_core.exceptions.AlreadyExists`` if the table exists.

        :param instance: The Cloud Bigtable instance that owns the table.
        :param table_id: The ID of the table to create in Cloud Bigtable.
        :param initial_split_keys: (Optional) A list of row keys in bytes to use to
            initially split the table.
        :param column_families: (Optional) A map of columns to create. The key is the
            column_id str, and the value is a
            :class:`google.cloud.bigtable.column_family.GarbageCollectionRule`.
        """
        if column_families is None:
            column_families = {}
        if initial_split_keys is None:
            initial_split_keys = []
        table = Table(table_id, instance)
        table.create(initial_split_keys, column_families)

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None:
        """
        Delete the specified table in Cloud Bigtable.

        Raises google.api_core.exceptions.NotFound if the table does not exist.

        :param instance_id: The ID of the Cloud Bigtable instance.
        :param table_id: The ID of the table in Cloud Bigtable.
        :param project_id: Optional, Google Cloud project ID where the
            BigTable exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :raises RuntimeError: if the instance does not exist.
        """
        instance = self.get_instance(instance_id=instance_id, project_id=project_id)
        if instance is None:
            raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}")
        table = instance.table(table_id=table_id)
        table.delete()

    @staticmethod
    def update_cluster(instance: Instance, cluster_id: str, nodes: int) -> None:
        """
        Update number of nodes in the specified Cloud Bigtable cluster.

        Raises google.api_core.exceptions.NotFound if the cluster does not exist.

        :param instance: The Cloud Bigtable instance that owns the cluster.
        :param cluster_id: The ID of the cluster.
        :param nodes: The desired number of nodes.
        """
        cluster = Cluster(cluster_id, instance)
        # "reload" is required to set location_id attribute on cluster.
        cluster.reload()
        cluster.serve_nodes = nodes
        cluster.update()

    @staticmethod
    def get_column_families_for_table(instance: Instance, table_id: str) -> dict[str, ColumnFamily]:
        """
        Fetch Column Families for the specified table in Cloud Bigtable.

        :param instance: The Cloud Bigtable instance that owns the table.
        :param table_id: The ID of the table in Cloud Bigtable to fetch Column Families
            from.
        """
        table = Table(table_id, instance)
        return table.list_column_families()

    @staticmethod
    def get_cluster_states_for_table(instance: Instance, table_id: str) -> dict[str, ClusterState]:
        """
        Fetch Cluster States for the specified table in Cloud Bigtable.

        Raises google.api_core.exceptions.NotFound if the table does not exist.

        :param instance: The Cloud Bigtable instance that owns the table.
        :param table_id: The ID of the table in Cloud Bigtable to fetch Cluster States
            from.
        """
        table = Table(table_id, instance)
        return table.get_cluster_states()