Source code for airflow.providers.sftp.sensors.sftp
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains SFTP sensor."""from__future__importannotationsimportosfromdatetimeimportdatetimefromtypingimportTYPE_CHECKING,Any,Callable,Sequencefromparamiko.sftpimportSFTP_NO_SUCH_FILEfromairflow.providers.sftp.hooks.sftpimportSFTPHookfromairflow.sensors.baseimportBaseSensorOperator,PokeReturnValuefromairflow.utils.timezoneimportconvert_to_utcifTYPE_CHECKING:fromairflow.utils.contextimportContext
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.

    :param path: Remote file or directory path
    :param file_pattern: The pattern that will be used to match the file (fnmatch format)
    :param sftp_conn_id: The connection to run the sensor against
    :param newer_than: DateTime for which the file or file path should be newer than,
        comparison is inclusive
    """

    def poke(self, context: Context) -> PokeReturnValue | bool:
        """
        Check once whether the awaited file(s) are present on the SFTP server.

        When ``file_pattern`` is set, every name returned by the hook for that
        pattern is joined onto ``path`` and checked; otherwise ``path`` itself is
        the single candidate. A candidate counts as found when its modification
        time can be read and, if ``newer_than`` is set, that time is not older
        than ``newer_than`` (inclusive comparison, both sides converted to UTC).

        Reads ``path``, ``file_pattern``, ``sftp_conn_id``, ``newer_than``,
        ``python_callable``, ``op_args`` and ``op_kwargs`` from the instance;
        these are expected to be set outside this method.

        :param context: Task execution context (not used by this method).
        :return: ``False`` when nothing matched; ``True`` when at least one file
            matched and no ``python_callable`` is configured; otherwise a
            ``PokeReturnValue`` whose XCom value holds ``files_found`` and the
            callable's return value under ``decorator_return_value``.
        :raises OSError: Any error from reading a modification time other than
            "no such file", which is treated as "not present yet".
        """
        self.hook = SFTPHook(self.sftp_conn_id)
        self.log.info("Poking for %s, with pattern %s", self.path, self.file_pattern)
        files_found = []

        # The connection must be released on every exit path: the early return
        # for "pattern matched nothing" and any re-raised error used to skip
        # close_conn() and leave the SFTP connection open.
        try:
            if self.file_pattern:
                files_from_pattern = self.hook.get_files_by_pattern(self.path, self.file_pattern)
                if not files_from_pattern:
                    return False
                actual_files_to_check = [
                    os.path.join(self.path, file_from_pattern) for file_from_pattern in files_from_pattern
                ]
            else:
                actual_files_to_check = [self.path]

            for actual_file_to_check in actual_files_to_check:
                try:
                    mod_time = self.hook.get_mod_time(actual_file_to_check)
                except OSError as e:
                    # A missing file just means "not there yet"; anything else is a real failure.
                    if e.errno != SFTP_NO_SUCH_FILE:
                        raise
                    continue
                self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)

                if self.newer_than:
                    # The hook reports the modification time as a "%Y%m%d%H%M%S" string.
                    _mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
                    _newer_than = convert_to_utc(self.newer_than)
                    if _newer_than <= _mod_time:
                        files_found.append(actual_file_to_check)
                else:
                    files_found.append(actual_file_to_check)
        finally:
            self.hook.close_conn()

        if not files_found:
            return False

        if self.python_callable is not None:
            # Existing behaviour, kept for compatibility: ``files_found`` is only
            # injected when the caller already supplied some keyword arguments.
            if self.op_kwargs:
                self.op_kwargs["files_found"] = files_found
            callable_return = self.python_callable(*self.op_args, **self.op_kwargs)
            return PokeReturnValue(
                is_done=True,
                xcom_value={"files_found": files_found, "decorator_return_value": callable_return},
            )
        return True