Source code for airflow.providers.teradata.operators.teradata_compute_cluster
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportrefromabcimportabstractmethodfromenumimportEnumfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKINGfromairflow.modelsimportBaseOperatorfromairflow.providers.teradata.hooks.teradataimportTeradataHookfromairflow.providers.teradata.utils.constantsimportConstantsifTYPE_CHECKING:fromairflow.utils.contextimportContextfromdatetimeimporttimedeltafromtypingimportTYPE_CHECKING,Any,Sequence,castfromairflow.providers.teradata.triggers.teradata_compute_clusterimportTeradataComputeClusterSyncTriggerifTYPE_CHECKING:fromairflow.utils.contextimportContextfromairflow.exceptionsimportAirflowException# Represents# 1. Compute Cluster Setup - Provision and Decomission operations# 2. Compute Cluster State - Resume and Suspend operationsclass_Operation(Enum):SETUP=1STATE=2# Handler to handle single result set of a SQL querydef_single_result_row_handler(cursor):records=cursor.fetchone()ifisinstance(records,list):returnrecords[0]ifrecordsisNone:returnrecordsraiseTypeError(f"Unexpected results: {cursor.fetchone()!r}")# Providers given operation is setup or state operationdef_determine_operation_context(operation):ifoperation==Constants.CC_CREATE_OPRoroperation==Constants.CC_DROP_OPR:return_Operation.SETUPreturn_Operation.STATEclass_TeradataComputeClusterOperator(BaseOperator):""" Teradata Compute Cluster Base Operator to set up and status operations of compute cluster. :param compute_profile_name: Name of the Compute Profile to manage. :param compute_group_name: Name of compute group to which compute profile belongs. :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` reference to a specific Teradata database. :param timeout: Time elapsed before the task times out and fails. """template_fields:Sequence[str]=("compute_profile_name","compute_group_name","teradata_conn_id","timeout",)ui_color="#e07c24"def__init__(self,compute_profile_name:str,compute_group_name:str|None=None,teradata_conn_id:str=TeradataHook.default_conn_name,timeout:int=Constants.CC_OPR_TIME_OUT,**kwargs,)->None:super().__init__(**kwargs)self.compute_profile_name=compute_profile_nameself.compute_group_name=compute_group_nameself.teradata_conn_id=teradata_conn_idself.timeout=timeout@cached_propertydefhook(self)->TeradataHook:returnTeradataHook(teradata_conn_id=self.teradata_conn_id)@abstractmethoddefexecute(self,context:Context):passdefexecute_complete(self,context:Context,event:dict[str,Any])->None:""" Execute when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was successful. """self._compute_cluster_execute_complete(event)def_compute_cluster_execute(self):# Verifies the provided compute profile name.if(self.compute_profile_nameisNoneorself.compute_profile_name=="None"orself.compute_profile_name==""):self.log.info("Invalid compute cluster profile name")raiseAirflowException(Constants.CC_OPR_EMPTY_PROFILE_ERROR_MSG)# Verifies if the provided Teradata instance belongs to Vantage Cloud Lake.lake_support_find_sql="SELECT count(1) from DBC.StorageV WHERE StorageName='TD_OFSSTORAGE'"lake_support_result=self.hook.run(lake_support_find_sql,handler=_single_result_row_handler)iflake_support_resultisNone:raiseAirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)# Getting teradata db version. Considering teradata instance is Lake when db version is 20 or abovedb_version_get_sql="SELECT InfoData AS Version FROM DBC.DBCInfoV WHERE InfoKey = 'VERSION'"try:db_version_result=self.hook.run(db_version_get_sql,handler=_single_result_row_handler)ifdb_version_resultisnotNone:db_version_result=str(db_version_result)db_version=db_version_result.split(".")[0]ifdb_versionisnotNoneandint(db_version)<20:raiseAirflowException(Constants.CC_GRP_LAKE_SUPPORT_ONLY_MSG)else:raiseAirflowException("Error occurred while getting teradata database version")exceptExceptionasex:self.log.error("Error occurred while getting teradata database version: %s ",str(ex))raiseAirflowException("Error occurred while getting teradata database version")def_compute_cluster_execute_complete(self,event:dict[str,Any])->None:ifevent["status"]=="success":returnevent["message"]elifevent["status"]=="error":raiseAirflowException(event["message"])def_handle_cc_status(self,operation_type,sql):create_sql_result=self._hook_run(sql,handler=_single_result_row_handler)self.log.info("%s query ran successfully. Differing to trigger to check status in db. Result from sql: %s",operation_type,create_sql_result,)self.defer(timeout=timedelta(minutes=self.timeout),trigger=TeradataComputeClusterSyncTrigger(teradata_conn_id=cast(str,self.teradata_conn_id),compute_profile_name=self.compute_profile_name,compute_group_name=self.compute_group_name,operation_type=operation_type,poll_interval=Constants.CC_POLL_INTERVAL,),method_name="execute_complete",)returncreate_sql_resultdef_hook_run(self,query,handler=None):try:ifhandlerisnotNone:returnself.hook.run(query,handler=handler)else:returnself.hook.run(query)exceptExceptionasex:self.log.error(str(ex))raisedef_get_initially_suspended(self,create_cp_query):initially_suspended="FALSE"pattern=r"INITIALLY_SUSPENDED\s*\(\s*'(TRUE|FALSE)'\s*\)"# Search for the pattern in the input stringmatch=re.search(pattern,create_cp_query,re.IGNORECASE)ifmatch:# Get the value of INITIALLY_SUSPENDEDinitially_suspended=match.group(1).strip().upper()returninitially_suspended
[docs]classTeradataComputeClusterProvisionOperator(_TeradataComputeClusterOperator):""" Creates the new Computer Cluster with specified Compute Group Name and Compute Profile Name. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:TeradataComputeClusterProvisionOperator` :param compute_profile_name: Name of the Compute Profile to manage. :param compute_group_name: Name of compute group to which compute profile belongs. :param query_strategy: Query strategy to use. Refers to the approach or method used by the Teradata Optimizer to execute SQL queries efficiently within a Teradata computer cluster. Valid query_strategy value is either 'STANDARD' or 'ANALYTIC'. Default at database level is STANDARD. :param compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster profile refers to the mapping of compute resources to a specific node or set of nodes within the cluster. :param compute_attribute: Optional attributes of compute profile. Example compute attribute MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE') :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` reference to a specific Teradata database. :param timeout: Time elapsed before the task times out and fails. """
def__init__(self,query_strategy:str|None=None,compute_map:str|None=None,compute_attribute:str|None=None,**kwargs,)->None:super().__init__(**kwargs)self.query_strategy=query_strategyself.compute_map=compute_mapself.compute_attribute=compute_attributedef_build_ccp_setup_query(self):create_cp_query="CREATE COMPUTE PROFILE "+self.compute_profile_nameifself.compute_group_name:create_cp_query=create_cp_query+" IN "+self.compute_group_nameifself.compute_mapisnotNone:create_cp_query=create_cp_query+", INSTANCE = "+self.compute_mapifself.query_strategyisnotNone:create_cp_query=create_cp_query+", INSTANCE TYPE = "+self.query_strategyifself.compute_attributeisnotNone:create_cp_query=create_cp_query+" USING "+self.compute_attributereturncreate_cp_query
[docs]defexecute(self,context:Context):""" Initiate the execution of CREATE COMPUTE SQL statement. Initiate the execution of the SQL statement for provisioning the compute cluster within Teradata Vantage Lake, effectively creates the compute cluster. Airflow runs this method on the worker and defers using the trigger. """super().execute(context)returnself._compute_cluster_execute()
def_compute_cluster_execute(self):super()._compute_cluster_execute()ifself.compute_group_name:cg_status_query=("SELECT count(1) FROM DBC.ComputeGroups WHERE UPPER(ComputeGroupName) = UPPER('"+self.compute_group_name+"')")cg_status_result=self._hook_run(cg_status_query,_single_result_row_handler)ifcg_status_resultisnotNone:cg_status_result=str(cg_status_result)else:cg_status_result=0ifint(cg_status_result)==0:create_cg_query="CREATE COMPUTE GROUP "+self.compute_group_nameifself.query_strategyisnotNone:create_cg_query=(create_cg_query+" USING QUERY_STRATEGY ('"+self.query_strategy+"')")self._hook_run(create_cg_query,_single_result_row_handler)cp_status_query=("SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"+self.compute_profile_name+"')")ifself.compute_group_name:cp_status_query+=" AND UPPER(ComputeGroupName) = UPPER('"+self.compute_group_name+"')"cp_status_result=self._hook_run(cp_status_query,handler=_single_result_row_handler)ifcp_status_resultisnotNone:cp_status_result=str(cp_status_result)msg=f"Compute Profile {self.compute_profile_name} is already exists under Compute Group {self.compute_group_name}. Status is {cp_status_result}"self.log.info(msg)returncp_status_resultelse:create_cp_query=self._build_ccp_setup_query()operation=Constants.CC_CREATE_OPRinitially_suspended=self._get_initially_suspended(create_cp_query)ifinitially_suspended=="TRUE":operation=Constants.CC_CREATE_SUSPEND_OPRreturnself._handle_cc_status(operation,create_cp_query)
[docs]classTeradataComputeClusterDecommissionOperator(_TeradataComputeClusterOperator):""" Drops the compute cluster with specified Compute Group Name and Compute Profile Name. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:TeradataComputeClusterDecommissionOperator` :param compute_profile_name: Name of the Compute Profile to manage. :param compute_group_name: Name of compute group to which compute profile belongs. :param delete_compute_group: Indicates whether the compute group should be deleted. When set to True, it signals the system to remove the specified compute group. Conversely, when set to False, no action is taken on the compute group. :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` reference to a specific Teradata database. :param timeout: Time elapsed before the task times out and fails. """
[docs]defexecute(self,context:Context):""" Initiate the execution of DROP COMPUTE SQL statement. Initiate the execution of the SQL statement for decommissioning the compute cluster within Teradata Vantage Lake, effectively drops the compute cluster. Airflow runs this method on the worker and defers using the trigger. """super().execute(context)returnself._compute_cluster_execute()
def_compute_cluster_execute(self):super()._compute_cluster_execute()cp_drop_query="DROP COMPUTE PROFILE "+self.compute_profile_nameifself.compute_group_name:cp_drop_query=cp_drop_query+" IN COMPUTE GROUP "+self.compute_group_nameself._hook_run(cp_drop_query,handler=_single_result_row_handler)self.log.info("Compute Profile %s IN Compute Group %s is successfully dropped",self.compute_profile_name,self.compute_group_name,)ifself.delete_compute_group:cg_drop_query="DROP COMPUTE GROUP "+self.compute_group_nameself._hook_run(cg_drop_query,handler=_single_result_row_handler)self.log.info("Compute Group %s is successfully dropped",self.compute_group_name)
[docs]classTeradataComputeClusterResumeOperator(_TeradataComputeClusterOperator):""" Teradata Compute Cluster Operator to Resume the specified Teradata Vantage Cloud Lake Compute Cluster. Resumes the Teradata Vantage Lake Computer Cluster by employing the RESUME SQL statement within the Teradata Vantage Lake Compute Cluster SQL Interface. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:TeradataComputeClusterResumeOperator` :param compute_profile_name: Name of the Compute Profile to manage. :param compute_group_name: Name of compute group to which compute profile belongs. :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` reference to a specific Teradata database. :param timeout: Time elapsed before the task times out and fails. Time is in minutes. """
[docs]defexecute(self,context:Context):""" Initiate the execution of RESUME COMPUTE SQL statement. Initiate the execution of the SQL statement for resuming the compute cluster within Teradata Vantage Lake, effectively resumes the compute cluster. Airflow runs this method on the worker and defers using the trigger. """super().execute(context)returnself._compute_cluster_execute()
def_compute_cluster_execute(self):super()._compute_cluster_execute()cc_status_query=("SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"+self.compute_profile_name+"')")ifself.compute_group_name:cc_status_query+=" AND UPPER(ComputeGroupName) = UPPER('"+self.compute_group_name+"')"cc_status_result=self._hook_run(cc_status_query,handler=_single_result_row_handler)ifcc_status_resultisnotNone:cp_status_result=str(cc_status_result)# Generates an error message if the compute cluster does not exist for the specified# compute profile and compute group.else:self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)raiseAirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)ifcp_status_result!=Constants.CC_RESUME_DB_STATUS:cp_resume_query=f"RESUME COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"ifself.compute_group_name:cp_resume_query=f"{cp_resume_query} IN COMPUTE GROUP {self.compute_group_name}"returnself._handle_cc_status(Constants.CC_RESUME_OPR,cp_resume_query)else:self.log.info("Compute Cluster %s already %s",self.compute_profile_name,Constants.CC_RESUME_DB_STATUS)
[docs]classTeradataComputeClusterSuspendOperator(_TeradataComputeClusterOperator):""" Teradata Compute Cluster Operator to suspend the specified Teradata Vantage Cloud Lake Compute Cluster. Suspends the Teradata Vantage Lake Computer Cluster by employing the SUSPEND SQL statement within the Teradata Vantage Lake Compute Cluster SQL Interface. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:TeradataComputeClusterSuspendOperator` :param compute_profile_name: Name of the Compute Profile to manage. :param compute_group_name: Name of compute group to which compute profile belongs. :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` reference to a specific Teradata database. :param timeout: Time elapsed before the task times out and fails. """
[docs]defexecute(self,context:Context):""" Initiate the execution of SUSPEND COMPUTE SQL statement. Initiate the execution of the SQL statement for suspending the compute cluster within Teradata Vantage Lake, effectively suspends the compute cluster. Airflow runs this method on the worker and defers using the trigger. """super().execute(context)returnself._compute_cluster_execute()
def_compute_cluster_execute(self):super()._compute_cluster_execute()sql=("SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"+self.compute_profile_name+"')")ifself.compute_group_name:sql+=" AND UPPER(ComputeGroupName) = UPPER('"+self.compute_group_name+"')"result=self._hook_run(sql,handler=_single_result_row_handler)ifresultisnotNone:result=str(result)# Generates an error message if the compute cluster does not exist for the specified# compute profile and compute group.else:self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)raiseAirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)ifresult!=Constants.CC_SUSPEND_DB_STATUS:sql=f"SUSPEND COMPUTE FOR COMPUTE PROFILE {self.compute_profile_name}"ifself.compute_group_name:sql=f"{sql} IN COMPUTE GROUP {self.compute_group_name}"returnself._handle_cc_status(Constants.CC_SUSPEND_OPR,sql)else:self.log.info("Compute Cluster %s already %s",self.compute_profile_name,Constants.CC_SUSPEND_DB_STATUS)