Source code for airflow.providers.dbt.cloud.triggers.dbt
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimporttimefromcollections.abcimportAsyncIteratorfromtypingimportAnyfromairflow.providers.dbt.cloud.hooks.dbtimportDbtCloudHook,DbtCloudJobRunStatusfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
class DbtCloudRunJobTrigger(BaseTrigger):
    """
    Trigger to make an HTTP call to dbt and get the status for the job.

    The job run is looked up by its run id once per polling interval until it
    reaches a terminal status or the deadline passes.

    :param conn_id: The connection identifier for connecting to Dbt.
    :param run_id: The ID of the dbt Cloud job run to monitor.
    :param end_time: Deadline for the job run to reach a terminal status.
        ``run`` compares this value directly against ``time.time()``, so it is
        an absolute epoch timestamp in seconds, not a duration.
    :param poll_interval: polling period in seconds to check for the status.
    :param account_id: The ID of a dbt Cloud account.
    """

    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None,
    ):
        super().__init__()
        # Every value below is read back by serialize(), run() and
        # is_still_running(), so each one must be stored on the instance.
        self.run_id = run_id
        self.account_id = account_id
        self.conn_id = conn_id
        self.end_time = end_time
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize DbtCloudRunJobTrigger arguments and classpath.

        :return: A ``(classpath, kwargs)`` tuple whose kwargs mirror the
            constructor parameters.
        """
        return (
            "airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger",
            {
                "run_id": self.run_id,
                "account_id": self.account_id,
                "conn_id": self.conn_id,
                "end_time": self.end_time,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Make async connection to Dbt, polls for the pipeline run status.

        Yields exactly one ``TriggerEvent`` whose payload carries ``status``,
        ``message`` and ``run_id``:

        * ``"success"`` when the final status is ``SUCCESS``;
        * ``"cancelled"`` when the final status is ``CANCELLED``;
        * ``"error"`` for any other final status, when the deadline passes
          while the run is still in progress, or when an exception is raised
          while polling.
        """
        hook = DbtCloudHook(self.conn_id)
        try:
            while await self.is_still_running(hook):
                # end_time is compared against the wall clock, i.e. it is an
                # absolute deadline rather than an elapsed-time budget.
                if self.end_time < time.time():
                    # NOTE(review): the message reports the raw end_time value
                    # as "seconds"; kept byte-identical because consumers may
                    # match on this text.
                    yield TriggerEvent(
                        {
                            "status": "error",
                            "message": f"Job run {self.run_id} has not reached a terminal status after "
                            f"{self.end_time} seconds.",
                            "run_id": self.run_id,
                        }
                    )
                    return
                await asyncio.sleep(self.poll_interval)

            # is_still_running() only reports a boolean, so the concrete
            # status is fetched once more to decide which event to emit.
            job_run_status = await hook.get_job_status(self.run_id, self.account_id)
            if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
                yield TriggerEvent(
                    {
                        "status": "success",
                        "message": f"Job run {self.run_id} has completed successfully.",
                        "run_id": self.run_id,
                    }
                )
            elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
                yield TriggerEvent(
                    {
                        "status": "cancelled",
                        "message": f"Job run {self.run_id} has been cancelled.",
                        "run_id": self.run_id,
                    }
                )
            else:
                yield TriggerEvent(
                    {
                        "status": "error",
                        "message": f"Job run {self.run_id} has failed.",
                        "run_id": self.run_id,
                    }
                )
        except Exception as e:
            # Any failure while polling is surfaced as an error event instead
            # of propagating out of the trigger.
            yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id})

    async def is_still_running(self, hook: DbtCloudHook) -> bool:
        """
        Check whether the submitted job is running.

        :param hook: Hook used to fetch the current status of the job run.
        :return: ``True`` while ``DbtCloudJobRunStatus.is_terminal`` reports
            the fetched status as non-terminal, ``False`` otherwise.
        """
        job_run_status = await hook.get_job_status(self.run_id, self.account_id)
        return not DbtCloudJobRunStatus.is_terminal(job_run_status)