Source code for airflow.providers.dbt.cloud.sensors.dbt
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimeimportwarningsfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Anyfromairflow.configurationimportconffromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarning,AirflowSkipExceptionfromairflow.providers.dbt.cloud.hooks.dbtimportDbtCloudHook,DbtCloudJobRunException,DbtCloudJobRunStatusfromairflow.providers.dbt.cloud.triggers.dbtimportDbtCloudRunJobTriggerfromairflow.providers.dbt.cloud.utils.openlineageimportgenerate_openlineage_events_from_dbt_cloud_runfromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.providers.openlineage.extractorsimportOperatorLineagefromairflow.utils.contextimportContext
[docs]classDbtCloudJobRunSensor(BaseSensorOperator):"""Checks the status of a dbt Cloud job run. .. seealso:: For more information on how to use this sensor, take a look at the guide: :ref:`howto/operator:DbtCloudJobRunSensor` :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud. :param run_id: The job run identifier. :param account_id: The dbt Cloud account identifier. :param deferrable: Run sensor in the deferrable mode. """
def__init__(self,*,dbt_cloud_conn_id:str=DbtCloudHook.default_conn_name,run_id:int,account_id:int|None=None,deferrable:bool=conf.getboolean("operators","default_deferrable",fallback=False),**kwargs,)->None:ifdeferrable:if"poke_interval"notinkwargs:# TODO: Remove once deprecatedif"polling_interval"inkwargs:kwargs["poke_interval"]=kwargs["polling_interval"]warnings.warn("Argument `poll_interval` is deprecated and will be removed ""in a future release. Please use `poke_interval` instead.",AirflowProviderDeprecationWarning,stacklevel=2,)else:kwargs["poke_interval"]=5if"timeout"notinkwargs:kwargs["timeout"]=60*60*24*7super().__init__(**kwargs)self.dbt_cloud_conn_id=dbt_cloud_conn_idself.run_id=run_idself.account_id=account_idself.deferrable=deferrable@cached_property
[docs]defpoke(self,context:Context)->bool:job_run_status=self.hook.get_job_run_status(run_id=self.run_id,account_id=self.account_id)ifjob_run_status==DbtCloudJobRunStatus.ERROR.value:# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1message=f"Job run {self.run_id} has failed."ifself.soft_fail:raiseAirflowSkipException(message)raiseDbtCloudJobRunException(message)ifjob_run_status==DbtCloudJobRunStatus.CANCELLED.value:# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1message=f"Job run {self.run_id} has been cancelled."ifself.soft_fail:raiseAirflowSkipException(message)raiseDbtCloudJobRunException(message)returnjob_run_status==DbtCloudJobRunStatus.SUCCESS.value
[docs]defexecute(self,context:Context)->None:"""Run the sensor. Depending on whether ``deferrable`` is set, this would either defer to the triggerer or poll for states of the job run, until the job reaches a failure state or success state. """ifnotself.deferrable:super().execute(context)else:end_time=time.time()+self.timeoutifnotself.poke(context=context):self.defer(timeout=self.execution_timeout,trigger=DbtCloudRunJobTrigger(run_id=self.run_id,conn_id=self.dbt_cloud_conn_id,account_id=self.account_id,poll_interval=self.poke_interval,end_time=end_time,),method_name="execute_complete",)
[docs]defexecute_complete(self,context:Context,event:dict[str,Any])->int:""" Execute when the trigger fires - returns immediately. This relies on trigger to throw an exception, otherwise it assumes execution was successful. """ifevent["status"]in["error","cancelled"]:message=f"Error in dbt: {event['message']}"ifself.soft_fail:raiseAirflowSkipException(message)raiseAirflowException()self.log.info(event["message"])returnint(event["run_id"])
[docs]defget_openlineage_facets_on_complete(self,task_instance)->OperatorLineage:"""Implement _on_complete because job_run needs to be triggered first in execute method."""returngenerate_openlineage_events_from_dbt_cloud_run(operator=self,task_instance=task_instance)
[docs]classDbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):""" This class is deprecated. Please use :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor` with ``deferrable=True``. """def__init__(self,**kwargs:Any)->None:warnings.warn("Class `DbtCloudJobRunAsyncSensor` is deprecated and will be removed in a future release. ""Please use `DbtCloudJobRunSensor` and set `deferrable` attribute to `True` instead",AirflowProviderDeprecationWarning,stacklevel=2,)super().__init__(deferrable=True,**kwargs)