Source code for airflow.providers.dbt.cloud.sensors.dbt
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Anyfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.providers.dbt.cloud.hooks.dbtimportDbtCloudHook,DbtCloudJobRunException,DbtCloudJobRunStatusfromairflow.providers.dbt.cloud.triggers.dbtimportDbtCloudRunJobTriggerfromairflow.providers.dbt.cloud.utils.openlineageimportgenerate_openlineage_events_from_dbt_cloud_runfromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.providers.openlineage.extractorsimportOperatorLineagefromairflow.utils.contextimportContext
[docs]classDbtCloudJobRunSensor(BaseSensorOperator):""" Checks the status of a dbt Cloud job run. .. seealso:: For more information on how to use this sensor, take a look at the guide: :ref:`howto/operator:DbtCloudJobRunSensor` :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud. :param run_id: The job run identifier. :param account_id: The dbt Cloud account identifier. :param deferrable: Run sensor in the deferrable mode. """
def poke(self, context: Context) -> bool:
    """
    Check the dbt Cloud job run once and report whether it has succeeded.

    The status is fetched through ``self.hook.get_job_run_status`` using the
    sensor's ``run_id`` and ``account_id``.

    :param context: Airflow task context; not used by this check.
    :return: ``True`` when the run status equals the ``SUCCESS`` value, ``False`` otherwise.
    :raises DbtCloudJobRunException: If the run status equals the ``ERROR`` or the
        ``CANCELLED`` value.
    """
    current_status = self.hook.get_job_run_status(run_id=self.run_id, account_id=self.account_id)

    # Terminal failure states abort the sensor instead of letting it keep polling.
    if current_status == DbtCloudJobRunStatus.ERROR.value:
        raise DbtCloudJobRunException(f"Job run {self.run_id} has failed.")
    if current_status == DbtCloudJobRunStatus.CANCELLED.value:
        raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")

    succeeded = current_status == DbtCloudJobRunStatus.SUCCESS.value
    return succeeded
def execute(self, context: Context) -> None:
    """
    Run the sensor, either by polling in-process or by deferring to the triggerer.

    When ``deferrable`` is false the parent class's ``execute`` is used. Otherwise the
    run is poked once; if it has not succeeded yet, the task defers to a
    ``DbtCloudRunJobTrigger`` and resumes in ``execute_complete``.

    :param context: Airflow task context, forwarded to the parent ``execute`` or to ``poke``.
    """
    if not self.deferrable:
        super().execute(context)
        return

    # Absolute deadline (epoch seconds) handed to the trigger, fixed before the first poke.
    deadline = time.time() + self.timeout
    if self.poke(context=context):
        # Already successful: nothing to wait for, so no deferral is needed.
        return

    run_trigger = DbtCloudRunJobTrigger(
        run_id=self.run_id,
        conn_id=self.dbt_cloud_conn_id,
        account_id=self.account_id,
        poll_interval=self.poke_interval,
        end_time=deadline,
    )
    self.defer(
        timeout=self.execution_timeout,
        trigger=run_trigger,
        method_name="execute_complete",
    )
def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
    """
    Resume after the trigger fires and report the outcome of the job run.

    The outcome is read from ``event["status"]``: a failed or cancelled run fails the
    task, any other status is treated as success.

    :param context: Airflow task context; not used here.
    :param event: Payload delivered by the trigger. The keys ``status``, ``message``
        and ``run_id`` are read.
    :return: The job run identifier from the event, converted to ``int``.
    :raises AirflowException: If the reported status is ``"error"`` or ``"cancelled"``.
    """
    if event["status"] in ("error", "cancelled"):
        # Carry the trigger's explanation into the exception; raising it bare would
        # leave the task failure without any stated reason.
        reason = event.get("message") or f"job run ended with status {event['status']!r}"
        raise AirflowException(f"Error in dbt: {reason}")

    self.log.info(event["message"])
    return int(event["run_id"])
def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
    """
    Build the OpenLineage facets for this sensor once the task has completed.

    The ``_on_complete`` hook is implemented because the job run needs to be triggered
    first in the execute method.

    :param task_instance: Task instance forwarded to the lineage generator.
    :return: The result of ``generate_openlineage_events_from_dbt_cloud_run``.
    """
    lineage = generate_openlineage_events_from_dbt_cloud_run(
        operator=self,
        task_instance=task_instance,
    )
    return lineage