Source code for airflow.providers.sftp.operators.sftp
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains SFTP operator."""from__future__importannotationsimportosimportwarningsfrompathlibimportPathfromtypingimportAny,Sequencefromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarningfromairflow.modelsimportBaseOperatorfromairflow.providers.sftp.hooks.sftpimportSFTPHookfromairflow.providers.ssh.hooks.sshimportSSHHook
class SFTPOperation:
    """Operation that can be used with SFTP."""

    # Lowercase on purpose: SFTPOperator.execute compares these constants
    # against ``self.operation.lower()``.
    PUT = "put"
    GET = "get"
class SFTPOperator(BaseOperator):
    """
    SFTPOperator for transferring files from remote host to local or vice versa.

    This operator uses sftp_hook to open sftp transport channel that serves as basis for file transfer.

    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
        or `sftp_hook` is provided.
    :param sftp_hook: predefined SFTPHook to use
        Either `sftp_hook` or `ssh_conn_id` needs to be provided.
    :param ssh_hook: Deprecated - predefined SSHHook to use for remote execution
        Use `sftp_hook` instead.
    :param remote_host: remote host to connect (templated)
        Nullable. If provided, it will replace the `remote_host` which was
        defined in `sftp_hook`/`ssh_hook` or predefined in the connection of `ssh_conn_id`.
    :param local_filepath: local file path or list of local file paths to get or put. (templated)
    :param remote_filepath: remote file path or list of remote file paths to get or put. (templated)
    :param operation: specify operation 'get' or 'put', defaults to put
    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
    :param create_intermediate_dirs: create missing intermediate directories when
        copying from remote to local and vice-versa. Default is False.

        Example: The following task would copy ``file.txt`` to the remote host
        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
        don't exist. If the parameter is not passed it would error as the directory
        does not exist. ::

            put_file = SFTPOperator(
                task_id="test_sftp",
                ssh_conn_id="ssh_default",
                local_filepath="/tmp/file.txt",
                remote_filepath="/tmp/tmp1/tmp2/file.txt",
                operation="put",
                create_intermediate_dirs=True,
                dag=dag
            )

    """

    # NOTE(review): ``execute`` reads ``ssh_hook``, ``sftp_hook``, ``ssh_conn_id``,
    # ``remote_host``, ``local_filepath``, ``remote_filepath``, ``operation``,
    # ``confirm`` and ``create_intermediate_dirs`` from ``self``; the code that
    # assigns them is not part of this excerpt -- confirm against the constructor.

    def execute(self, context: Any) -> str | list[str] | None:
        """
        Transfer every (local, remote) path pair in the configured direction.

        :param context: task execution context; not used by this method.
        :return: ``self.local_filepath`` exactly as it was configured
            (a single string or a list of strings).
        :raises ValueError: if ``local_filepath`` and ``remote_filepath`` do not
            contain the same number of paths.
        :raises TypeError: if ``operation`` is neither ``SFTPOperation.GET`` nor
            ``SFTPOperation.PUT`` (compared case-insensitively).
        :raises AirflowException: if both ``ssh_hook`` and ``sftp_hook`` are set,
            if no usable hook can be obtained, or if anything fails during the
            transfer (the original error is chained as the cause).
        """
        # Normalise both path arguments to sequences so that single-file and
        # multi-file transfers share one code path.
        if isinstance(self.local_filepath, str):
            local_filepath_array = [self.local_filepath]
        else:
            local_filepath_array = self.local_filepath

        if isinstance(self.remote_filepath, str):
            remote_filepath_array = [self.remote_filepath]
        else:
            remote_filepath_array = self.remote_filepath

        if len(local_filepath_array) != len(remote_filepath_array):
            raise ValueError(
                f"{len(local_filepath_array)} paths in local_filepath "
                f"!= {len(remote_filepath_array)} paths in remote_filepath"
            )

        # Lower-case once; the value is needed for validation and for every file.
        operation = self.operation.lower()
        if operation not in (SFTPOperation.GET, SFTPOperation.PUT):
            raise TypeError(
                f"Unsupported operation value {self.operation}, "
                f"expected {SFTPOperation.GET} or {SFTPOperation.PUT}."
            )

        # TODO: remove support for ssh_hook in next major provider version in hook and operator
        if self.ssh_hook is not None and self.sftp_hook is not None:
            raise AirflowException(
                "Both `ssh_hook` and `sftp_hook` are defined. Please use only one of them."
            )

        if self.ssh_hook is not None:
            if not isinstance(self.ssh_hook, SSHHook):
                self.log.info("ssh_hook is invalid. Trying ssh_conn_id to create SFTPHook.")
                self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)
            # Still None here means ssh_hook was a valid SSHHook: wrap it, and
            # tell the user that this way of configuring the operator is deprecated.
            if self.sftp_hook is None:
                warnings.warn(
                    "Parameter `ssh_hook` is deprecated. "
                    "Please use `sftp_hook` instead. "
                    "The old parameter `ssh_hook` will be removed in a future version.",
                    AirflowProviderDeprecationWarning,
                    stacklevel=2,
                )
                self.sftp_hook = SFTPHook(ssh_hook=self.ssh_hook)

        # Kept outside the try block so the except clause can always reference it,
        # even when the failure happens before the first file is processed.
        file_msg = None
        try:
            if self.ssh_conn_id:
                if self.sftp_hook and isinstance(self.sftp_hook, SFTPHook):
                    self.log.info("ssh_conn_id is ignored when sftp_hook/ssh_hook is provided.")
                else:
                    self.log.info(
                        "sftp_hook/ssh_hook not provided or invalid. Trying ssh_conn_id to create SFTPHook."
                    )
                    self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)

            if not self.sftp_hook:
                raise AirflowException("Cannot operate without sftp_hook or ssh_conn_id.")

            if self.remote_host is not None:
                self.log.info(
                    "remote_host is provided explicitly. "
                    "It will replace the remote_host which was defined "
                    "in sftp_hook or predefined in connection of ssh_conn_id."
                )
                self.sftp_hook.remote_host = self.remote_host

            for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
                if operation == SFTPOperation.GET:
                    # Download: optionally create the local parent directories first.
                    local_folder = os.path.dirname(_local_filepath)
                    if self.create_intermediate_dirs:
                        Path(local_folder).mkdir(parents=True, exist_ok=True)
                    file_msg = f"from {_remote_filepath} to {_local_filepath}"
                    self.log.info("Starting to transfer %s", file_msg)
                    self.sftp_hook.retrieve_file(_remote_filepath, _local_filepath)
                else:
                    # Upload: optionally create the remote parent directory first.
                    remote_folder = os.path.dirname(_remote_filepath)
                    if self.create_intermediate_dirs:
                        self.sftp_hook.create_directory(remote_folder)
                    file_msg = f"from {_local_filepath} to {_remote_filepath}"
                    self.log.info("Starting to transfer file %s", file_msg)
                    self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm)

        except Exception as e:
            # Re-raise as AirflowException, chaining the original error so the
            # root cause stays visible in the traceback.
            raise AirflowException(f"Error while transferring {file_msg}, error: {e}") from e

        return self.local_filepath