Source code for airflow.providers.airbyte.triggers.airbyte
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimporttimefromcollections.abcimportAsyncIteratorfromtypingimportAnyfromairbyte_api.modelsimportJobStatusEnumfromairflow.providers.airbyte.hooks.airbyteimportAirbyteHookfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
class AirbyteSyncTrigger(BaseTrigger):
    """
    Poll an Airbyte Sync job by job ID until it reaches a terminal status.

    The trigger repeatedly asks :class:`AirbyteHook` for the job status, sleeping
    ``poll_interval`` seconds between checks, and emits a single
    :class:`TriggerEvent` describing the outcome (success, cancelled, or error).

    :param job_id: The ID of an Airbyte Sync job.
    :param conn_id: The connection identifier for connecting to Airbyte.
    :param end_time: Deadline for the job run to reach a terminal status. It is
        compared directly against ``time.time()``, so it must be an absolute
        timestamp in seconds since the epoch, not a duration.
    :param poll_interval: Polling period in seconds to check for the status.
    """

    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ):
        super().__init__()
        # Keep every constructor argument on the instance: ``serialize`` emits
        # them as kwargs and ``run`` / ``is_still_running`` read them back.
        self.job_id = job_id
        self.conn_id = conn_id
        self.end_time = end_time
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize AirbyteSyncTrigger arguments and classpath."""
        return (
            "airflow.providers.airbyte.triggers.airbyte.AirbyteSyncTrigger",
            {
                "job_id": self.job_id,
                "conn_id": self.conn_id,
                "end_time": self.end_time,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Poll Airbyte for the job status and yield one event with the outcome.

        Yields exactly one :class:`TriggerEvent` whose payload carries
        ``status`` (``"success"``, ``"cancelled"`` or ``"error"``), a
        human-readable ``message`` and the ``job_id``. Any exception raised
        while polling is reported as an ``"error"`` event instead of propagating.
        """
        hook = AirbyteHook(airbyte_conn_id=self.conn_id)
        try:
            while await self.is_still_running(hook):
                # The deadline is checked before sleeping so that an already
                # expired ``end_time`` does not cost an extra poll interval.
                if self.end_time < time.time():
                    # NOTE(review): ``end_time`` is an absolute timestamp, so the
                    # "after ... seconds" wording prints an epoch value; the text
                    # is kept as-is because consumers may match on it.
                    yield TriggerEvent(
                        {
                            "status": "error",
                            "message": f"Job run {self.job_id} has not reached a terminal status after "
                            f"{self.end_time} seconds.",
                            "job_id": self.job_id,
                        }
                    )
                    return
                await asyncio.sleep(self.poll_interval)

            # The job left the running states; fetch the final status to report.
            job_run_status = hook.get_job_status(self.job_id)
            if job_run_status == JobStatusEnum.SUCCEEDED:
                yield TriggerEvent(
                    {
                        "status": "success",
                        "message": f"Job run {self.job_id} has completed successfully.",
                        "job_id": self.job_id,
                    }
                )
            elif job_run_status == JobStatusEnum.CANCELLED:
                yield TriggerEvent(
                    {
                        "status": "cancelled",
                        "message": f"Job run {self.job_id} has been cancelled.",
                        "job_id": self.job_id,
                    }
                )
            else:
                # Every other non-running status is reported as a failure.
                yield TriggerEvent(
                    {
                        "status": "error",
                        "message": f"Job run {self.job_id} has failed.",
                        "job_id": self.job_id,
                    }
                )
        except Exception as e:
            # Surface polling failures as an event rather than raising out of
            # the generator.
            yield TriggerEvent({"status": "error", "message": str(e), "job_id": self.job_id})

    async def is_still_running(self, hook: AirbyteHook) -> bool:
        """
        Check whether the job has not yet reached a terminal status.

        :param hook: Hook used to query the status of ``self.job_id``.
        :return: ``True`` while the status is RUNNING, PENDING or INCOMPLETE,
            ``False`` otherwise.
        """
        # NOTE(review): ``get_job_status`` is called without ``await``; if it
        # performs blocking I/O it will hold the event loop for that duration.
        job_run_status = hook.get_job_status(self.job_id)
        return job_run_status in (
            JobStatusEnum.RUNNING,
            JobStatusEnum.PENDING,
            JobStatusEnum.INCOMPLETE,
        )