airflow.contrib.operators.dynamodb_to_s3

This module contains operators to replicate records from DynamoDB table to S3.

Module Contents

airflow.contrib.operators.dynamodb_to_s3._convert_item_to_json_bytes(item)[source]
airflow.contrib.operators.dynamodb_to_s3._upload_file_to_s3(file_obj, bucket_name, s3_key_prefix)[source]
class airflow.contrib.operators.dynamodb_to_s3.DynamoDBToS3Operator(dynamodb_table_name, s3_bucket_name, file_size, dynamodb_scan_kwargs=None, s3_key_prefix='', process_func=_convert_item_to_json_bytes, *args, **kwargs)[source]

Bases: airflow.models.baseoperator.BaseOperator

Replicates records from a DynamoDB table to S3. It scans a DynamoDB table and writes the received records to a file on the local filesystem. It flushes the file to S3 once the file size exceeds the limit specified by the user.
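
As a minimal sketch of a single replication task, the operator can be instantiated as shown below; the table name, bucket name, key prefix, and the flush threshold are hypothetical example values, not defaults.

from airflow.contrib.operators.dynamodb_to_s3 import DynamoDBToS3Operator

backup_op = DynamoDBToS3Operator(
    task_id='replicate_my_table',
    dynamodb_table_name='my_table',        # hypothetical source table
    s3_bucket_name='my-backup-bucket',     # hypothetical destination bucket
    s3_key_prefix='dynamodb/my_table/',    # hypothetical prefix for uploaded object keys
    file_size=1024 * 1024,                 # flush the local file to S3 once it exceeds ~1 MB
)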

Users can also specify filtering criteria via dynamodb_scan_kwargs so that only records satisfying the criteria are replicated.
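
For example, assuming dynamodb_scan_kwargs is forwarded to a boto3 Table.scan call, a filter can be expressed with boto3's condition helpers; the attribute name 'status' and value 'active' below are purely illustrative.

from boto3.dynamodb.conditions import Attr

from airflow.contrib.operators.dynamodb_to_s3 import DynamoDBToS3Operator

filtered_op = DynamoDBToS3Operator(
    task_id='replicate_active_items',
    dynamodb_table_name='my_table',        # hypothetical source table
    s3_bucket_name='my-backup-bucket',     # hypothetical destination bucket
    file_size=1024 * 1024,
    dynamodb_scan_kwargs={
        # only items whose 'status' attribute equals 'active' are replicated
        'FilterExpression': Attr('status').eq('active'),
    },
)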

To parallelize the replication, users can create multiple DynamoDBToS3Operator tasks. For instance, to replicate with a parallelism of 2, create two tasks like the following:

op1 = DynamoDBToS3Operator(
    task_id='replicator-1',
    dynamodb_table_name='hello',
    dynamodb_scan_kwargs={
        'TotalSegments': 2,
        'Segment': 0,
    },
    ...
)

op2 = DynamoDBToS3Operator(
    task_id='replicator-2',
    dynamodb_table_name='hello',
    dynamodb_scan_kwargs={
        'TotalSegments': 2,
        'Segment': 1,
    },
    ...
)
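
Here TotalSegments and Segment are DynamoDB's parallel scan parameters, so each task scans a disjoint segment of the table and the two tasks together cover it exactly once.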
Parameters

    dynamodb_table_name: DynamoDB table to replicate records from
    s3_bucket_name: S3 bucket to write the replicated records to
    file_size: flush the local file to S3 once its size exceeds this limit
    dynamodb_scan_kwargs: keyword arguments passed to the underlying DynamoDB scan, e.g. filtering criteria
    s3_key_prefix: prefix for the S3 object key
    process_func: function that converts a DynamoDB item to bytes; by default the item is JSON-encoded
execute(self, context)[source]
_scan_dynamodb_and_upload_to_s3(self, temp_file, scan_kwargs, table)[source]
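
For orientation, the scan-and-upload step roughly follows the shape sketched below. This is not the operator's actual implementation; the function and helper names, the JSON serialization, and the boto3 wiring are illustrative assumptions that mirror the behaviour described above (paginated scan, buffering to a local temp file, flushing to S3 when the size limit is exceeded).

import json
import os
from tempfile import NamedTemporaryFile

import boto3


def convert_item_to_json_bytes(item):
    # illustrative stand-in for the module's _convert_item_to_json_bytes:
    # serialize one DynamoDB item as a JSON line (Decimal values rendered via str)
    return (json.dumps(item, default=str) + '\n').encode('utf-8')


def scan_and_upload(table_name, bucket_name, s3_key_prefix, file_size, scan_kwargs=None):
    # hypothetical standalone equivalent of _scan_dynamodb_and_upload_to_s3
    table = boto3.resource('dynamodb').Table(table_name)
    s3 = boto3.client('s3')
    scan_kwargs = dict(scan_kwargs or {})

    temp_file = NamedTemporaryFile(delete=False)
    while True:
        response = table.scan(**scan_kwargs)
        for item in response['Items']:
            temp_file.write(convert_item_to_json_bytes(item))
        temp_file.flush()

        # flush the buffered records to S3 once the local file exceeds the limit
        if os.path.getsize(temp_file.name) >= file_size:
            temp_file.close()
            s3.upload_file(temp_file.name, bucket_name,
                           s3_key_prefix + os.path.basename(temp_file.name))
            os.remove(temp_file.name)
            temp_file = NamedTemporaryFile(delete=False)

        # DynamoDB paginates scan results via LastEvaluatedKey
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key

    # upload whatever remains in the final partial file
    temp_file.close()
    if os.path.getsize(temp_file.name) > 0:
        s3.upload_file(temp_file.name, bucket_name,
                       s3_key_prefix + os.path.basename(temp_file.name))
    os.remove(temp_file.name)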
