Amazon DynamoDB to Amazon S3
Use the DynamoDBToS3Operator transfer to copy the contents of an existing Amazon DynamoDB table to an existing Amazon Simple Storage Service (S3) bucket.
Prerequisite Tasks
To use these operators, you must do a few things:
Create the necessary resources using the AWS Console or AWS CLI.
Install the API libraries via pip:
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Airflow®.
Operators
Amazon DynamoDB to Amazon S3 transfer operator
This operator replicates records from an Amazon DynamoDB table to a file in an Amazon S3 bucket. It scans the DynamoDB table, writes the received records to a file on the local filesystem, and flushes that file to Amazon S3 once its size exceeds the user-specified limit.
Users can also specify filtering criteria using dynamodb_scan_kwargs to replicate only the records that satisfy those criteria, as sketched below.
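For example, a minimal sketch of a filtered replication, assuming a hypothetical numeric cost attribute on the items (the scan kwargs are passed through to the underlying boto3 Scan call):
from boto3.dynamodb.conditions import Attr

# Replicate only items whose (hypothetical) "cost" attribute exceeds 100.
backup_db_filtered = DynamoDBToS3Operator(
    task_id="backup_db_filtered",
    dynamodb_table_name=table_name,
    s3_bucket_name=bucket_name,
    file_size=1000,
    dynamodb_scan_kwargs={
        "FilterExpression": Attr("cost").gt(100),
    },
)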
For more information, see DynamoDBToS3Operator.
Example usage:
tests/system/amazon/aws/example_dynamodb_to_s3.py
backup_db = DynamoDBToS3Operator(
task_id="backup_db",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the table is too large, multiple files will be created.
file_size=20,
)
To parallelize the replication, users can create multiple DynamoDBToS3Operator tasks using the TotalSegments parameter. For instance, to replicate with a parallelism of 2, create two tasks:
tests/system/amazon/aws/example_dynamodb_to_s3.py
# Segmenting allows the transfer to be parallelized into TotalSegments parallel tasks.
backup_db_segment_1 = DynamoDBToS3Operator(
task_id="backup_db_segment_1",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the table is too large, multiple files will be created.
file_size=1000,
s3_key_prefix=f"{S3_KEY_PREFIX}-1-",
dynamodb_scan_kwargs={
"TotalSegments": 2,
"Segment": 0,
},
)
backup_db_segment_2 = DynamoDBToS3Operator(
task_id="backup_db_segment_2",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the table is too large, multiple files will be created.
file_size=1000,
s3_key_prefix=f"{S3_KEY_PREFIX}-2-",
dynamodb_scan_kwargs={
"TotalSegments": 2,
"Segment": 1,
},
)
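Because each segment scans a disjoint slice of the table, these tasks can run concurrently. As a sketch, the per-segment tasks can also be generated in a loop; TOTAL_SEGMENTS is an assumed constant here, not part of the example DAG:
TOTAL_SEGMENTS = 4  # assumed constant: number of parallel segment tasks

backup_db_segments = [
    DynamoDBToS3Operator(
        task_id=f"backup_db_segment_{i}",
        dynamodb_table_name=table_name,
        s3_bucket_name=bucket_name,
        file_size=1000,
        s3_key_prefix=f"{S3_KEY_PREFIX}-{i}-",
        dynamodb_scan_kwargs={
            "TotalSegments": TOTAL_SEGMENTS,
            "Segment": i,
        },
    )
    for i in range(TOTAL_SEGMENTS)
]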
Users can also pass the point_in_time_export boolean parameter to DynamoDBToS3Operator to recover data from a point in time.
Full export example usage:
tests/system/amazon/aws/example_dynamodb_to_s3.py
backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_full_export",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
point_in_time_export=True,
export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
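The export_time value above is defined elsewhere in the system test; for illustration, it is assumed to be a timezone-aware datetime that falls within the table's point-in-time recovery window, for example:
from datetime import datetime, timezone

# Assumed example value: any instant within the table's PITR window.
export_time = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)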
Incremental export example usage:
tests/system/amazon/aws/example_dynamodb_to_s3.py
backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_incremental_export",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
point_in_time_export=True,
s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
export_table_to_point_in_time_kwargs={
"ExportType": "INCREMENTAL_EXPORT",
"IncrementalExportSpecification": {
"ExportFromTime": start_time,
"ExportToTime": end_time,
"ExportViewType": "NEW_AND_OLD_IMAGES",
},
},
)
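As with export_time above, start_time and end_time are defined elsewhere in the system test; they are assumed to be timezone-aware datetimes bounding the incremental export period, for example:
from datetime import datetime, timedelta, timezone

# Assumed example values: a one-hour incremental export window.
end_time = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
start_time = end_time - timedelta(hours=1)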