Source code for airflow.providers.amazon.aws.transfers.dynamodb_to_s3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
from copy import copy
from datetime import datetime
from decimal import Decimal
from functools import cached_property
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
from uuid import uuid4

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context
class JSONEncoder(json.JSONEncoder):
    """Custom JSON encoder implementation."""

    def default(self, obj):
        """Convert Decimal objects into a JSON serializable format."""
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
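The scan path further down calls a default item serializer and an S3 upload helper that are not visible in this rendering of the module. A minimal sketch of both, relying on the imports above; the helper names, signatures, and the UUID-suffixed key scheme are assumptions inferred from the surrounding calls (self.process_func(item), _upload_file_to_s3(...)) rather than confirmed source:

def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
    # Assumed default for process_func: one JSON document per line, using the
    # JSONEncoder above so Decimal attributes survive serialization.
    return (json.dumps(item, cls=JSONEncoder) + "\n").encode("utf-8")


def _upload_file_to_s3(
    file_obj: IO,
    bucket_name: str,
    s3_key_prefix: str,
    aws_conn_id: str | None = AwsBaseHook.default_conn_name,
) -> None:
    # Assumed helper: rewind the temporary file and upload it under a unique,
    # prefix-scoped key so successive flushes do not overwrite each other.
    s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
    file_obj.seek(0)
    s3_client.upload_file(
        Filename=file_obj.name,
        Bucket=bucket_name,
        Key=s3_key_prefix + str(uuid4()),
    )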
class DynamoDBToS3Operator(AwsToAwsBaseOperator):
    """
    Replicates records from a DynamoDB table to S3.

    It scans a DynamoDB table and writes the received records to a file
    on the local filesystem. It flushes the file to S3 once the file size
    exceeds the file size limit specified by the user.

    Users can also specify filter criteria via dynamodb_scan_kwargs
    to replicate only the records that satisfy the criteria.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/transfer:DynamoDBToS3Operator`

    :param dynamodb_table_name: DynamoDB table to replicate data from
    :param s3_bucket_name: S3 bucket to replicate data to
    :param file_size: Flush the file to S3 if its size >= file_size
    :param dynamodb_scan_kwargs: kwargs to pass to
        <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
    :param s3_key_prefix: Prefix of the S3 object key
    :param process_func: How to transform a DynamoDB item into bytes. By default, the item is dumped as JSON
    :param export_time: Time in the past from which to export table data, counted in seconds from the start of
        the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
    :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
        or ION.
    """
    def _export_table_to_point_in_time(self):
        """
        Export data from the start of the epoch until `export_time`.

        The table export will be a snapshot of the table's state at this point in time.
        """
        if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
            raise ValueError("The export_time parameter cannot be a future time.")

        client = self.hook.conn.meta.client
        table_description = client.describe_table(TableName=self.dynamodb_table_name)
        response = client.export_table_to_point_in_time(
            TableArn=table_description.get("Table", {}).get("TableArn"),
            ExportTime=self.export_time,
            S3Bucket=self.s3_bucket_name,
            S3Prefix=self.s3_key_prefix,
            ExportFormat=self.export_format,
        )
        waiter = self.hook.get_waiter("export_table")
        export_arn = response.get("ExportDescription", {}).get("ExportArn")
        waiter.wait(ExportArn=export_arn)

    def _export_entire_data(self):
        """Export all data from the table."""
        table = self.hook.conn.Table(self.dynamodb_table_name)
        scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
        err = None
        f: IO[Any]
        with NamedTemporaryFile() as f:
            try:
                f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table)
            except Exception as e:
                err = e
                raise e
            finally:
                if err is None:
                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.dest_aws_conn_id)

    def _scan_dynamodb_and_upload_to_s3(self, temp_file: IO, scan_kwargs: dict, table: Any) -> IO:
        while True:
            response = table.scan(**scan_kwargs)
            items = response["Items"]
            for item in items:
                temp_file.write(self.process_func(item))

            if "LastEvaluatedKey" not in response:
                # No more items to scan.
                break

            last_evaluated_key = response["LastEvaluatedKey"]
            scan_kwargs["ExclusiveStartKey"] = last_evaluated_key

            # Upload the file to S3 if it has reached the file size limit.
            if getsize(temp_file.name) >= self.file_size:
                _upload_file_to_s3(temp_file, self.s3_bucket_name, self.s3_key_prefix, self.dest_aws_conn_id)
                temp_file.close()

                temp_file = NamedTemporaryFile()
        return temp_file
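A hedged usage sketch, not part of the module source: how the operator might be wired into a DAG, once for the paginated Scan-based replication and once for a server-side point-in-time export. The DAG id, task ids, table and bucket names, and the file size threshold are illustrative placeholders.

from datetime import datetime, timezone

from airflow import DAG
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

with DAG(dag_id="example_dynamodb_to_s3", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Scan-based replication: items are buffered locally and flushed to S3
    # whenever the temporary file reaches roughly 20 MB.
    backup_by_scan = DynamoDBToS3Operator(
        task_id="backup_by_scan",
        dynamodb_table_name="my-table",
        s3_bucket_name="my-backup-bucket",
        s3_key_prefix="scan/",
        file_size=20 * 1024 * 1024,
    )

    # Server-side export of the table's state at a past point in time
    # (requires point-in-time recovery to be enabled on the table).
    backup_by_export = DynamoDBToS3Operator(
        task_id="backup_by_export",
        dynamodb_table_name="my-table",
        s3_bucket_name="my-backup-bucket",
        s3_key_prefix="export/",
        export_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
        export_format="DYNAMODB_JSON",
    )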