Source code for airflow.providers.sftp.sensors.sftp
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains SFTP sensor."""from__future__importannotationsimportosfromdatetimeimportdatetimefromtypingimportTYPE_CHECKING,Sequencefromparamiko.sftpimportSFTP_NO_SUCH_FILEfromairflow.providers.sftp.hooks.sftpimportSFTPHookfromairflow.sensors.baseimportBaseSensorOperatorfromairflow.utils.timezoneimportconvert_to_utcifTYPE_CHECKING:fromairflow.utils.contextimportContext
[docs]classSFTPSensor(BaseSensorOperator):""" Waits for a file or directory to be present on SFTP. :param path: Remote file or directory path :param file_pattern: The pattern that will be used to match the file (fnmatch format) :param sftp_conn_id: The connection to run the sensor against :param newer_than: DateTime for which the file or file path should be newer than, comparison is inclusive """
[docs]defpoke(self,context:Context)->bool:self.hook=SFTPHook(self.sftp_conn_id)self.log.info("Poking for %s, with pattern %s",self.path,self.file_pattern)ifself.file_pattern:files_from_pattern=self.hook.get_files_by_pattern(self.path,self.file_pattern)iffiles_from_pattern:actual_files_to_check=[os.path.join(self.path,file_from_pattern)forfile_from_patterninfiles_from_pattern]else:returnFalseelse:actual_files_to_check=[self.path]foractual_file_to_checkinactual_files_to_check:try:mod_time=self.hook.get_mod_time(actual_file_to_check)self.log.info("Found File %s last modified: %s",str(actual_file_to_check),str(mod_time))exceptOSErrorase:ife.errno!=SFTP_NO_SUCH_FILE:raiseecontinueifself.newer_than:_mod_time=convert_to_utc(datetime.strptime(mod_time,"%Y%m%d%H%M%S"))_newer_than=convert_to_utc(self.newer_than)if_newer_than<=_mod_time:returnTrueelse:returnTrueself.hook.close_conn()returnFalse