Source code for airflow.providers.amazon.aws.transfers.dynamodb_to_s3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains operators to replicate records from a DynamoDB table to S3."""
from __future__ import annotations

import json
from copy import copy
from decimal import Decimal
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
from uuid import uuid4

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context
class JSONEncoder(json.JSONEncoder):
    """Custom JSON encoder implementation."""

    def default(self, obj):
        """Convert Decimal objects (DynamoDB's number type) to a JSON-serializable format."""
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
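
# NOTE: the two module-level helpers below are referenced by the operator (as
# the default ``process_func`` and as ``_upload_file_to_s3`` in the scan loop).
# They are sketched here for readability, consistent with the imports above;
# the exact bodies are an assumption, not guaranteed verbatim source.
def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
    # One JSON document per line; Decimals are coerced to float via JSONEncoder.
    return (json.dumps(item, cls=JSONEncoder) + "\n").encode("utf-8")


def _upload_file_to_s3(
    file_obj: IO,
    bucket_name: str,
    s3_key_prefix: str,
    aws_conn_id: str | None = AwsBaseHook.default_conn_name,
) -> None:
    s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
    # Rewind: the caller has been appending scan results to this file.
    file_obj.seek(0)
    s3_client.upload_file(
        Filename=file_obj.name,
        Bucket=bucket_name,
        # A random suffix keeps successive flushed chunks from overwriting each other.
        Key=s3_key_prefix + str(uuid4()),
    )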
class DynamoDBToS3Operator(AwsToAwsBaseOperator):
    """
    Replicates records from a DynamoDB table to S3.

    It scans a DynamoDB table and writes the received records to a file
    on the local filesystem. It flushes the file to S3 once the file size
    exceeds the file size limit specified by the user.

    Users can also specify filtering criteria via ``dynamodb_scan_kwargs``
    to replicate only the records that satisfy the criteria.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/transfer:DynamoDBToS3Operator`

    :param dynamodb_table_name: DynamoDB table to replicate data from
    :param s3_bucket_name: S3 bucket to replicate data to
    :param file_size: Flush the file to S3 if its size >= file_size
    :param dynamodb_scan_kwargs: kwargs passed to
        <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
    :param s3_key_prefix: Prefix of the S3 object key
    :param process_func: How to transform a DynamoDB item to bytes. By default, we dump the JSON.
    """
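
    # NOTE: ``template_fields`` and ``__init__`` wire up the attributes used by
    # the scan method below (``self.process_func``, ``self.file_size``, etc.).
    # The signature follows the documented parameters; the defaults and the
    # templated-field list are an assumption, not guaranteed verbatim source.
    template_fields: Sequence[str] = (
        *AwsToAwsBaseOperator.template_fields,
        "dynamodb_table_name",
        "s3_bucket_name",
        "file_size",
        "dynamodb_scan_kwargs",
        "s3_key_prefix",
    )

    def __init__(
        self,
        *,
        dynamodb_table_name: str,
        s3_bucket_name: str,
        file_size: int,
        dynamodb_scan_kwargs: dict[str, Any] | None = None,
        s3_key_prefix: str = "",
        process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.file_size = file_size
        self.process_func = process_func
        self.dynamodb_table_name = dynamodb_table_name
        self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
        self.s3_bucket_name = s3_bucket_name
        self.s3_key_prefix = s3_key_prefix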
    def _scan_dynamodb_and_upload_to_s3(self, temp_file: IO, scan_kwargs: dict, table: Any) -> IO:
        while True:
            response = table.scan(**scan_kwargs)
            items = response["Items"]
            for item in items:
                temp_file.write(self.process_func(item))

            if "LastEvaluatedKey" not in response:
                # No more items to scan.
                break

            last_evaluated_key = response["LastEvaluatedKey"]
            scan_kwargs["ExclusiveStartKey"] = last_evaluated_key

            # Upload the file to S3 if it has reached the file size limit.
            if getsize(temp_file.name) >= self.file_size:
                _upload_file_to_s3(temp_file, self.s3_bucket_name, self.s3_key_prefix, self.dest_aws_conn_id)
                temp_file.close()
                temp_file = NamedTemporaryFile()
        return temp_file
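
    # NOTE: a plausible sketch of the ``execute`` entry point that drives the
    # scan method above and flushes the final partial file; the body is an
    # assumption, not guaranteed verbatim source.
    def execute(self, context: Context) -> None:
        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
        table = hook.conn.Table(self.dynamodb_table_name)
        # Copy the user-supplied kwargs so pagination keys never leak back
        # into the operator's (templated) attributes.
        scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
        err = None
        with NamedTemporaryFile() as f:
            try:
                f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table)
            except Exception as e:
                err = e
                raise e
            finally:
                if err is None:
                    # Flush whatever remains below the size threshold.
                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.dest_aws_conn_id)

    # Example usage in a DAG file (table, bucket, and threshold are hypothetical):
    #
    #     DynamoDBToS3Operator(
    #         task_id="backup_users_table",
    #         dynamodb_table_name="users",
    #         s3_bucket_name="my-backup-bucket",
    #         s3_key_prefix="dynamodb/users/",
    #         file_size=20 * 1024 * 1024,  # flush roughly every 20 MB
    #     )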