Source code for airflow.providers.sftp.operators.sftp
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains SFTP operator."""from__future__importannotationsimportosimportsocketfromcollections.abcimportSequencefrompathlibimportPathfromtypingimportAnyimportparamikofromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.sftp.hooks.sftpimportSFTPHook
class SFTPOperation:
    """
    Operation that can be used with SFTP.

    The values are compared against ``operation.lower()`` by ``SFTPOperator``,
    so they are kept lowercase.
    """

    PUT = "put"
    GET = "get"
    DELETE = "delete"
class SFTPOperator(BaseOperator):
    """
    SFTPOperator for transferring files from remote host to local or vice versa.

    This operator uses sftp_hook to open sftp transport channel that serves as basis for file transfer.

    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
        from airflow Connections.
    :param sftp_hook: predefined SFTPHook to use
        Either `sftp_hook` or `ssh_conn_id` needs to be provided.
    :param remote_host: remote host to connect (templated)
        Nullable. If provided, it will replace the `remote_host` which was
        defined in `sftp_hook` or predefined in the connection of `ssh_conn_id`.
    :param local_filepath: local file path or list of local file paths to get or put. (templated)
    :param remote_filepath: remote file path or list of remote file paths to get, put, or delete. (templated)
    :param operation: specify operation 'get', 'put', or 'delete', defaults to put
    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
    :param create_intermediate_dirs: create missing intermediate directories when
        copying from remote to local and vice-versa. Default is False.

        Example: The following task would copy ``file.txt`` to the remote host
        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
        don't exist. If the parameter is not passed it would error as the directory
        does not exist. ::

            put_file = SFTPOperator(
                task_id="test_sftp",
                ssh_conn_id="ssh_default",
                local_filepath="/tmp/file.txt",
                remote_filepath="/tmp/tmp1/tmp2/file.txt",
                operation="put",
                create_intermediate_dirs=True,
                dag=dag,
            )

    """

    # NOTE(review): the constructor that sets the attributes read below
    # (ssh_conn_id, sftp_hook, remote_host, local_filepath, remote_filepath,
    # operation, confirm, create_intermediate_dirs) and the ``_get_namespace``
    # helper are not visible in this excerpt.

    def execute(self, context: Any) -> str | list[str] | None:
        """
        Run the configured SFTP operation for every configured path.

        For ``get``/``put`` the local and remote paths are paired positionally;
        directories are transferred with the hook's directory helpers, anything
        else as a single file. For ``delete`` only the remote paths are used.

        :param context: task execution context (not used by this method).
        :return: ``self.local_filepath`` exactly as it was configured.
        :raises ValueError: if the number of local and remote paths differ for
            ``get``/``put``, or if a local path is given for ``delete``.
        :raises TypeError: if ``operation`` is not ``get``, ``put`` or ``delete``.
        :raises AirflowException: if no hook can be obtained or the transfer/delete fails.
        """
        # Normalize both path arguments to lists so a single path and a list
        # of paths share one code path below.
        if self.local_filepath is None:
            local_filepath_array = []
        elif isinstance(self.local_filepath, str):
            local_filepath_array = [self.local_filepath]
        else:
            local_filepath_array = self.local_filepath

        if isinstance(self.remote_filepath, str):
            remote_filepath_array = [self.remote_filepath]
        else:
            remote_filepath_array = self.remote_filepath

        operation = self.operation.lower()
        is_transfer = operation in (SFTPOperation.GET, SFTPOperation.PUT)

        # Argument validation happens before the ``try`` below on purpose:
        # these errors are raised as-is, not wrapped in AirflowException.
        if is_transfer and len(local_filepath_array) != len(remote_filepath_array):
            raise ValueError(
                f"{len(local_filepath_array)} paths in local_filepath "
                f"!= {len(remote_filepath_array)} paths in remote_filepath"
            )

        if operation == SFTPOperation.DELETE and local_filepath_array:
            raise ValueError("local_filepath should not be provided for delete operation")

        if operation not in (SFTPOperation.GET, SFTPOperation.PUT, SFTPOperation.DELETE):
            raise TypeError(
                f"Unsupported operation value {self.operation}, "
                f"expected {SFTPOperation.GET} or {SFTPOperation.PUT} or {SFTPOperation.DELETE}."
            )

        # Describes the path(s) being processed; included in the error message on failure.
        file_msg = None
        try:
            if self.ssh_conn_id:
                if self.sftp_hook and isinstance(self.sftp_hook, SFTPHook):
                    self.log.info("ssh_conn_id is ignored when sftp_hook is provided.")
                else:
                    self.log.info(
                        "sftp_hook not provided or invalid. Trying ssh_conn_id to create SFTPHook."
                    )
                    self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)

            if not self.sftp_hook:
                raise AirflowException("Cannot operate without sftp_hook or ssh_conn_id.")

            if self.remote_host is not None:
                self.log.info(
                    "remote_host is provided explicitly. "
                    "It will replace the remote_host which was defined "
                    "in sftp_hook or predefined in connection of ssh_conn_id."
                )
                self.sftp_hook.remote_host = self.remote_host

            if is_transfer:
                for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
                    if operation == SFTPOperation.GET:
                        local_folder = os.path.dirname(_local_filepath)
                        if self.create_intermediate_dirs:
                            Path(local_folder).mkdir(parents=True, exist_ok=True)
                        file_msg = f"from {_remote_filepath} to {_local_filepath}"
                        self.log.info("Starting to transfer %s", file_msg)
                        if self.sftp_hook.isdir(_remote_filepath):
                            self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
                        else:
                            self.sftp_hook.retrieve_file(_remote_filepath, _local_filepath)
                    else:
                        # Only PUT remains once GET has been ruled out.
                        remote_folder = os.path.dirname(_remote_filepath)
                        if self.create_intermediate_dirs:
                            self.sftp_hook.create_directory(remote_folder)
                        file_msg = f"from {_local_filepath} to {_remote_filepath}"
                        self.log.info("Starting to transfer file %s", file_msg)
                        if os.path.isdir(_local_filepath):
                            self.sftp_hook.store_directory(
                                _remote_filepath, _local_filepath, confirm=self.confirm
                            )
                        else:
                            self.sftp_hook.store_file(
                                _remote_filepath, _local_filepath, confirm=self.confirm
                            )
            else:
                # Validation above guarantees this is the DELETE operation.
                for _remote_filepath in remote_filepath_array:
                    file_msg = f"{_remote_filepath}"
                    self.log.info("Starting to delete %s", file_msg)
                    if self.sftp_hook.isdir(_remote_filepath):
                        self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
                    else:
                        self.sftp_hook.delete_file(_remote_filepath)

        except Exception as e:
            # Chain explicitly so the original traceback is reported as the cause.
            raise AirflowException(
                f"Error while processing {self.operation.upper()} operation {file_msg}, error: {e}"
            ) from e

        return self.local_filepath

    def get_openlineage_facets_on_start(self):
        """
        Return OpenLineage datasets.

        Dataset will have the following structure:
                input: file://<local_host>/path
                output: file://<remote_host>:<remote_port>/path.

        For the ``get`` operation the remote datasets are the inputs and the
        local datasets the outputs; for every other operation it is the
        other way round.

        :return: an ``OperatorLineage`` with the input and output datasets.
        """
        from airflow.providers.common.compat.openlineage.facet import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        scheme = "file"
        local_host = socket.gethostname()
        try:
            local_host = socket.gethostbyname(local_host)
        except Exception as e:
            # Best effort: keep the unresolved hostname rather than fail lineage collection.
            self.log.warning(
                "Failed to resolve local hostname. "
                "Using the hostname got by socket.gethostname() without resolution. %s",
                e,
                exc_info=True,
            )

        hook = self.sftp_hook or SFTPHook(ssh_conn_id=self.ssh_conn_id)

        if self.remote_host is not None:
            remote_host = self.remote_host
        else:
            remote_host = hook.get_connection(hook.ssh_conn_id).host

        try:
            remote_host = socket.gethostbyname(remote_host)
        except OSError as e:
            self.log.warning(
                "Failed to resolve remote hostname. Using the provided hostname without resolution. %s",
                e,
                exc_info=True,
            )

        # Always bind remote_port: a hook without a ``port`` attribute would
        # otherwise leave the name undefined when it is used below.
        # NOTE(review): ``_get_namespace`` is not visible in this excerpt; it is
        # already called with a ``None`` port for the local datasets, so ``None``
        # is passed here too -- confirm it handles that for the remote side.
        remote_port = getattr(hook, "port", None)

        # Since v4.1.0, SFTPOperator accepts both a string (single file) and a list of
        # strings (multiple files) as local_filepath and remote_filepath, and internally
        # keeps them as list in both cases. But before 4.1.0, only single string is
        # allowed. So we consider both cases here for backward compatibility.
        # ``local_filepath`` may also be None (delete operation), which means no local datasets.
        if self.local_filepath is None:
            local_filepath = []
        elif isinstance(self.local_filepath, str):
            local_filepath = [self.local_filepath]
        else:
            local_filepath = self.local_filepath

        if isinstance(self.remote_filepath, str):
            remote_filepath = [self.remote_filepath]
        else:
            remote_filepath = self.remote_filepath

        local_datasets = [
            Dataset(namespace=self._get_namespace(scheme, local_host, None, path), name=path)
            for path in local_filepath
        ]
        remote_datasets = [
            Dataset(namespace=self._get_namespace(scheme, remote_host, remote_port, path), name=path)
            for path in remote_filepath
        ]

        if self.operation.lower() == SFTPOperation.GET:
            inputs = remote_datasets
            outputs = local_datasets
        else:
            inputs = local_datasets
            outputs = remote_datasets

        return OperatorLineage(
            inputs=inputs,
            outputs=outputs,
        )