Source code for airflow.providers.sftp.sensors.sftp
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains SFTP sensor."""from__future__importannotationsimportosfromcollections.abcimportSequencefromdatetimeimportdatetime,timedeltafromtypingimportTYPE_CHECKING,Any,Callablefromparamiko.sftpimportSFTP_NO_SUCH_FILEfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.providers.sftp.hooks.sftpimportSFTPHookfromairflow.providers.sftp.triggers.sftpimportSFTPTriggerfromairflow.sensors.baseimportBaseSensorOperator,PokeReturnValuefromairflow.utils.timezoneimportconvert_to_utc,parseifTYPE_CHECKING:try:fromairflow.sdk.definitions.contextimportContextexceptImportError:# TODO: Remove once provider drops support for Airflow 2fromairflow.utils.contextimportContext
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.

    :param path: Remote file or directory path
    :param file_pattern: The pattern that will be used to match the file (fnmatch format)
    :param sftp_conn_id: The connection to run the sensor against
    :param newer_than: DateTime for which the file or file path should be newer than, comparison is
        inclusive. A string value is parsed into a datetime the first time it is needed.
    :param python_callable: Optional callable invoked by ``poke()`` once matching files are found; its
        return value is pushed to XCom under ``decorator_return_value`` together with ``files_found``.
        It is not used on the deferrable path.
    :param op_args: Positional arguments passed to ``python_callable``.
    :param op_kwargs: Keyword arguments passed to ``python_callable``. When non-empty, a
        ``files_found`` entry is added to it before the call.
    :param deferrable: If waiting for completion, whether to defer the task until done, default is ``False``.
    """

    def poke(self, context: Context) -> PokeReturnValue | bool:
        """
        Check once whether the target file(s) exist on the SFTP server.

        If ``file_pattern`` is set, the files under ``path`` matching it are checked. Otherwise ``path``
        itself is checked. Files that do not exist are skipped. When ``newer_than`` is set, a file only
        counts if its modification time is at or after ``newer_than`` (both converted to UTC).

        :param context: Airflow task context (not read by this method).
        :return: ``False`` when no qualifying file was found. ``True`` when at least one was found and no
            ``python_callable`` is configured. Otherwise a ``PokeReturnValue(is_done=True, ...)`` whose
            XCom value holds ``files_found`` and the callable's return value.
        :raises AirflowException: if reading a file's modification time fails with any ``OSError``
            other than "no such file".
        """
        self.hook = SFTPHook(self.sftp_conn_id)
        self.log.info("Poking for %s, with pattern %s", self.path, self.file_pattern)
        files_found = []

        if self.file_pattern:
            files_from_pattern = self.hook.get_files_by_pattern(self.path, self.file_pattern)
            if not files_from_pattern:
                return False
            actual_files_to_check = [
                os.path.join(self.path, file_from_pattern) for file_from_pattern in files_from_pattern
            ]
        else:
            actual_files_to_check = [self.path]

        for actual_file_to_check in actual_files_to_check:
            try:
                mod_time = self.hook.get_mod_time(actual_file_to_check)
                self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)
            except OSError as e:
                # A missing file simply means "not there yet"; any other SFTP error is fatal.
                if e.errno != SFTP_NO_SUCH_FILE:
                    raise AirflowException(
                        f"Failed to get modification time of {actual_file_to_check}"
                    ) from e
                continue

            if not self.newer_than:
                files_found.append(actual_file_to_check)
                continue

            # newer_than may arrive as a (templated) string; convert it once and keep the datetime.
            if isinstance(self.newer_than, str):
                self.newer_than = parse(self.newer_than)
            # get_mod_time() returns the timestamp as a "%Y%m%d%H%M%S" string.
            _mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
            _newer_than = convert_to_utc(self.newer_than)
            if _newer_than <= _mod_time:
                files_found.append(actual_file_to_check)
                self.log.info(
                    "File %s has modification time: '%s', which is newer than: '%s'",
                    actual_file_to_check,
                    str(_mod_time),
                    str(_newer_than),
                )
            else:
                self.log.info(
                    "File %s has modification time: '%s', which is older than: '%s'",
                    actual_file_to_check,
                    str(_mod_time),
                    str(_newer_than),
                )

        if not files_found:
            return False

        if self.python_callable is not None:
            # files_found is only injected when op_kwargs is non-empty.
            # NOTE(review): presumably so callables that accept no kwargs are not broken; confirm intent.
            if self.op_kwargs:
                self.op_kwargs["files_found"] = files_found
            callable_return = self.python_callable(*self.op_args, **self.op_kwargs)
            return PokeReturnValue(
                is_done=True,
                xcom_value={"files_found": files_found, "decorator_return_value": callable_return},
            )
        return True

    def execute(self, context: Context) -> Any:
        """
        Run the sensor.

        When ``deferrable`` is set, the task defers to ``SFTPTrigger`` and resumes in
        ``execute_complete``. Otherwise it delegates to ``BaseSensorOperator.execute``, which presumably
        drives ``poke()`` until done or timed out.
        """
        # Unlike other async sensors, we do not follow the pattern of calling the synchronous self.poke()
        # method before deferring here. This is due to the current limitations we have in the synchronous
        # SFTPHook methods. They are as follows:
        #
        # For file_pattern sensing, the hook implements list_directory() method which returns a list of
        # filenames only without the attributes like modified time which is required for the file_pattern
        # sensing when newer_than is supplied. This leads to intermittent failures potentially due to
        # throttling by the SFTP server as the hook makes multiple calls to the server to get the
        # attributes for each of the files in the directory. This limitation is resolved here by instead
        # calling the read_directory() method which returns a list of files along with their attributes
        # in a single call. We can add back the call to self.poke() before deferring once the above
        # limitations are resolved in the sync sensor.
        if self.deferrable:
            self.defer(
                timeout=timedelta(seconds=self.timeout),
                trigger=SFTPTrigger(
                    path=self.path,
                    file_pattern=self.file_pattern,
                    sftp_conn_id=self.sftp_conn_id,
                    poke_interval=self.poke_interval,
                    newer_than=self.newer_than,
                ),
                method_name="execute_complete",
            )
        else:
            return super().execute(context=context)

    def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
        """
        Execute callback when the trigger fires; returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes execution was successful.

        :param context: Airflow task context (not read by this method).
        :param event: Payload emitted by the trigger, expected to carry ``status`` and ``message`` keys.
        :raises AirflowException: if no event was received, the event reports ``status == "error"``,
            or the event has a missing/unknown status.
        """
        if event is None:
            raise AirflowException("No event received in trigger callback")

        status = event["status"] if "status" in event else None
        if status == "error":
            raise AirflowException(event["message"])
        if status == "success":
            self.log.info("%s completed successfully.", self.task_id)
            self.log.info("%s", event["message"])
            return None

        # An event did arrive, so reporting "No event received" here would be misleading.
        raise AirflowException(f"Unexpected event received in trigger callback: {event!r}")