Source code for airflow.providers.google.cloud.triggers.cloud_composer
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportjsonfromdatetimeimportdatetimefromtypingimportAny,Sequencefromdateutilimportparserfromgoogle.cloud.orchestration.airflow.service_v1.typesimportExecuteAirflowCommandResponsefromairflow.exceptionsimportAirflowExceptionfromairflow.providers.google.cloud.hooks.cloud_composerimportCloudComposerAsyncHookfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
[docs]classCloudComposerExecutionTrigger(BaseTrigger):"""The trigger handles the async communication with the Google Cloud Composer."""def__init__(self,project_id:str,region:str,operation_name:str,gcp_conn_id:str="google_cloud_default",impersonation_chain:str|Sequence[str]|None=None,pooling_period_seconds:int=30,):super().__init__()self.project_id=project_idself.region=regionself.operation_name=operation_nameself.gcp_conn_id=gcp_conn_idself.impersonation_chain=impersonation_chainself.pooling_period_seconds=pooling_period_secondsself.gcp_hook=CloudComposerAsyncHook(gcp_conn_id=self.gcp_conn_id,impersonation_chain=self.impersonation_chain,)
[docs]classCloudComposerAirflowCLICommandTrigger(BaseTrigger):"""The trigger wait for the Airflow CLI command result."""def__init__(self,project_id:str,region:str,environment_id:str,execution_cmd_info:dict,gcp_conn_id:str="google_cloud_default",impersonation_chain:str|Sequence[str]|None=None,poll_interval:int=10,):super().__init__()self.project_id=project_idself.region=regionself.environment_id=environment_idself.execution_cmd_info=execution_cmd_infoself.gcp_conn_id=gcp_conn_idself.impersonation_chain=impersonation_chainself.poll_interval=poll_intervalself.gcp_hook=CloudComposerAsyncHook(gcp_conn_id=self.gcp_conn_id,impersonation_chain=self.impersonation_chain,)
[docs]classCloudComposerDAGRunTrigger(BaseTrigger):"""The trigger wait for the DAG run completion."""def__init__(self,project_id:str,region:str,environment_id:str,composer_dag_id:str,start_date:datetime,end_date:datetime,allowed_states:list[str],gcp_conn_id:str="google_cloud_default",impersonation_chain:str|Sequence[str]|None=None,poll_interval:int=10,):super().__init__()self.project_id=project_idself.region=regionself.environment_id=environment_idself.composer_dag_id=composer_dag_idself.start_date=start_dateself.end_date=end_dateself.allowed_states=allowed_statesself.gcp_conn_id=gcp_conn_idself.impersonation_chain=impersonation_chainself.poll_interval=poll_intervalself.gcp_hook=CloudComposerAsyncHook(gcp_conn_id=self.gcp_conn_id,impersonation_chain=self.impersonation_chain,)
asyncdef_pull_dag_runs(self)->list[dict]:"""Pull the list of dag runs."""dag_runs_cmd=awaitself.gcp_hook.execute_airflow_command(project_id=self.project_id,region=self.region,environment_id=self.environment_id,command="dags",subcommand="list-runs",parameters=["-d",self.composer_dag_id,"-o","json"],)cmd_result=awaitself.gcp_hook.wait_command_execution_result(project_id=self.project_id,region=self.region,environment_id=self.environment_id,execution_cmd_info=ExecuteAirflowCommandResponse.to_dict(dag_runs_cmd),)dag_runs=json.loads(cmd_result["output"][0]["content"])returndag_runsdef_check_dag_runs_states(self,dag_runs:list[dict],start_date:datetime,end_date:datetime,)->bool:fordag_runindag_runs:if(start_date.timestamp()<parser.parse(dag_run["execution_date"]).timestamp()<end_date.timestamp())anddag_run["state"]notinself.allowed_states:returnFalsereturnTrue
[docs]asyncdefrun(self):try:whileTrue:ifdatetime.now(self.end_date.tzinfo).timestamp()>self.end_date.timestamp():dag_runs=awaitself._pull_dag_runs()self.log.info("Sensor waits for allowed states: %s",self.allowed_states)ifself._check_dag_runs_states(dag_runs=dag_runs,start_date=self.start_date,end_date=self.end_date,):yieldTriggerEvent({"status":"success"})returnself.log.info("Sleeping for %s seconds.",self.poll_interval)awaitasyncio.sleep(self.poll_interval)exceptAirflowExceptionasex:yieldTriggerEvent({"status":"error","message":str(ex),})return