from tempfile import NamedTemporaryFile
import subprocess

from airflow.exceptions import AirflowException
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

[docs]class S3FileTransformOperator(BaseOperator): """ Copies data from a source S3 location to a temporary location on the local filesystem. Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location. The locations of the source and the destination files in the local filesystem is provided as an first and second arguments to the transformation script. The transformation script is expected to read the data from source , transform it and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3. :param source_s3_key: The key to be retrieved from S3 :type source_s3_key: str :param source_aws_conn_id: source s3 connection :type source_aws_conn_id: str :param dest_s3_key: The key to be written from S3 :type dest_s3_key: str :param dest_aws_conn_id: destination s3 connection :type dest_aws_conn_id: str :param replace: Replace dest S3 key if it already exists :type replace: bool :param transform_script: location of the executable transformation script :type transform_script: str """ template_fields = ('source_s3_key', 'dest_s3_key') template_ext = () ui_color = '#f9c915' @apply_defaults def __init__( self, source_s3_key, dest_s3_key, transform_script, source_aws_conn_id='aws_default', dest_aws_conn_id='aws_default', replace=False, *args, **kwargs): super(S3FileTransformOperator, self).__init__(*args, **kwargs) self.source_s3_key = source_s3_key self.source_aws_conn_id = source_aws_conn_id self.dest_s3_key = dest_s3_key self.dest_aws_conn_id = dest_aws_conn_id self.replace = replace self.transform_script = transform_script def execute(self, context): source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id) dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id)"Downloading source S3 file %s", self.source_s3_key) if not source_s3.check_for_key(self.source_s3_key): raise AirflowException("The source key {0} does not exist".format(self.source_s3_key)) source_s3_key_object = source_s3.get_key(self.source_s3_key) with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest: "Dumping S3 file %s contents to local file %s", self.source_s3_key, ) source_s3_key_object.get_contents_to_file(f_source) f_source.flush() source_s3.connection.close() transform_script_process = subprocess.Popen( [self.transform_script,,], stdout=subprocess.PIPE, stderr=subprocess.PIPE) (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()"Transform script stdout %s", transform_script_stdoutdata) if transform_script_process.returncode > 0: raise AirflowException("Transform script failed %s", transform_script_stderrdata) else: "Transform script successful. Output temporarily located at %s", )"Uploading transformed file to S3") f_dest.flush() dest_s3.load_file(, key=self.dest_s3_key, replace=self.replace )"Upload successful") dest_s3.connection.close()