Source code for airflow.providers.amazon.aws.transfers.dynamodb_to_s3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
This module contains operators to replicate records from
DynamoDB table to S3.
"""
import json
from copy import copy
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Dict, Optional, Sequence
from uuid import uuid4

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

if TYPE_CHECKING:
    from airflow.utils.context import Context


def _convert_item_to_json_bytes(item: Dict[str, Any]) -> bytes:
    return (json.dumps(item) + '\n').encode('utf-8')


def _upload_file_to_s3(
    file_obj: IO,
    bucket_name: str,
    s3_key_prefix: str,
    aws_conn_id: str = 'aws_default',
) -> None:
    s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
    file_obj.seek(0)
    s3_client.upload_file(
        Filename=file_obj.name,
        Bucket=bucket_name,
        Key=s3_key_prefix + str(uuid4()),
    )
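# Example (illustrative, not part of the original module): a custom ``process_func``
# that could be passed to ``DynamoDBToS3Operator`` instead of the default
# ``_convert_item_to_json_bytes``, e.g. to drop a hypothetical ``internal_notes``
# attribute before each record is written to the temporary file. The attribute
# name is an assumption chosen for demonstration only.
def _example_redacting_process_func(item: Dict[str, Any]) -> bytes:
    # Remove the (hypothetical) sensitive attribute, then emit one JSON object
    # per line, mirroring the default serialization above.
    redacted = {key: value for key, value in item.items() if key != 'internal_notes'}
    return (json.dumps(redacted) + '\n').encode('utf-8')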
class DynamoDBToS3Operator(BaseOperator):
    """
    Replicates records from a DynamoDB table to S3.

    It scans a DynamoDB table and writes the received records to a file
    on the local filesystem. It flushes the file to S3 once the file size
    exceeds the file size limit specified by the user.

    Users can also specify filtering criteria using dynamodb_scan_kwargs
    to only replicate records that satisfy the criteria.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/transfer:DynamoDBToS3Operator`

    :param dynamodb_table_name: DynamoDB table to replicate data from
    :param s3_bucket_name: S3 bucket to replicate data to
    :param file_size: Flush the file to S3 once its size reaches file_size (bytes)
    :param dynamodb_scan_kwargs: kwargs to pass to
        <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>  # noqa: E501
    :param s3_key_prefix: Prefix of the S3 object key
    :param process_func: How to transform a DynamoDB item into bytes. By default,
        the item is dumped as JSON.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """
    def _scan_dynamodb_and_upload_to_s3(self, temp_file: IO, scan_kwargs: dict, table: Any) -> IO:
        while True:
            response = table.scan(**scan_kwargs)
            items = response['Items']
            for item in items:
                temp_file.write(self.process_func(item))

            if 'LastEvaluatedKey' not in response:
                # no more items to scan
                break

            last_evaluated_key = response['LastEvaluatedKey']
            scan_kwargs['ExclusiveStartKey'] = last_evaluated_key

            # Upload the file to S3 once it reaches the file size limit
            if getsize(temp_file.name) >= self.file_size:
                _upload_file_to_s3(temp_file, self.s3_bucket_name, self.s3_key_prefix, self.aws_conn_id)
                temp_file.close()

                temp_file = NamedTemporaryFile()
        return temp_file
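# Usage sketch (illustrative, not part of the original module): how this operator
# might be wired into a DAG. The DAG id, table name, bucket, key prefix, file size
# and the ``FilterExpression`` below are assumptions chosen for demonstration; any
# keyword accepted by boto3's ``DynamoDB.Table.scan`` can be supplied through
# ``dynamodb_scan_kwargs``.
#
#     from datetime import datetime
#
#     from boto3.dynamodb.conditions import Attr
#
#     from airflow import DAG
#     from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
#
#     with DAG(
#         dag_id='example_dynamodb_to_s3',
#         start_date=datetime(2022, 1, 1),
#         schedule_interval=None,
#         catchup=False,
#     ) as dag:
#         backup_table = DynamoDBToS3Operator(
#             task_id='backup_table',
#             dynamodb_table_name='my_table',
#             s3_bucket_name='my-backup-bucket',
#             s3_key_prefix='dynamodb/my_table/',
#             file_size=20 * 1024 * 1024,  # flush to S3 roughly every 20 MB
#             dynamodb_scan_kwargs={'FilterExpression': Attr('created_at').gt('2022-01-01')},
#         )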