#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains SFTP hook."""

from __future__ import annotations

import datetime
import os
import posixpath
import stat
import warnings
from fnmatch import fnmatch
from typing import TYPE_CHECKING, Any, Callable, Sequence

import asyncssh
from asgiref.sync import sync_to_async

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
from airflow.providers.ssh.hooks.ssh import SSHHook

if TYPE_CHECKING:
    import paramiko

    from airflow.models.connection import Connection
class SFTPHook(SSHHook):
    """Interact with SFTP.

    This hook inherits the SSH hook. Please refer to SSH hook for the input
    arguments.

    :Pitfalls::

        - In contrast with FTPHook, describe_directory only returns size, type
          and modify. It doesn't return unix.owner, unix.mode, perm, unix.group
          and unique.
        - retrieve_file and store_file only take a local full path and not a
          buffer.
        - If no mode is passed to create_directory it will be created with 777
          permissions.

    Errors may occur throughout, but they should be handled downstream.

    For consistency reasons with SSHHook, the preferred parameter is
    "ssh_conn_id". The deprecated keyword ``ftp_conn_id`` is still accepted and,
    when truthy, takes the place of ``ssh_conn_id``.

    :param ssh_conn_id: The :ref:`sftp connection id<howto/connection:sftp>`
    :param ssh_hook: Optional SSH hook (included to support passing of an SSH
        hook to the SFTP operator). Deprecated.
    """

    def __init__(
        self,
        ssh_conn_id: str | None = "sftp_default",
        ssh_hook: SSHHook | None = None,
        *args,
        **kwargs,
    ) -> None:
        # Lazily created by get_conn() and reset by close_conn().
        self.conn: paramiko.SFTPClient | None = None

        # TODO: remove support for ssh_hook when it is removed from SFTPOperator
        self.ssh_hook = ssh_hook

        if self.ssh_hook is not None:
            warnings.warn(
                "Parameter `ssh_hook` is deprecated and will be removed in a future version.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            if not isinstance(self.ssh_hook, SSHHook):
                raise AirflowException(
                    f"ssh_hook must be an instance of SSHHook, but got {type(self.ssh_hook)}"
                )
            self.log.info("ssh_hook is provided. It will be used to generate SFTP connection.")
            self.ssh_conn_id = self.ssh_hook.ssh_conn_id
            # NOTE: returns before the parent initializer runs; get_conn()
            # delegates to the supplied hook in this mode.
            return

        ftp_conn_id = kwargs.pop("ftp_conn_id", None)
        if ftp_conn_id:
            warnings.warn(
                "Parameter `ftp_conn_id` is deprecated. Please use `ssh_conn_id` instead.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            ssh_conn_id = ftp_conn_id

        kwargs["ssh_conn_id"] = ssh_conn_id
        self.ssh_conn_id = ssh_conn_id

        super().__init__(*args, **kwargs)

    def get_conn(self) -> paramiko.SFTPClient:  # type: ignore[override]
        """Open an SFTP connection to the remote host.

        The client is created on first use and cached on ``self.conn``; later
        calls return the cached client until :meth:`close_conn` is called.
        """
        if self.conn is None:
            # TODO: remove support for ssh_hook when it is removed from SFTPOperator
            if self.ssh_hook is not None:
                self.conn = self.ssh_hook.get_conn().open_sftp()
            else:
                self.conn = super().get_conn().open_sftp()
        return self.conn

    def close_conn(self) -> None:
        """Close the SFTP connection.

        Does nothing when no connection has been opened.
        """
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]:
        """Get file information in a directory on the remote system.

        The return format is ``{filename: {attributes}}``. Each attribute dict
        holds exactly three keys: ``size``, ``type`` (``"dir"`` for directories,
        ``"file"`` for everything else) and ``modify`` (the entry's mtime
        formatted as ``%Y%m%d%H%M%S`` in the worker's local time).

        :param path: full path to the remote directory
        :return: mapping of entry name to its attributes, built in filename order
        """
        conn = self.get_conn()
        entries = sorted(conn.listdir_attr(path), key=lambda attr: attr.filename)
        files: dict[str, dict[str, str | int | None]] = {}
        for entry in entries:
            modify = datetime.datetime.fromtimestamp(entry.st_mtime).strftime(  # type: ignore
                "%Y%m%d%H%M%S"
            )
            files[entry.filename] = {
                "size": entry.st_size,
                "type": "dir" if stat.S_ISDIR(entry.st_mode) else "file",  # type: ignore
                "modify": modify,
            }
        return files

    def list_directory(self, path: str) -> list[str]:
        """List files in a directory on the remote system.

        :param path: full path to the remote directory to list
        :return: sorted list of entry names
        """
        conn = self.get_conn()
        return sorted(conn.listdir(path))

    def mkdir(self, path: str, mode: int = 0o777) -> None:
        """Create a directory on the remote system.

        The default mode is ``0o777``, but on some systems, the current umask
        value may be first masked out.

        :param path: full path to the remote directory to create
        :param mode: int permissions of octal mode for directory
        """
        conn = self.get_conn()
        conn.mkdir(path, mode=mode)

    def isdir(self, path: str) -> bool:
        """Check if the path provided is a directory.

        :param path: full path to the remote directory to check
        :return: ``True`` for a directory; ``False`` otherwise, including when
            the remote ``stat`` raises :class:`OSError`
        """
        conn = self.get_conn()
        try:
            return stat.S_ISDIR(conn.stat(path).st_mode)  # type: ignore
        except OSError:
            return False

    def isfile(self, path: str) -> bool:
        """Check if the path provided is a file.

        :param path: full path to the remote file to check
        :return: ``True`` for a regular file; ``False`` otherwise, including
            when the remote ``stat`` raises :class:`OSError`
        """
        conn = self.get_conn()
        try:
            return stat.S_ISREG(conn.stat(path).st_mode)  # type: ignore
        except OSError:
            return False

    def create_directory(self, path: str, mode: int = 0o777) -> None:
        """Create a directory on the remote system.

        The default mode is ``0o777``, but on some systems, the current umask
        value may be first masked out. Different from :func:`.mkdir`, this
        function attempts to create parent directories if needed, and returns
        silently if the target directory already exists.

        :param path: full path to the remote directory to create
        :param mode: int permissions of octal mode for directory
        :raises AirflowException: if ``path`` already exists as a regular file
        """
        conn = self.get_conn()
        if self.isdir(path):
            self.log.info("%s already exists", path)
            return
        if self.isfile(path):
            raise AirflowException(f"{path} already exists and is a file")

        # Remote paths are "/"-separated regardless of the worker's OS, so
        # split with posixpath rather than os.path.
        dirname, basename = posixpath.split(path)
        # ``dirname != path`` stops the recursion at a root-like path whose
        # stat failed; without it the call would recurse on itself forever.
        if dirname and dirname != path and not self.isdir(dirname):
            self.create_directory(dirname, mode)
        if basename:
            self.log.info("Creating %s", path)
            conn.mkdir(path, mode=mode)

    def delete_directory(self, path: str) -> None:
        """Delete a directory on the remote system.

        :param path: full path to the remote directory to delete
        """
        conn = self.get_conn()
        conn.rmdir(path)

    def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
        """Transfer the remote file to a local location.

        The file is written to ``local_full_path``.

        :param remote_full_path: full path to the remote file
        :param local_full_path: full path to the local file
        :param prefetch: controls whether prefetch is performed (default: True)
        """
        conn = self.get_conn()
        conn.get(remote_full_path, local_full_path, prefetch=prefetch)

    def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:
        """Transfer a local file to the remote location.

        The file is read from ``local_full_path``.

        :param remote_full_path: full path to the remote file
        :param local_full_path: full path to the local file
        :param confirm: forwarded unchanged as the ``confirm`` argument of the
            underlying client's ``put`` call (default: True)
        """
        conn = self.get_conn()
        conn.put(local_full_path, remote_full_path, confirm=confirm)

    def delete_file(self, path: str) -> None:
        """Remove a file on the server.

        :param path: full path to the remote file
        """
        conn = self.get_conn()
        conn.remove(path)

    def get_mod_time(self, path: str) -> str:
        """Get an entry's modification time.

        :param path: full path to the remote file
        :return: the mtime formatted as ``%Y%m%d%H%M%S`` in the worker's local time
        """
        conn = self.get_conn()
        ftp_mdtm = conn.stat(path).st_mtime
        return datetime.datetime.fromtimestamp(ftp_mdtm).strftime("%Y%m%d%H%M%S")  # type: ignore

    def path_exists(self, path: str) -> bool:
        """Whether a remote entity exists.

        :param path: full path to the remote file or directory
        :return: ``False`` when the remote ``stat`` raises :class:`OSError`,
            ``True`` otherwise
        """
        conn = self.get_conn()
        try:
            conn.stat(path)
        except OSError:
            return False
        return True

    @staticmethod
    def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:
        """Whether given path starts with ``prefix`` (if set) and ends with ``delimiter`` (if set).

        :param path: path to be checked
        :param prefix: if set, the path must start with this prefix
        :param delimiter: if set, the path must end with this suffix
        :return: ``True`` when every filter that is set matches
        """
        if prefix is not None and not path.startswith(prefix):
            return False
        if delimiter is not None and not path.endswith(delimiter):
            return False
        return True

    def walktree(
        self,
        path: str,
        fcallback: Callable[[str], Any | None],
        dcallback: Callable[[str], Any | None],
        ucallback: Callable[[str], Any | None],
        recurse: bool = True,
    ) -> None:
        """Recursively descend, depth first, the directory tree at ``path``.

        This calls discrete callback functions for each regular file, directory,
        and unknown file type. A directory's callback fires before its contents
        are visited.

        :param str path:
            root of remote directory to descend, use '.' to start at
            :attr:`.pwd`
        :param callable fcallback:
            callback function to invoke for a regular file.
            (form: ``func(str)``)
        :param callable dcallback:
            callback function to invoke for a directory. (form: ``func(str)``)
        :param callable ucallback:
            callback function to invoke for an unknown file type.
            (form: ``func(str)``)
        :param bool recurse: *Default: True* - should it recurse
        """
        conn = self.get_conn()
        for entry in self.list_directory(path):
            # Remote paths are "/"-separated regardless of the worker's OS.
            pathname = posixpath.join(path, entry)
            mode = conn.stat(pathname).st_mode
            if stat.S_ISDIR(mode):  # type: ignore
                # It's a directory, call the dcallback function
                dcallback(pathname)
                if recurse:
                    # now, recurse into it
                    self.walktree(pathname, fcallback, dcallback, ucallback)
            elif stat.S_ISREG(mode):  # type: ignore
                # It's a file, call the fcallback function
                fcallback(pathname)
            else:
                # Unknown file type
                ucallback(pathname)

    def get_tree_map(
        self, path: str, prefix: str | None = None, delimiter: str | None = None
    ) -> tuple[list[str], list[str], list[str]]:
        """Get tuple with recursive lists of files, directories and unknown paths.

        It is possible to filter results by giving prefix and/or delimiter
        parameters.

        :param path: path from which tree will be built
        :param prefix: if set paths will be added if start with prefix
        :param delimiter: if set paths will be added if end with delimiter
        :return: tuple with list of files, dirs and unknown items
        """
        files: list[str] = []
        dirs: list[str] = []
        unknowns: list[str] = []

        def append_matching_path_callback(list_: list[str]) -> Callable[[str], None]:
            # Build a walktree callback that collects matching paths into ``list_``.
            def collect(item: str) -> None:
                if self._is_path_match(item, prefix, delimiter):
                    list_.append(item)

            return collect

        self.walktree(
            path=path,
            fcallback=append_matching_path_callback(files),
            dcallback=append_matching_path_callback(dirs),
            ucallback=append_matching_path_callback(unknowns),
            recurse=True,
        )

        return files, dirs, unknowns

    def test_connection(self) -> tuple[bool, str]:
        """Test the SFTP connection by normalizing the ``"."`` path on the server.

        :return: ``(True, message)`` on success, or ``(False, str(error))`` for
            any exception raised while connecting or normalizing
        """
        try:
            conn = self.get_conn()
            conn.normalize(".")
            return True, "Connection successfully tested"
        except Exception as e:
            # Deliberately broad: the failure is reported to the caller as text.
            return False, str(e)

    def get_file_by_pattern(self, path: str, fnmatch_pattern: str) -> str:
        """Get the first matching file based on the given fnmatch type pattern.

        Entries are examined in the sorted order returned by
        :meth:`list_directory`.

        :param path: path to be checked
        :param fnmatch_pattern: The pattern that will be matched with `fnmatch`
        :return: string containing the first found file, or an empty string if none matched
        """
        for file in self.list_directory(path):
            if fnmatch(file, fnmatch_pattern):
                return file

        return ""

    def get_files_by_pattern(self, path: str, fnmatch_pattern: str) -> list[str]:
        """Get all matching files based on the given fnmatch type pattern.

        :param path: path to be checked
        :param fnmatch_pattern: The pattern that will be matched with `fnmatch`
        :return: list of string containing the found files, or an empty list if none matched
        """
        return [file for file in self.list_directory(path) if fnmatch(file, fnmatch_pattern)]
class SFTPHookAsync(BaseHook):
    """
    Interact with an SFTP server via asyncssh package.

    NOTE: ``host``, ``port``, ``username`` and ``password`` are stored on the
    instance, but :meth:`_get_conn` builds the connection from the Airflow
    connection identified by ``sftp_conn_id``.

    :param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
    :param host: hostname of the SFTP server
    :param port: port of the SFTP server
    :param username: username used when authenticating to the SFTP server
    :param password: password used when authenticating to the SFTP server.
        Can be left blank if using a key file
    :param known_hosts: path to the known_hosts file on the local file system. Defaults to ``~/.ssh/known_hosts``.
    :param key_file: path to the client key file used for authentication to SFTP server
    :param passphrase: passphrase used with the key_file for authentication to SFTP server
    :param private_key: private key content; when set it is imported with
        ``asyncssh.import_private_key`` (using ``passphrase``) and used as the client key
    """

    # NOTE(review): both names are referenced by this class (as signature
    # defaults and as ``self.default_known_hosts``) but are not defined in the
    # visible source. Values are taken from the class docstring and from
    # SFTPHook's default connection id -- confirm against the original file.
    default_conn_name = "sftp_default"
    default_known_hosts = "~/.ssh/known_hosts"

    def __init__(  # nosec: B107
        self,
        sftp_conn_id: str = default_conn_name,
        host: str = "",
        port: int = 22,
        username: str = "",
        password: str = "",
        known_hosts: str = default_known_hosts,
        key_file: str = "",
        passphrase: str = "",
        private_key: str = "",
    ) -> None:
        self.sftp_conn_id = sftp_conn_id
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # May later be replaced by "none" or by a bytes known_hosts entry
        # (see _parse_extras), hence the wider annotation.
        self.known_hosts: bytes | str = os.path.expanduser(known_hosts)
        self.key_file = key_file
        self.passphrase = passphrase
        self.private_key = private_key

    def _parse_extras(self, conn: Connection) -> None:
        """Parse extra fields from the connection into instance fields.

        Recognised extras: ``key_file``, ``known_hosts``, ``passphrase`` /
        ``private_key_passphrase``, ``private_key``, ``host_key`` and
        ``no_host_key_check``.

        :param conn: connection whose ``extra_dejson`` is inspected
        :raises ValueError: if ``no_host_key_check`` is true and ``host_key`` is also given
        """
        extra_options = conn.extra_dejson
        if "key_file" in extra_options and self.key_file == "":
            self.key_file = extra_options["key_file"]
        # NOTE(review): self.known_hosts holds the *expanded* path while
        # default_known_hosts is unexpanded, so this comparison is true whenever
        # expansion changed the string. Behaviour preserved as-is; confirm intent.
        if "known_hosts" in extra_options and self.known_hosts != self.default_known_hosts:
            self.known_hosts = extra_options["known_hosts"]
        # The original ``("passphrase" or "private_key_passphrase") in extra_options``
        # only ever tested "passphrase"; check both keys, "passphrase" first.
        for passphrase_key in ("passphrase", "private_key_passphrase"):
            if passphrase_key in extra_options:
                self.passphrase = extra_options[passphrase_key]
                break
        if "private_key" in extra_options:
            self.private_key = extra_options["private_key"]

        host_key = extra_options.get("host_key")
        no_host_key_check = extra_options.get("no_host_key_check")

        if no_host_key_check is not None:
            # Accepts booleans and strings alike: only "true" (any case) enables it.
            no_host_key_check = str(no_host_key_check).lower() == "true"
            if host_key is not None and no_host_key_check:
                raise ValueError("Host key check was skipped, but `host_key` value was given")
            if no_host_key_check:
                self.log.warning(
                    "No Host Key Verification. This won't protect against Man-In-The-Middle attacks"
                )
                # Sentinel translated to ``known_hosts=None`` in _get_conn.
                self.known_hosts = "none"

        if host_key is not None:
            # A known_hosts entry is "<host> <key>": the separating space is required.
            self.known_hosts = f"{conn.host} {host_key}".encode()

    async def _get_conn(self) -> asyncssh.SSHClientConnection:
        """
        Asynchronously connect to the SFTP server as an SSH client.

        The following parameters are provided either in the extra json object in
        the SFTP connection definition

        - key_file
        - known_hosts
        - passphrase
        """
        conn = await sync_to_async(self.get_connection)(self.sftp_conn_id)
        if conn.extra is not None:
            self._parse_extras(conn)

        conn_config: dict[str, Any] = {
            "host": conn.host,
            "port": conn.port,
            "username": conn.login,
            "password": conn.password,
        }
        if self.key_file:
            conn_config.update(client_keys=self.key_file)
        if self.known_hosts:
            # known_hosts may be bytes (host_key case); only a str can be the
            # "none" sentinel that disables host key checking.
            if isinstance(self.known_hosts, str) and self.known_hosts.lower() == "none":
                conn_config.update(known_hosts=None)
            else:
                conn_config.update(known_hosts=self.known_hosts)
        if self.private_key:
            # An inline private key takes precedence over key_file for client_keys.
            _private_key = asyncssh.import_private_key(self.private_key, self.passphrase)
            conn_config.update(client_keys=[_private_key])
        if self.passphrase:
            conn_config.update(passphrase=self.passphrase)
        ssh_client_conn = await asyncssh.connect(**conn_config)
        return ssh_client_conn

    async def list_directory(self, path: str = "") -> list[str] | None:
        """Return a sorted list of files on the SFTP server at the provided path.

        :param path: remote directory to list
        :return: sorted entry names, or ``None`` if the server reports the path is missing
        """
        # ``async with`` closes the SSH connection once the listing is fetched.
        async with await self._get_conn() as ssh_conn:
            sftp_client = await ssh_conn.start_sftp_client()
            try:
                files = await sftp_client.listdir(path)
            except asyncssh.SFTPNoSuchFile:
                return None
            return sorted(files)

    async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:
        """Return a list of files along with their attributes on the SFTP server at the provided path.

        :param path: remote directory to read
        :return: directory entries, or ``None`` if the server reports the path is missing
        """
        # ``async with`` closes the SSH connection once the entries are fetched.
        async with await self._get_conn() as ssh_conn:
            sftp_client = await ssh_conn.start_sftp_client()
            try:
                return await sftp_client.readdir(path)
            except asyncssh.SFTPNoSuchFile:
                return None

    async def get_files_and_attrs_by_pattern(
        self, path: str = "", fnmatch_pattern: str = ""
    ) -> Sequence[asyncssh.sftp.SFTPName]:
        """
        Get the files along with their attributes matching the pattern (e.g. ``*.pdf``) at the provided path.

        :param path: remote directory to search
        :param fnmatch_pattern: pattern matched against each entry's filename with ``fnmatch``
        :return: the matching entries (possibly empty)
        :raises FileNotFoundError: if the directory itself could not be read
        """
        files_list = await self.read_directory(path)
        if files_list is None:
            raise FileNotFoundError(f"No files at path {path!r} found...")
        return [file for file in files_list if fnmatch(str(file.filename), fnmatch_pattern)]

    async def get_mod_time(self, path: str) -> str:
        """
        Make SFTP async connection.

        Looks for last modified time in the specific file path and returns last
        modification time for the file path.

        :param path: full path to the remote file
        :return: the mtime formatted as ``%Y%m%d%H%M%S`` in the worker's local time
        :raises AirflowException: if the server reports that ``path`` does not exist
        """
        # ``async with`` closes the SSH connection once the stat has completed.
        async with await self._get_conn() as ssh_conn:
            sftp_client = await ssh_conn.start_sftp_client()
            try:
                ftp_mdtm = await sftp_client.stat(path)
            except asyncssh.SFTPNoSuchFile as err:
                raise AirflowException("No files matching") from err
        modified_time = ftp_mdtm.mtime
        mod_time = datetime.datetime.fromtimestamp(modified_time).strftime(  # type: ignore[arg-type]
            "%Y%m%d%H%M%S"
        )
        self.log.info("Found File %s last modified: %s", str(path), str(mod_time))
        return mod_time