# Source code for airflow.providers.sftp.hooks.sftp

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains SFTP hook."""
import datetime
import stat
from typing import Any, Dict, List, Optional, Tuple

import pysftp

from airflow.providers.ssh.hooks.ssh import SSHHook


class SFTPHook(SSHHook):
    """
    This hook is inherited from SSH hook. Please refer to SSH hook for the input
    arguments.

    Interact with SFTP. Aims to be interchangeable with FTPHook.

    :Pitfalls::

        - In contrast with FTPHook describe_directory only returns size, type and
          modify. It doesn't return unix.owner, unix.mode, perm, unix.group and
          unique.
        - retrieve_file and store_file only take a local full path and not a
          buffer.
        - If no mode is passed to create_directory it will be created with 777
          permissions.

    Errors that may occur throughout should be handled downstream.
    """

    conn_name_attr = 'ftp_conn_id'
    default_conn_name = 'sftp_default'
    conn_type = 'sftp'
    hook_name = 'SFTP'

    @staticmethod
    def get_ui_field_behaviour() -> Dict:
        """
        Return the UI field behaviour for this connection type.

        The ``schema`` field is hidden and ``login`` is relabelled as ``Username``.
        """
        return {
            "hidden_fields": ['schema'],
            "relabeling": {
                'login': 'Username',
            },
        }

    def __init__(self, ftp_conn_id: str = 'sftp_default', *args, **kwargs) -> None:
        """
        :param ftp_conn_id: id of the connection to use; it is forwarded to
            SSHHook as ``ssh_conn_id``.
        :type ftp_conn_id: str
        """
        kwargs['ssh_conn_id'] = ftp_conn_id
        super().__init__(*args, **kwargs)

        # Lazily created by get_conn() and cleared by close_conn().
        self.conn = None
        self.private_key_pass = None
        self.ciphers = None

        # Fail for unverified hosts, unless this is explicitly allowed
        self.no_host_key_check = False

        if self.ssh_conn_id is not None:
            conn = self.get_connection(self.ssh_conn_id)
            if conn.extra is not None:
                extra_options = conn.extra_dejson
                if 'private_key_pass' in extra_options:
                    self.private_key_pass = extra_options.get('private_key_pass')

                # For backward compatibility
                # TODO: remove in Airflow 2.1
                import warnings

                if 'ignore_hostkey_verification' in extra_options:
                    warnings.warn(
                        'Extra option `ignore_hostkey_verification` is deprecated. '
                        'Please use `no_host_key_check` instead. '
                        'This option will be removed in Airflow 2.1',
                        DeprecationWarning,
                        stacklevel=2,
                    )
                    self.no_host_key_check = (
                        str(extra_options['ignore_hostkey_verification']).lower() == 'true'
                    )

                # Checked after the deprecated option, so it wins when both are set.
                if 'no_host_key_check' in extra_options:
                    self.no_host_key_check = str(extra_options['no_host_key_check']).lower() == 'true'

                if 'ciphers' in extra_options:
                    self.ciphers = extra_options['ciphers']

                if 'private_key' in extra_options:
                    warnings.warn(
                        'Extra option `private_key` is deprecated. '
                        'Please use `key_file` instead. '
                        'This option will be removed in Airflow 2.1',
                        DeprecationWarning,
                        stacklevel=2,
                    )
                    self.key_file = extra_options.get('private_key')

    def get_conn(self) -> pysftp.Connection:
        """
        Returns an SFTP connection object.

        The connection is created on the first call and cached on ``self.conn``;
        later calls return the cached object until close_conn() is called.
        """
        if self.conn is None:
            cnopts = pysftp.CnOpts()
            if self.no_host_key_check:
                # Setting hostkeys to None disables host key verification.
                cnopts.hostkeys = None
            # remote_host, port, username, password and compress are not set in
            # this class; they are inherited from SSHHook.
            cnopts.compression = self.compress
            cnopts.ciphers = self.ciphers
            conn_params = {
                'host': self.remote_host,
                'port': self.port,
                'username': self.username,
                'cnopts': cnopts,
            }
            # Blank or whitespace-only passwords are not sent.
            if self.password and self.password.strip():
                conn_params['password'] = self.password
            if self.key_file:
                conn_params['private_key'] = self.key_file
            if self.private_key_pass:
                conn_params['private_key_pass'] = self.private_key_pass

            self.conn = pysftp.Connection(**conn_params)
        return self.conn

    def close_conn(self) -> None:
        """Closes the cached connection, if any, and forgets it."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def describe_directory(self, path: str) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary of {filename: {attributes}} for all entries in a
        remote directory.

        The attributes of each entry are:

        - ``size``: the entry's ``st_size`` (int)
        - ``type``: ``'dir'`` for directories, ``'file'`` for anything else
        - ``modify``: modification time formatted as ``%Y%m%d%H%M%S``, converted
          with ``datetime.fromtimestamp`` (i.e. local time)

        :param path: full path to the remote directory
        :type path: str
        """
        conn = self.get_conn()
        flist = conn.listdir_attr(path)
        files = {}
        for f in flist:
            modify = datetime.datetime.fromtimestamp(f.st_mtime).strftime('%Y%m%d%H%M%S')
            files[f.filename] = {
                'size': f.st_size,
                'type': 'dir' if stat.S_ISDIR(f.st_mode) else 'file',
                'modify': modify,
            }
        return files

    def list_directory(self, path: str) -> List[str]:
        """
        Returns a list of files on the remote system.

        :param path: full path to the remote directory to list
        :type path: str
        """
        conn = self.get_conn()
        files = conn.listdir(path)
        return files

    def create_directory(self, path: str, mode: int = 777) -> None:
        """
        Creates a directory on the remote system.

        :param path: full path to the remote directory to create
        :type path: str
        :param mode: int representation of octal mode for directory, written
            with decimal digits (e.g. ``755``); defaults to ``777``
        :type mode: int
        """
        conn = self.get_conn()
        conn.makedirs(path, mode)

    def delete_directory(self, path: str) -> None:
        """
        Deletes a directory on the remote system.

        :param path: full path to the remote directory to delete
        :type path: str
        """
        conn = self.get_conn()
        conn.rmdir(path)

    def retrieve_file(self, remote_full_path: str, local_full_path: str) -> None:
        """
        Transfers the remote file to a local location.

        The file is written to the local path given by local_full_path.

        :param remote_full_path: full path to the remote file
        :type remote_full_path: str
        :param local_full_path: full path to the local file
        :type local_full_path: str
        """
        conn = self.get_conn()
        self.log.info('Retrieving file from FTP: %s', remote_full_path)
        conn.get(remote_full_path, local_full_path)
        self.log.info('Finished retrieving file from FTP: %s', remote_full_path)

    def store_file(self, remote_full_path: str, local_full_path: str) -> None:
        """
        Transfers a local file to the remote location.

        The file is read from the local path given by local_full_path.

        :param remote_full_path: full path to the remote file
        :type remote_full_path: str
        :param local_full_path: full path to the local file
        :type local_full_path: str
        """
        conn = self.get_conn()
        conn.put(local_full_path, remote_full_path)

    def delete_file(self, path: str) -> None:
        """
        Removes a file on the FTP Server

        :param path: full path to the remote file
        :type path: str
        """
        conn = self.get_conn()
        conn.remove(path)

    def get_mod_time(self, path: str) -> str:
        """
        Returns modification time of a remote file.

        The time comes from ``st_mtime``, is converted with
        ``datetime.fromtimestamp`` (local time) and formatted as ``%Y%m%d%H%M%S``.

        :param path: full path to the remote file
        :type path: str
        """
        conn = self.get_conn()
        ftp_mdtm = conn.stat(path).st_mtime
        return datetime.datetime.fromtimestamp(ftp_mdtm).strftime('%Y%m%d%H%M%S')

    def path_exists(self, path: str) -> bool:
        """
        Returns True if a remote entity exists

        :param path: full path to the remote file or directory
        :type path: str
        """
        conn = self.get_conn()
        return conn.exists(path)

    @staticmethod
    def _is_path_match(path: str, prefix: Optional[str] = None, delimiter: Optional[str] = None) -> bool:
        """
        Return True if given path starts with prefix (if set) and ends with delimiter (if set).

        :param path: path to be checked
        :type path: str
        :param prefix: if set, path must start with prefix
        :type prefix: str
        :param delimiter: if set, path must end with delimiter
        :type delimiter: str
        :return: bool
        """
        if prefix is not None and not path.startswith(prefix):
            return False
        if delimiter is not None and not path.endswith(delimiter):
            return False
        return True

    def get_tree_map(
        self, path: str, prefix: Optional[str] = None, delimiter: Optional[str] = None
    ) -> Tuple[List[str], List[str], List[str]]:
        """
        Return tuple with recursive lists of files, directories and unknown paths from given path.
        It is possible to filter results by giving prefix and/or delimiter parameters.

        :param path: path from which tree will be built
        :type path: str
        :param prefix: if set paths will be added if start with prefix
        :type prefix: str
        :param delimiter: if set paths will be added if end with delimiter
        :type delimiter: str
        :return: tuple with list of files, dirs and unknown items
        :rtype: Tuple[List[str], List[str], List[str]]
        """
        conn = self.get_conn()
        files: List[str] = []
        dirs: List[str] = []
        unknowns: List[str] = []

        def append_matching_path_callback(list_: List[str]):
            """Build a walktree callback that appends matching paths to ``list_``."""

            def callback(item: str) -> None:
                if self._is_path_match(item, prefix, delimiter):
                    list_.append(item)

            return callback

        conn.walktree(
            remotepath=path,
            fcallback=append_matching_path_callback(files),
            dcallback=append_matching_path_callback(dirs),
            ucallback=append_matching_path_callback(unknowns),
            recurse=True,
        )

        return files, dirs, unknowns

Was this entry helpful?