Amazon S3 to DynamoDB
Use the S3ToDynamoDBOperator
transfer to load data stored in an Amazon Simple Storage Service (S3) bucket
into an existing or new Amazon DynamoDB table.
Prerequisite Tasks
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Apache Airflow®.
Operators
Amazon S3 To DynamoDB transfer operator
This operator loads data from Amazon S3 into an Amazon DynamoDB table. It uses the Amazon DynamoDB ImportTable service, which interacts with other AWS services such as Amazon S3 and CloudWatch. By default, the operator loads the S3 data into a new Amazon DynamoDB table.
The service does not currently support importing into an existing table, so for that case the operator uses a custom approach: it creates a temporary DynamoDB table, loads the S3 data into it, then scans the temporary table and writes the returned records to the target table.
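The scan-and-write step for an existing table can be pictured with a minimal sketch. This is not the operator's actual implementation; the function name `copy_items` is hypothetical, and the two arguments are assumed to behave like boto3 DynamoDB `Table` resources (`scan` returning `Items` and an optional `LastEvaluatedKey`, and `batch_writer` yielding an object with `put_item`).

```python
from typing import Any, Dict


def copy_items(source_table: Any, target_table: Any) -> int:
    """Copy every item from source_table into target_table.

    Both arguments are assumed to expose the boto3 Table interface
    (scan / batch_writer). Returns the number of items written.
    """
    copied = 0
    scan_kwargs: Dict[str, Any] = {}
    # batch_writer buffers put requests and flushes them in batches.
    with target_table.batch_writer() as writer:
        while True:
            page = source_table.scan(**scan_kwargs)
            for item in page.get("Items", []):
                writer.put_item(Item=item)
                copied += 1
            # A scan response is paginated; LastEvaluatedKey marks
            # where the next page starts and is absent on the last page.
            last_key = page.get("LastEvaluatedKey")
            if not last_key:
                break
            scan_kwargs["ExclusiveStartKey"] = last_key
    return copied
```

With real AWS resources, the arguments would be something like `boto3.resource("dynamodb").Table(temp_table_name)` and `boto3.resource("dynamodb").Table(existing_table_name)`, where both table names are placeholders.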
For more information, see:
S3ToDynamoDBOperator
Example usage:
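from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator

# Import the S3 object into a new DynamoDB table.
transfer_1 = S3ToDynamoDBOperator(
    task_id="s3_to_dynamodb",
    s3_bucket=bucket_name,
    s3_key=s3_key,
    dynamodb_table_name=new_table_name,
    input_format="CSV",
    # Extra arguments passed through to the ImportTable request.
    import_table_kwargs={
        "InputFormatOptions": {
            "Csv": {
                "Delimiter": ",",
            }
        }
    },
    # Attribute definitions and key schema for the table.
    dynamodb_attributes=[
        {"AttributeName": "cocktail_id", "AttributeType": "S"},
    ],
    dynamodb_key_schema=[
        {"AttributeName": "cocktail_id", "KeyType": "HASH"},
    ],
)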
To load S3 data into an existing DynamoDB table use:
transfer_2 = S3ToDynamoDBOperator(
    task_id="s3_to_dynamodb_new_table",
    s3_bucket=bucket_name,
    s3_key=s3_key,
    dynamodb_table_name=existing_table_name,
    # Write into a table that already exists; the data is staged
    # through a temporary table first.
    use_existing_table=True,
    input_format="CSV",
    import_table_kwargs={
        "InputFormatOptions": {
            "Csv": {
                "Delimiter": ",",
            }
        }
    },
    dynamodb_attributes=[
        {"AttributeName": "cocktail_id", "AttributeType": "S"},
    ],
    dynamodb_key_schema=[
        {"AttributeName": "cocktail_id", "KeyType": "HASH"},
    ],
)
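Both examples pass the same nested `import_table_kwargs` dictionary. As a minimal sketch, a small helper can build it; the helper name `csv_import_kwargs` is hypothetical and not part of the provider. It assumes the `Csv` options of the DynamoDB ImportTable request accept `Delimiter` and an optional `HeaderList`; check the AWS API reference before relying on other keys.

```python
from typing import Any, Dict, List, Optional


def csv_import_kwargs(
    delimiter: str = ",",
    header_list: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build the InputFormatOptions fragment for a CSV import.

    Hypothetical helper: it only assembles a plain dictionary and
    makes no AWS calls.
    """
    csv_options: Dict[str, Any] = {"Delimiter": delimiter}
    # HeaderList is only needed when the file has no header row.
    if header_list:
        csv_options["HeaderList"] = list(header_list)
    return {"InputFormatOptions": {"Csv": csv_options}}
```

The result can then be passed as `import_table_kwargs=csv_import_kwargs()` in either of the operator calls above.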