Source code for airflow.providers.airbyte.sensors.airbyte
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains a Airbyte Job sensor."""from__future__importannotationsimporttimefromcollections.abcimportSequencefromtypingimportTYPE_CHECKING,Anyfromairbyte_api.modelsimportJobStatusEnumfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.providers.airbyte.hooks.airbyteimportAirbyteHookfromairflow.providers.airbyte.triggers.airbyteimportAirbyteSyncTriggerfromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.utils.contextimportContext
[docs]classAirbyteJobSensor(BaseSensorOperator):""" Check for the state of a previously submitted Airbyte job. :param airbyte_job_id: Required. Id of the Airbyte job :param airbyte_conn_id: Optional. The name of the Airflow connection to get :param deferrable: Run sensor in the deferrable mode. connection information for Airbyte. Defaults to "airbyte_default". :param api_version: Optional. Airbyte API version. Defaults to "v1". """
[docs]defpoke(self,context:Context)->bool:hook=AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,api_version=self.api_version)job=hook.get_job_details(job_id=self.airbyte_job_id)status=job.statusifstatus==JobStatusEnum.FAILED:message=f"Job failed: \n{job}"raiseAirflowException(message)elifstatus==JobStatusEnum.CANCELLED:message=f"Job was cancelled: \n{job}"raiseAirflowException(message)elifstatus==JobStatusEnum.SUCCEEDED:self.log.info("Job %s completed successfully.",self.airbyte_job_id)returnTrueself.log.info("Waiting for job %s to complete.",self.airbyte_job_id)returnFalse
[docs]defexecute(self,context:Context)->Any:"""Submit a job which generates a run_id and gets deferred."""ifnotself.deferrable:super().execute(context)else:hook=AirbyteHook(airbyte_conn_id=self.airbyte_conn_id,api_version=self.api_version)job=hook.get_job_details(job_id=(int(self.airbyte_job_id)))state=job.statusend_time=time.time()+self.timeoutself.log.info("Airbyte Job Id: Job %s",self.airbyte_job_id)ifstatein(JobStatusEnum.RUNNING,JobStatusEnum.PENDING,JobStatusEnum.INCOMPLETE):self.defer(timeout=self.execution_timeout,trigger=AirbyteSyncTrigger(conn_id=self.airbyte_conn_id,job_id=self.airbyte_job_id,end_time=end_time,poll_interval=60,),method_name="execute_complete",)elifstate==JobStatusEnum.SUCCEEDED:self.log.info("%s completed successfully.",self.task_id)returnelifstate==JobStatusEnum.FAILED:raiseAirflowException(f"Job failed:\n{job}")elifstate==JobStatusEnum.CANCELLED:raiseAirflowException(f"Job was cancelled:\n{job}")else:raiseAirflowException(f"Encountered unexpected state `{state}` for job_id `{self.airbyte_job_id}")
[docs]defexecute_complete(self,context:Context,event:Any=None)->None:""" Invoke this callback when the trigger fires; return immediately. Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ifevent["status"]=="error":raiseAirflowException(event["message"])self.log.info("%s completed successfully.",self.task_id)returnNone