## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains SFTP hook."""from__future__importannotationsimportdatetimeimportosimportstatimportwarningsfromcollections.abcimportGenerator,SequencefromcontextlibimportcontextmanagerfromfnmatchimportfnmatchfromioimportBytesIOfrompathlibimportPathfromtypingimportTYPE_CHECKING,Any,Callableimportasyncsshfromasgiref.syncimportsync_to_asyncfromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarningfromairflow.hooks.baseimportBaseHookfromairflow.providers.ssh.hooks.sshimportSSHHookifTYPE_CHECKING:fromparamikoimportSSHClientfromparamiko.sftp_attrimportSFTPAttributesfromparamiko.sftp_clientimportSFTPClientfromairflow.models.connectionimportConnection
class SFTPHook(SSHHook):
    """
    Interact with SFTP.

    This hook inherits the SSH hook. Please refer to SSH hook for the input arguments.

    :Pitfalls::

        - In contrast with FTPHook describe_directory only returns size, type and
          modify. It doesn't return unix.owner, unix.mode, perm, unix.group and
          unique.
        - If no mode is passed to create_directory it will be created with 777
          permissions (on some servers the current umask is masked out first).

    Errors that may occur throughout should be handled downstream.

    For consistency with SSHHook, the preferred parameter is "ssh_conn_id"; the
    ``ftp_conn_id`` keyword is still accepted but deprecated.

    :param ssh_conn_id: The :ref:`sftp connection id<howto/connection:sftp>`
    """
# NOTE(review): whitespace-collapsed fragment of SFTPHook.__init__. Its signature and the
# rest of its body are not visible here; ``kwargs``, ``host_proxy_cmd`` and (unless
# ``ftp_conn_id`` is given) ``ssh_conn_id`` are presumably parameters of that missing
# signature. The fragment warns that the ``ssh_hook`` keyword is ignored, maps the
# deprecated ``ftp_conn_id`` keyword onto ``ssh_conn_id``, and stores ``ssh_conn_id`` and
# ``host_proxy_cmd`` into ``kwargs`` (presumably for the parent constructor). Restore it
# from the upstream source rather than editing it in place.
# TODO: remove support for ssh_hook when it is removed from SFTPOperatorifkwargs.get("ssh_hook")isnotNone:warnings.warn("Parameter `ssh_hook` is deprecated and will be ignored.",AirflowProviderDeprecationWarning,stacklevel=2,)ftp_conn_id=kwargs.pop("ftp_conn_id",None)ifftp_conn_id:warnings.warn("Parameter `ftp_conn_id` is deprecated. Please use `ssh_conn_id` instead.",AirflowProviderDeprecationWarning,stacklevel=2,)ssh_conn_id=ftp_conn_idkwargs["ssh_conn_id"]=ssh_conn_idkwargs["host_proxy_cmd"]=host_proxy_cmd
def get_conn(self) -> SFTPClient:  # type: ignore[override]
    """
    Return the cached SFTP client, opening it lazily on first use.

    The SFTP session is opened on the SSH client returned by the parent hook's
    ``get_conn`` and kept in ``self.conn``; later calls reuse it until
    :meth:`close_conn` resets the attribute.
    """
    if self.conn is not None:
        return self.conn
    ssh_client = super().get_conn()
    self.conn = ssh_client.open_sftp()
    return self.conn
def close_conn(self) -> None:
    """Close the cached SFTP client, if any, and forget it so the next ``get_conn`` reopens."""
    sftp_client = self.conn
    if sftp_client is None:
        return
    sftp_client.close()
    self.conn = None
@contextmanager
def get_managed_conn(self) -> Generator[SFTPClient, None, None]:
    """
    Yield a shared SFTP client that is closed when the last user leaves.

    Nested ``with self.get_managed_conn()`` blocks reuse the same SFTP client: a counter
    (``self._conn_count``) tracks how many contexts are active, and the SFTP client and the
    SSH client it runs on are closed (and reset to ``None``) only when the counter drops
    back to zero.

    :raises Exception: whatever the parent hook's ``get_conn`` or ``open_sftp`` raise. If
        ``open_sftp`` fails, the SSH client that was just obtained is closed before the
        error is re-raised.
    """
    if self._sftp_conn is None:
        ssh_conn: SSHClient = super().get_conn()
        try:
            sftp_conn = ssh_conn.open_sftp()
        except BaseException:
            # This context owns the SSH client (it closes it on normal teardown below); if
            # the SFTP session cannot be opened, nobody else would ever close it.
            ssh_conn.close()
            raise
        self._ssh_conn = ssh_conn
        self._sftp_conn = sftp_conn
    self._conn_count += 1
    try:
        yield self._sftp_conn
    finally:
        self._conn_count -= 1
        if self._conn_count == 0 and self._ssh_conn is not None and self._sftp_conn is not None:
            sftp_conn, ssh_conn = self._sftp_conn, self._ssh_conn
            # Reset state first so a failing close() cannot leave stale handles behind.
            self._sftp_conn = None
            self._ssh_conn = None
            try:
                sftp_conn.close()
            finally:
                # Close the SSH client even if closing the SFTP session raised.
                ssh_conn.close()
def get_conn_count(self) -> int:
    """Return how many ``get_managed_conn`` contexts are currently active."""
    active_contexts: int = self._conn_count
    return active_contexts
def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]:
    """
    Describe every entry of a remote directory.

    The result maps each file name (inserted in sorted order) to a dict with:

    * ``size`` -- ``st_size`` as reported by the server,
    * ``type`` -- ``"dir"`` for directories, ``"file"`` for anything else,
    * ``modify`` -- modification time formatted as ``YYYYMMDDHHMMSS``, converted with
      ``datetime.fromtimestamp`` (i.e. in the worker's local time zone).

    :param path: full path to the remote directory
    """
    with self.get_managed_conn() as conn:
        entries = sorted(conn.listdir_attr(path), key=lambda attrs: attrs.filename)
        described: dict[str, dict[str, str | int | None]] = {}
        for attrs in entries:
            modified_at = datetime.datetime.fromtimestamp(attrs.st_mtime)  # type: ignore
            described[attrs.filename] = {
                "size": attrs.st_size,
                "type": "dir" if stat.S_ISDIR(attrs.st_mode) else "file",  # type: ignore
                "modify": modified_at.strftime("%Y%m%d%H%M%S"),
            }
        return described
def list_directory(self, path: str) -> list[str]:
    """
    Return the names of the entries in a remote directory, sorted.

    :param path: full path to the remote directory to list
    """
    with self.get_managed_conn() as conn:
        entry_names = conn.listdir(path)
        return sorted(entry_names)
def list_directory_with_attr(self, path: str) -> list[SFTPAttributes]:
    """
    Return the entries of a remote directory together with their ``SFTPAttributes``.

    The entries are returned as a new list, in the order the client reports them (no
    sorting is applied).

    :param path: full path to the remote directory to list
    """
    with self.get_managed_conn() as conn:
        return list(conn.listdir_attr(path))
def mkdir(self, path: str, mode: int = 0o777) -> None:
    """
    Create a single remote directory; missing parents are not created.

    :param path: full path to the remote directory to create
    :param mode: octal permission bits requested for the directory (default ``0o777``);
        on some systems the current umask value may be masked out first.
    """
    with self.get_managed_conn() as sftp:
        sftp.mkdir(path, mode=mode)
def isdir(self, path: str) -> bool:
    """
    Tell whether ``path`` is a directory on the remote system.

    Returns ``False`` instead of raising when ``stat`` fails with an ``OSError``
    (for example because the path does not exist).

    :param path: full path to the remote directory to check
    """
    with self.get_managed_conn() as conn:
        try:
            attributes = conn.stat(path)
        except OSError:
            return False
        return stat.S_ISDIR(attributes.st_mode)  # type: ignore
def isfile(self, path: str) -> bool:
    """
    Tell whether ``path`` is a regular file on the remote system.

    Returns ``False`` instead of raising when ``stat`` fails with an ``OSError``
    (for example because the path does not exist).

    :param path: full path to the remote file to check
    """
    with self.get_managed_conn() as conn:
        try:
            attributes = conn.stat(path)
        except OSError:
            return False
        return stat.S_ISREG(attributes.st_mode)  # type: ignore
def create_directory(self, path: str, mode: int = 0o777) -> None:
    """
    Create a directory on the remote system, including missing parents.

    The default mode is ``0o777``, but on some systems, the current umask value may be
    first masked out. Different from :func:`.mkdir`, this function attempts to create
    parent directories if needed, and returns silently if the target directory already
    exists.

    :param path: full path to the remote directory to create
    :param mode: int permissions of octal mode for directory
    :raises AirflowException: if ``path`` (or one of its parents) exists and is a file
    """
    # Hold one managed connection for the whole operation. The isdir/isfile calls and the
    # recursive calls below re-enter get_managed_conn and reuse it through its reference
    # count, instead of each opening and closing a separate SSH session.
    with self.get_managed_conn() as conn:
        if self.isdir(path):
            self.log.info("%s already exists", path)
            return
        if self.isfile(path):
            raise AirflowException(f"{path} already exists and is a file")
        dirname, basename = os.path.split(path)
        # ``dirname == path`` happens for roots such as "/" and would otherwise recurse forever.
        if dirname and dirname != path and not self.isdir(dirname):
            self.create_directory(dirname, mode)
        if basename:
            self.log.info("Creating %s", path)
            conn.mkdir(path, mode=mode)
def delete_directory(self, path: str, include_files: bool = False) -> None:
    """
    Delete a directory on the remote system.

    :param path: full path to the remote directory to delete
    :param include_files: if true, first delete everything below ``path``: regular files,
        then entries that are neither files nor directories (e.g. symlinks), then
        sub-directories, deepest first. If false, only ``path`` itself is removed with
        ``rmdir``, which typically requires it to be empty.
    """
    files: list[str] = []
    dirs: list[str] = []
    unknowns: list[str] = []
    # One managed connection for the whole operation: the tree walk and every deletion
    # reuse it instead of each opening their own SSH session.
    with self.get_managed_conn() as conn:
        if include_files:
            files, dirs, unknowns = self.get_tree_map(path)
            # get_tree_map reports a directory before its contents, so reverse the list to
            # remove the deepest directories first.
            dirs = dirs[::-1]
        # Entries that are neither files nor directories must be removed too, otherwise
        # rmdir fails on a directory that still contains them.
        for entry_path in (*files, *unknowns):
            conn.remove(entry_path)
        for dir_path in dirs:
            conn.rmdir(dir_path)
        conn.rmdir(path)
def retrieve_file(
    self,
    remote_full_path: str,
    local_full_path: str | os.PathLike[str] | BytesIO,
    prefetch: bool = True,
) -> None:
    """
    Transfer the remote file to a local location.

    If ``local_full_path`` is a path (``str``, ``bytes`` or :class:`os.PathLike`), the file
    is written to that location. Anything else is treated as a writable binary file-like
    object (``io.BytesIO``, an open file, a temporary file, ...) and the content is written
    into it.

    :param remote_full_path: full path to the remote file
    :param local_full_path: full path to the local file or a file-like buffer
    :param prefetch: controls whether prefetch is performed (default: True)
    """
    with self.get_managed_conn() as conn:
        if isinstance(local_full_path, (str, bytes, os.PathLike)):
            conn.get(remote_full_path, local_full_path, prefetch=prefetch)
        else:
            # Any non-path object is a buffer; checking only for BytesIO would hand other
            # file objects to ``get`` as if they were paths.
            conn.getfo(remote_full_path, local_full_path, prefetch=prefetch)
def store_file(
    self,
    remote_full_path: str,
    local_full_path: str | os.PathLike[str] | BytesIO,
    confirm: bool = True,
) -> None:
    """
    Transfer a local file to the remote location.

    If ``local_full_path`` is a path (``str``, ``bytes`` or :class:`os.PathLike`), the file
    is read from that location. Anything else is treated as a readable binary file-like
    object (``io.BytesIO``, an open file, ...) whose content is uploaded.

    :param remote_full_path: full path to the remote file
    :param local_full_path: full path to the local file or a file-like buffer
    :param confirm: passed through to the SFTP client's ``put``/``putfo``; with paramiko,
        true means the uploaded file is stat-ed afterwards to confirm its size
    """
    with self.get_managed_conn() as conn:
        if isinstance(local_full_path, (str, bytes, os.PathLike)):
            conn.put(local_full_path, remote_full_path, confirm=confirm)
        else:
            # Any non-path object is a buffer; checking only for BytesIO would hand other
            # file objects to ``put`` as if they were paths.
            conn.putfo(local_full_path, remote_full_path, confirm=confirm)
def delete_file(self, path: str) -> None:
    """
    Delete a single file on the remote server.

    :param path: full path to the remote file
    """
    with self.get_managed_conn() as sftp_client:
        sftp_client.remove(path)
def retrieve_directory(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
    """
    Download a whole remote directory tree into a new local directory.

    The local directory is created (with parents), every remote sub-directory is recreated
    beneath it, and every remote regular file is downloaded with :meth:`retrieve_file`, all
    while a single managed connection is held.

    :param remote_full_path: full path to the remote directory
    :param local_full_path: full path to the local directory; must not exist yet
    :param prefetch: controls whether prefetch is performed (default: True)
    :raises AirflowException: if ``local_full_path`` already exists
    """
    local_root = Path(local_full_path)
    if local_root.exists():
        raise AirflowException(f"{local_full_path} already exists")
    local_root.mkdir(parents=True)

    def _local_target(remote_path: str) -> str:
        # Mirror the remote path's position relative to the remote root under the local root.
        return os.path.join(local_full_path, os.path.relpath(remote_path, remote_full_path))

    with self.get_managed_conn():
        remote_files, remote_dirs, _ = self.get_tree_map(remote_full_path)
        for remote_dir in remote_dirs:
            Path(_local_target(remote_dir)).mkdir(parents=True, exist_ok=True)
        for remote_file in remote_files:
            self.retrieve_file(remote_file, _local_target(remote_file), prefetch)
def store_directory(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:
    """
    Upload a whole local directory tree to a new remote directory.

    The remote root is created first, then ``os.walk`` visits the local tree top-down: each
    local sub-directory is recreated with :meth:`create_directory` and each local file is
    uploaded with :meth:`store_file`, all while a single managed connection is held.

    :param remote_full_path: full path to the remote directory; must not exist yet
    :param local_full_path: full path to the local directory
    :param confirm: forwarded to :meth:`store_file`
    :raises AirflowException: if ``remote_full_path`` already exists
    """
    if self.path_exists(remote_full_path):
        raise AirflowException(f"{remote_full_path} already exists")

    def _remote_target(local_path: str) -> str:
        # Mirror the local path's position relative to the local root under the remote root.
        return os.path.join(remote_full_path, os.path.relpath(local_path, local_full_path))

    with self.get_managed_conn():
        self.create_directory(remote_full_path)
        for current_dir, sub_dirs, file_names in os.walk(local_full_path):
            for sub_dir in sub_dirs:
                self.create_directory(_remote_target(os.path.join(current_dir, sub_dir)))
            for file_name in file_names:
                local_file = os.path.join(current_dir, file_name)
                self.store_file(_remote_target(local_file), local_file, confirm)
def get_mod_time(self, path: str) -> str:
    """
    Return the modification time of a remote entry as ``YYYYMMDDHHMMSS``.

    ``st_mtime`` is converted with ``datetime.fromtimestamp``, so the result is expressed in
    the worker's local time zone.

    :param path: full path to the remote file
    """
    with self.get_managed_conn() as conn:
        mtime_seconds = conn.stat(path).st_mtime
    modified_at = datetime.datetime.fromtimestamp(mtime_seconds)  # type: ignore
    return modified_at.strftime("%Y%m%d%H%M%S")
def path_exists(self, path: str) -> bool:
    """
    Tell whether anything (file, directory, ...) exists at ``path`` on the remote system.

    A ``stat`` that fails with ``OSError`` is reported as ``False``.

    :param path: full path to the remote file or directory
    """
    with self.get_managed_conn() as conn:
        try:
            conn.stat(path)
        except OSError:
            return False
        else:
            return True
@staticmethod
def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:
    """
    Tell whether ``path`` passes the optional prefix and suffix filters.

    :param path: path to be checked
    :param prefix: if set, ``path`` must start with it
    :param delimiter: if set, ``path`` must end with it (despite the name, a suffix)
    :return: ``True`` when every filter that is set matches, ``False`` otherwise
    """
    return (prefix is None or path.startswith(prefix)) and (
        delimiter is None or path.endswith(delimiter)
    )
def walktree(
    self,
    path: str,
    fcallback: Callable[[str], Any | None],
    dcallback: Callable[[str], Any | None],
    ucallback: Callable[[str], Any | None],
    recurse: bool = True,
) -> None:
    """
    Recursively descend, depth first, the directory tree at ``path``.

    This calls discrete callback functions for each regular file, directory, and unknown
    file type. A directory is reported before its contents are visited.

    :param str path: root of remote directory to descend, use '.' to start at
        :attr:`.pwd`
    :param callable fcallback: callback function to invoke for a regular file.
        (form: ``func(str)``)
    :param callable dcallback: callback function to invoke for a directory.
        (form: ``func(str)``)
    :param callable ucallback: callback function to invoke for an unknown file type.
        (form: ``func(str)``)
    :param bool recurse: *Default: True* - should it recurse
    """
    # Hold one managed connection for the whole walk. The per-directory listings and the
    # nested walktree calls re-enter get_managed_conn and reuse it, instead of opening a
    # new SSH session for every directory visited.
    with self.get_managed_conn():
        for entry in self.list_directory_with_attr(path):
            pathname = os.path.join(path, entry.filename)
            mode = entry.st_mode
            if stat.S_ISDIR(mode):  # type: ignore
                # It's a directory: report it, then (optionally) descend into it.
                dcallback(pathname)
                if recurse:
                    self.walktree(pathname, fcallback, dcallback, ucallback, recurse=recurse)
            elif stat.S_ISREG(mode):  # type: ignore
                fcallback(pathname)
            else:
                # Neither a directory nor a regular file (e.g. a symlink).
                ucallback(pathname)
def get_tree_map(
    self, path: str, prefix: str | None = None, delimiter: str | None = None
) -> tuple[list[str], list[str], list[str]]:
    """
    Collect, recursively, the files, directories and unknown entries below ``path``.

    Entries are gathered by :meth:`walktree` and kept only if they pass the optional
    ``prefix`` / ``delimiter`` filters.

    :param path: path from which tree will be built
    :param prefix: if set, only paths starting with it are kept
    :param delimiter: if set, only paths ending with it are kept
    :return: tuple with list of files, dirs and unknown items
    """
    files: list[str] = []
    dirs: list[str] = []
    unknowns: list[str] = []

    def _collector(bucket: list[str]) -> Callable[[str], None]:
        def _collect(item: str) -> None:
            if self._is_path_match(item, prefix, delimiter):
                bucket.append(item)

        return _collect

    self.walktree(
        path=path,
        fcallback=_collector(files),
        dcallback=_collector(dirs),
        ucallback=_collector(unknowns),
        recurse=True,
    )
    return files, dirs, unknowns
[docs]deftest_connection(self)->tuple[bool,str]:"""Test the SFTP connection by calling path with directory."""try:withself.get_managed_conn()asconn:conn.normalize(".")returnTrue,"Connection successfully tested"exceptExceptionase:returnFalse,str(e)
def get_file_by_pattern(self, path, fnmatch_pattern) -> str:
    """
    Return the first entry of ``path`` (in sorted order) matching an fnmatch pattern.

    :param path: path to be checked
    :param fnmatch_pattern: The pattern that will be matched with `fnmatch`
    :return: string containing the first found file, or an empty string if none matched
    """
    return next(
        (name for name in self.list_directory(path) if fnmatch(name, fnmatch_pattern)),
        "",
    )
def get_files_by_pattern(self, path, fnmatch_pattern) -> list[str]:
    """
    Return the names of all entries of ``path`` matching an fnmatch pattern.

    Names keep the order in which :meth:`list_directory_with_attr` returns them.

    :param path: path to be checked
    :param fnmatch_pattern: The pattern that will be matched with `fnmatch`
    :return: list of string containing the found files, or an empty list if none matched
    """
    names = (entry.filename for entry in self.list_directory_with_attr(path))
    return [name for name in names if fnmatch(name, fnmatch_pattern)]
class SFTPHookAsync(BaseHook):
    """
    Interact with an SFTP server via the asyncssh package.

    :param sftp_conn_id: SFTP connection ID to be used for connecting to the SFTP server
    :param host: hostname of the SFTP server
    :param port: port of the SFTP server
    :param username: username used when authenticating to the SFTP server
    :param password: password used when authenticating to the SFTP server.
        Can be left blank if using a key file
    :param known_hosts: path to the known_hosts file on the local file system. Defaults to
        ``~/.ssh/known_hosts``.
    :param key_file: path to the client key file used for authentication to the SFTP server
    :param passphrase: passphrase used with the key_file for authentication to the SFTP server
    """

    # NOTE(review): the connection code visible for this class (``_get_conn``) reads host,
    # port, login and password from the Airflow connection ``sftp_conn_id``, not from the
    # ``host``/``port``/``username``/``password`` arguments documented above. Confirm
    # whether those arguments are used anywhere (the constructor is not visible here).
def _parse_extras(self, conn: Connection) -> None:
    """
    Copy SSH options from the connection's ``extra`` JSON onto the hook.

    Recognised keys:

    * ``key_file`` -- used only when the hook has no key file (``self.key_file == ""``).
    * ``known_hosts`` -- used only when the hook still has its default known_hosts value.
    * ``passphrase`` (or, as a fallback, ``private_key_passphrase``) -- overrides the
      hook's passphrase.
    * ``private_key`` -- overrides the hook's private key.
    * ``no_host_key_check`` -- when ``"true"`` (case-insensitive), host key verification
      is disabled by setting ``known_hosts`` to ``"none"``.
    * ``host_key`` -- pins the server key: ``known_hosts`` becomes the single known_hosts
      line ``"<host> <host_key>"`` as bytes.

    :param conn: the Airflow connection whose extras are parsed
    :raises ValueError: if ``host_key`` is given while ``no_host_key_check`` is true
    """
    extra_options = conn.extra_dejson
    if "key_file" in extra_options and self.key_file == "":
        self.key_file = extra_options["key_file"]
    # Like key_file, the extra only fills in a value the caller did not set explicitly; it
    # must not override an explicit known_hosts argument.
    if "known_hosts" in extra_options and self.known_hosts == self.default_known_hosts:
        self.known_hosts = extra_options["known_hosts"]
    # Check each accepted key explicitly: ``("a" or "b") in d`` only ever tests "a".
    for passphrase_key in ("passphrase", "private_key_passphrase"):
        if passphrase_key in extra_options:
            self.passphrase = extra_options[passphrase_key]
            break
    if "private_key" in extra_options:
        self.private_key = extra_options["private_key"]

    host_key = extra_options.get("host_key")
    no_host_key_check = extra_options.get("no_host_key_check")

    if no_host_key_check is not None:
        no_host_key_check = str(no_host_key_check).lower() == "true"
    if host_key is not None and no_host_key_check:
        raise ValueError("Host key check was skipped, but `host_key` value was given")
    if no_host_key_check:
        self.log.warning("No Host Key Verification. This won't protect against Man-In-The-Middle attacks")
        self.known_hosts = "none"

    if host_key is not None:
        # A known_hosts line separates the host from the key by whitespace; host_key is
        # presumably "<key type> <base64 key>" -- confirm against the connection docs.
        self.known_hosts = f"{conn.host} {host_key}".encode()


async def _get_conn(self) -> asyncssh.SSHClientConnection:
    """
    Asynchronously connect to the SFTP server as an SSH client.

    Host, port, login and password come from the Airflow connection ``self.sftp_conn_id``.
    When the connection has extras they are applied first via ``_parse_extras`` (e.g.
    ``key_file``, ``known_hosts``, ``passphrase``, ``private_key``, ``host_key``,
    ``no_host_key_check``).

    :return: the open asyncssh client connection
    """
    conn = await sync_to_async(self.get_connection)(self.sftp_conn_id)
    if conn.extra is not None:
        self._parse_extras(conn)

    conn_config: dict[str, Any] = {
        "host": conn.host,
        "port": conn.port,
        "username": conn.login,
        "password": conn.password,
    }
    if self.key_file:
        conn_config.update(client_keys=self.key_file)
    if self.known_hosts:
        # The "none" marker (set for no_host_key_check) maps to known_hosts=None, which
        # asyncssh treats as "do not validate the server host key".
        if self.known_hosts.lower() == "none":
            conn_config.update(known_hosts=None)
        else:
            conn_config.update(known_hosts=self.known_hosts)
    if self.private_key:
        # An in-memory private key takes precedence over key_file.
        _private_key = asyncssh.import_private_key(self.private_key, self.passphrase)
        conn_config.update(client_keys=[_private_key])
    if self.passphrase:
        conn_config.update(passphrase=self.passphrase)
    ssh_client_conn = await asyncssh.connect(**conn_config)
    return ssh_client_conn
async def list_directory(self, path: str = "") -> list[str] | None:  # type: ignore[return]
    """
    Return the sorted names of the entries at ``path`` on the SFTP server.

    :param path: remote directory to list
    :return: the sorted entry names, or ``None`` if the server reports that ``path``
        does not exist (``asyncssh.SFTPNoSuchFile``)
    """
    async with await self._get_conn() as ssh_conn:
        sftp_client = await ssh_conn.start_sftp_client()
        try:
            names = await sftp_client.listdir(path)
        except asyncssh.SFTPNoSuchFile:
            return None
        return sorted(names)
async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:  # type: ignore[return]
    """
    Return the entries at ``path`` on the SFTP server together with their attributes.

    :param path: remote directory to read
    :return: the entries as returned by the SFTP client's ``readdir``, or ``None`` if the
        server reports that ``path`` does not exist (``asyncssh.SFTPNoSuchFile``)
    """
    async with await self._get_conn() as ssh_conn:
        sftp_client = await ssh_conn.start_sftp_client()
        try:
            entries = await sftp_client.readdir(path)
        except asyncssh.SFTPNoSuchFile:
            return None
        return entries
async def get_files_and_attrs_by_pattern(
    self, path: str = "", fnmatch_pattern: str = ""
) -> Sequence[asyncssh.sftp.SFTPName]:
    """
    Return the entries at ``path`` whose file name matches ``fnmatch_pattern`` (e.g. ``*.pdf``).

    Each returned entry carries its attributes, as produced by :meth:`read_directory`.

    :param path: remote directory to search
    :param fnmatch_pattern: shell-style pattern matched with :func:`fnmatch.fnmatch`
    :return: the matching entries, possibly empty
    :raises FileNotFoundError: if ``path`` does not exist on the server; this is meant
        to be handled upstream (e.g. by a deferring trigger)
    """
    entries = await self.read_directory(path)
    if entries is None:
        raise FileNotFoundError(f"No files at path {path!r} found...")
    return [entry for entry in entries if fnmatch(str(entry.filename), fnmatch_pattern)]
async def get_mod_time(self, path: str) -> str:  # type: ignore[return]
    """
    Return the last modification time of the remote file ``path`` as ``YYYYMMDDHHMMSS``.

    Opens a fresh async SSH connection, stats the path and formats its ``mtime`` with
    ``datetime.fromtimestamp`` (i.e. in the worker's local time zone).

    :param path: full path to the remote file
    :raises AirflowException: if the server reports that ``path`` does not exist
    """
    async with await self._get_conn() as ssh_conn:
        try:
            sftp_client = await ssh_conn.start_sftp_client()
            file_attrs = await sftp_client.stat(path)
        except asyncssh.SFTPNoSuchFile:
            raise AirflowException("No files matching")
        modified_at = datetime.datetime.fromtimestamp(file_attrs.mtime)  # type: ignore[arg-type]
        mod_time = modified_at.strftime("%Y%m%d%H%M%S")
        self.log.info("Found File %s last modified: %s", str(path), str(mod_time))
        return mod_time