Source code for airflow.providers.openai.triggers.openai
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimporttimefromcollections.abcimportAsyncIteratorfromtypingimportAnyfromairflow.providers.openai.hooks.openaiimportBatchStatus,OpenAIHookfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
[docs]defserialize(self)->tuple[str,dict[str,Any]]:"""Serialize OpenAIBatchTrigger arguments and class path."""return("airflow.providers.openai.triggers.openai.OpenAIBatchTrigger",{"conn_id":self.conn_id,"batch_id":self.batch_id,"poll_interval":self.poll_interval,"end_time":self.end_time,},)
[docs]asyncdefrun(self)->AsyncIterator[TriggerEvent]:"""Make connection to OpenAI Client, and poll the status of batch."""hook=OpenAIHook(conn_id=self.conn_id)try:while(batch:=hook.get_batch(self.batch_id))andBatchStatus.is_in_progress(batch.status):ifself.end_time<time.time():yieldTriggerEvent({"status":"error","message":f"Batch {self.batch_id} has not reached a terminal status after "f"{time.time()-self.end_time} seconds.","batch_id":self.batch_id,})returnawaitasyncio.sleep(self.poll_interval)ifbatch.status==BatchStatus.COMPLETED:yieldTriggerEvent({"status":"success","message":f"Batch {self.batch_id} has completed successfully.","batch_id":self.batch_id,})elifbatch.statusin{BatchStatus.CANCELLED,BatchStatus.CANCELLING}:yieldTriggerEvent({"status":"cancelled","message":f"Batch {self.batch_id} has been cancelled.","batch_id":self.batch_id,})elifbatch.status==BatchStatus.FAILED:yieldTriggerEvent({"status":"error","message":f"Batch failed:\n{self.batch_id}","batch_id":self.batch_id,})elifbatch.status==BatchStatus.EXPIRED:yieldTriggerEvent({"status":"error","message":f"Batch couldn't be completed within the hour time window :\n{self.batch_id}","batch_id":self.batch_id,})yieldTriggerEvent({"status":"error","message":f"Batch {self.batch_id} has failed.","batch_id":self.batch_id,})exceptExceptionase:yieldTriggerEvent({"status":"error","message":str(e),"batch_id":self.batch_id})