# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportdatetimeimportosimporttypingimportwarningsfromglobimportglobfromtypingimportAnyfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
class FileTrigger(BaseTrigger):
    """
    A trigger that fires exactly once after it finds the requested file or folder.

    :param filepath: File or folder path to look for; may be a glob pattern. It is expanded
        directly with :func:`glob.glob` (no connection or base path is applied).
    :param recursive: when set to ``True``, enables recursive directory matching behavior of
        ``**`` in glob filepath parameter. Defaults to ``False``.
    :param poke_interval: Time, in seconds, that the job should wait in between each try.
        Defaults to ``5.0``.
    :param kwargs: Accepts the deprecated ``poll_interval`` keyword, which (when not ``None``)
        overrides ``poke_interval`` and emits a ``DeprecationWarning``. Other keywords are ignored.
    """

    def __init__(
        self,
        filepath: str,
        recursive: bool = False,
        poke_interval: float = 5.0,
        **kwargs: Any,
    ):
        super().__init__()
        self.filepath = filepath
        self.recursive = recursive

        deprecated_interval = kwargs.get("poll_interval")
        if deprecated_interval is not None:
            # Backward compatibility: older callers passed ``poll_interval``.
            warnings.warn(
                "`poll_interval` has been deprecated and will be removed in future. "
                "Please use `poke_interval` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            self.poke_interval: float = deprecated_interval
        else:
            self.poke_interval = poke_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize FileTrigger arguments and classpath."""
        return (
            "airflow.triggers.file.FileTrigger",
            {
                "filepath": self.filepath,
                "recursive": self.recursive,
                "poke_interval": self.poke_interval,
            },
        )

    async def run(self) -> typing.AsyncIterator[TriggerEvent]:
        """
        Loop until the relevant files are found.

        Every ``poke_interval`` seconds ``filepath`` is expanded with :func:`glob.glob`. The
        trigger yields a single ``TriggerEvent(True)`` and stops as soon as either:

        * a match is a regular file (its modification time is logged), or
        * a match is a directory containing at least one file anywhere beneath it.
        """
        # NOTE(review): glob/os.walk/os.path calls below are blocking and run directly in this
        # coroutine; a very large directory tree could stall the event loop while being scanned.
        while True:
            for path in glob(self.filepath, recursive=self.recursive):
                if os.path.isfile(path):
                    mod_time_f = os.path.getmtime(path)
                    mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S")
                    self.log.info("Found File %s last modified: %s", path, mod_time)
                    yield TriggerEvent(True)
                    return
                # Walk the *matched* path rather than the raw pattern: os.walk does not expand
                # glob wildcards, so walking ``self.filepath`` finds nothing for patterns such as
                # "/data/*/" and would re-scan the same tree for every match.
                for _, _, files in os.walk(path):
                    if files:
                        yield TriggerEvent(True)
                        return
            await asyncio.sleep(self.poke_interval)