Source code for airflow.providers.teradata.triggers.teradata_compute_cluster
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasynciofromcollections.abcimportAsyncIteratorfromtypingimportAnyfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.common.sql.hooks.handlersimportfetch_one_handlerfromairflow.providers.teradata.hooks.teradataimportTeradataHookfromairflow.providers.teradata.utils.constantsimportConstantsfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
[docs]classTeradataComputeClusterSyncTrigger(BaseTrigger):""" Fetch the status of the suspend or resume operation for the specified compute cluster. :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` reference to a specific Teradata database. :param compute_profile_name: Name of the Compute Profile to manage. :param compute_group_name: Name of compute group to which compute profile belongs. :param opr_type: Compute cluster operation - SUSPEND/RESUME :param poll_interval: polling period in minutes to check for the status """def__init__(self,teradata_conn_id:str,compute_profile_name:str,compute_group_name:str|None=None,operation_type:str|None=None,poll_interval:float|None=None,):super().__init__()
[docs]defserialize(self)->tuple[str,dict[str,Any]]:"""Serialize TeradataComputeClusterSyncTrigger arguments and classpath."""return("airflow.providers.teradata.triggers.teradata_compute_cluster.TeradataComputeClusterSyncTrigger",{"teradata_conn_id":self.teradata_conn_id,"compute_profile_name":self.compute_profile_name,"compute_group_name":self.compute_group_name,"operation_type":self.operation_type,"poll_interval":self.poll_interval,},)
[docs]asyncdefrun(self)->AsyncIterator[TriggerEvent]:"""Wait for Compute Cluster operation to complete."""try:whileTrue:status=awaitself.get_status()ifstatusisNoneorlen(status)==0:self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG)raiseAirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG)if(self.operation_type==Constants.CC_SUSPEND_OPRorself.operation_type==Constants.CC_CREATE_SUSPEND_OPR):ifstatus==Constants.CC_SUSPEND_DB_STATUS:breakelif(self.operation_type==Constants.CC_RESUME_OPRorself.operation_type==Constants.CC_CREATE_OPR):ifstatus==Constants.CC_RESUME_DB_STATUS:breakifself.poll_intervalisnotNone:self.poll_interval=float(self.poll_interval)else:self.poll_interval=float(Constants.CC_POLL_INTERVAL)awaitasyncio.sleep(self.poll_interval)if(self.operation_type==Constants.CC_SUSPEND_OPRorself.operation_type==Constants.CC_CREATE_SUSPEND_OPR):ifstatus==Constants.CC_SUSPEND_DB_STATUS:yieldTriggerEvent({"status":"success","message":Constants.CC_OPR_SUCCESS_STATUS_MSG%(self.compute_profile_name,self.operation_type),})else:yieldTriggerEvent({"status":"error","message":Constants.CC_OPR_FAILURE_STATUS_MSG%(self.compute_profile_name,self.operation_type),})elif(self.operation_type==Constants.CC_RESUME_OPRorself.operation_type==Constants.CC_CREATE_OPR):ifstatus==Constants.CC_RESUME_DB_STATUS:yieldTriggerEvent({"status":"success","message":Constants.CC_OPR_SUCCESS_STATUS_MSG%(self.compute_profile_name,self.operation_type),})else:yieldTriggerEvent({"status":"error","message":Constants.CC_OPR_FAILURE_STATUS_MSG%(self.compute_profile_name,self.operation_type),})else:yieldTriggerEvent({"status":"error","message":"Invalid operation"})exceptExceptionase:yieldTriggerEvent({"status":"error","message":str(e)})exceptasyncio.CancelledError:self.log.error(Constants.CC_OPR_TIMEOUT_ERROR,self.operation_type)
[docs]asyncdefget_status(self)->str:"""Return compute cluster SUSPEND/RESUME operation status."""sql=("SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('"+self.compute_profile_name+"')")ifself.compute_group_name:sql+=" AND UPPER(ComputeGroupName) = UPPER('"+self.compute_group_name+"')"hook=TeradataHook(teradata_conn_id=self.teradata_conn_id)result_set=hook.run(sql,handler=fetch_one_handler)status=""ifisinstance(result_set,list)andisinstance(result_set[0],str):status=str(result_set[0])returnstatus