Source code for airflow.providers.amazon.aws.transfers.ftp_to_s3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Sequence
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.ftp.hooks.ftp import FTPHook

if TYPE_CHECKING:
    from airflow.utils.context import Context
class FTPToS3Operator(BaseOperator):
    """
    Transfer of one or more files from an FTP server to S3.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:FTPToS3Operator`

    :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
        For multiple files, it is the route where the files will be found.
    :param s3_bucket: The targeted s3 bucket in which to upload the file(s).
    :param s3_key: The targeted s3 key. For one file it must include the file path. For several,
        it must end with "/".
    :param ftp_filenames: Only used if you want to move multiple files. You can pass a list
        with exact filenames present in the ftp path, or a prefix that all files must meet. It
        can also be the string '*' for moving all the files within the ftp path.
    :param s3_filenames: Only used if you want to move multiple files and name them different from
        the originals from the ftp. It can be a list of filenames or file prefix (that will replace
        the ftp prefix).
    :param ftp_conn_id: The ftp connection id. The name or identifier for
        establishing a connection to the FTP server.
    :param aws_conn_id: The s3 connection id. The name or identifier for
        establishing a connection to S3.
    :param replace: A flag to decide whether or not to overwrite the key
        if it already exists. If replace is False and the key exists, an
        error will be raised.
    :param encrypt: If True, the file will be encrypted on the server-side
        by S3 and will be stored in an encrypted form while at rest in S3.
    :param gzip: If True, the file will be compressed locally
    :param acl_policy: String specifying the canned ACL policy for the file being
        uploaded to the S3 bucket.
    """

    # NOTE(review): no constructor is visible in this section of the file. The
    # methods below read ftp_path, s3_bucket, s3_key, ftp_filenames,
    # s3_filenames, ftp_conn_id, aws_conn_id, replace, encrypt, gzip and
    # acl_policy from ``self`` -- confirm they are assigned elsewhere.

    def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
        """
        Copy a single remote FTP file to S3 by way of a local temporary file.

        :param remote_filename: Remote path handed to the FTP hook's ``retrieve_file``.
        :param s3_file_key: Destination key inside ``self.s3_bucket``.
        """
        # The temporary file only has to live for the duration of the transfer;
        # NamedTemporaryFile removes it when the ``with`` block exits.
        with NamedTemporaryFile() as local_tmp_file:
            self.ftp_hook.retrieve_file(
                remote_full_path=remote_filename,
                local_full_path_or_buffer=local_tmp_file.name,
            )
            self.s3_hook.load_file(
                filename=local_tmp_file.name,
                key=s3_file_key,
                bucket_name=self.s3_bucket,
                replace=self.replace,
                encrypt=self.encrypt,
                gzip=self.gzip,
                acl_policy=self.acl_policy,
            )
            self.log.info("File upload to %s", s3_file_key)

    def execute(self, context: Context):
        """
        Transfer the configured file(s) from the FTP server to S3.

        The mode is selected by ``ftp_filenames``:

        * falsy -- a single file: ``ftp_path`` is uploaded to ``s3_key``.
        * ``str`` -- ``ftp_path`` is listed; ``"*"`` selects every entry,
          any other string selects the entries that contain it. Each entry is
          uploaded to ``s3_key`` + name. When ``s3_filenames`` is a non-empty
          string, occurrences of ``ftp_filenames`` in the name are replaced
          by it first.
        * otherwise (a sequence of names) -- each ``ftp_path`` + name is
          uploaded to ``s3_key`` + name, or to ``s3_key`` + the entry at the
          same position in ``s3_filenames`` when that is provided.

        :param context: Task execution context (not used by this method).
        :raises ValueError: If ``ftp_filenames`` is a sequence and
            ``s3_filenames`` is provided with a different length.
        """
        self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
        self.s3_hook = S3Hook(self.aws_conn_id)

        # Single-file mode: ftp_path and s3_key already name the file itself.
        if not self.ftp_filenames:
            self.__upload_to_s3_from_ftp(self.ftp_path, self.s3_key)
            return

        # Pattern mode: pick the files out of a directory listing.
        if isinstance(self.ftp_filenames, str):
            self.log.info("Getting files in %s", self.ftp_path)
            list_dir = self.ftp_hook.list_directory(
                path=self.ftp_path,
            )

            if self.ftp_filenames == "*":
                files = list_dir
            else:
                # Substring containment: an entry matches if the pattern
                # appears anywhere in its name, not only at the start.
                ftp_filename: str = self.ftp_filenames
                files = [f for f in list_dir if ftp_filename in f]

            for file in files:
                self.log.info("Moving file %s", file)

                if self.s3_filenames and isinstance(self.s3_filenames, str):
                    filename = file.replace(self.ftp_filenames, self.s3_filenames)
                else:
                    filename = file

                s3_file_key = f"{self.s3_key}{filename}"
                # Listing entries are passed to the upload helper unchanged;
                # ftp_path is not prepended in this mode.
                self.__upload_to_s3_from_ftp(file, s3_file_key)
            return

        # Explicit-list mode, with optional per-file renaming.
        if self.s3_filenames:
            # zip() stops at the shorter input, which would silently skip
            # files when the two lists differ in length. Fail before any
            # transfer starts instead of reporting a partial copy as success.
            if len(self.ftp_filenames) != len(self.s3_filenames):
                raise ValueError(
                    "ftp_filenames and s3_filenames must have the same number of entries "
                    f"(got {len(self.ftp_filenames)} and {len(self.s3_filenames)})"
                )
            for ftp_file, s3_file in zip(self.ftp_filenames, self.s3_filenames):
                self.__upload_to_s3_from_ftp(self.ftp_path + ftp_file, self.s3_key + s3_file)
        else:
            for ftp_file in self.ftp_filenames:
                self.__upload_to_s3_from_ftp(self.ftp_path + ftp_file, self.s3_key + ftp_file)