airflow.providers.amazon.aws.transfers.dynamodb_to_s3

This module contains operators to replicate records from a DynamoDB table to S3.

Module Contents

Classes

DynamoDBToS3Operator

Replicates records from a DynamoDB table to S3.

class airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator(*, dynamodb_table_name: str, s3_bucket_name: str, file_size: int, dynamodb_scan_kwargs: Optional[Dict[str, Any]] = None, s3_key_prefix: str = '', process_func: Callable[[Dict[str, Any]], bytes] = _convert_item_to_json_bytes, aws_conn_id: str = 'aws_default', **kwargs)

Bases: airflow.models.BaseOperator

Replicates records from a DynamoDB table to S3. It scans a DynamoDB table and writes the received records to a file on the local filesystem. It flushes the file to S3 once the file size reaches or exceeds the limit given by file_size.
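The buffer-and-flush behaviour described above can be sketched without any AWS dependency. This is a minimal illustration, not the operator's actual implementation: the function name replicate and the upload callback are hypothetical stand-ins for the scan loop and the S3 upload, and the handling of the final partial file is an assumption.

```python
import os
import tempfile
from typing import Any, Callable, Dict, Iterable


def replicate(
    items: Iterable[Dict[str, Any]],
    file_size: int,
    process_func: Callable[[Dict[str, Any]], bytes],
    upload: Callable[[str], None],
) -> None:
    """Buffer items in a local temp file; call `upload` once it holds >= file_size bytes."""
    handle = tempfile.NamedTemporaryFile(delete=False)
    try:
        for item in items:
            handle.write(process_func(item))
            handle.flush()  # make sure the on-disk size is up to date
            if os.path.getsize(handle.name) >= file_size:
                handle.close()
                upload(handle.name)  # hypothetical stand-in for the S3 upload
                os.remove(handle.name)
                handle = tempfile.NamedTemporaryFile(delete=False)
        # Assumption: a non-empty remainder is uploaded as a final, smaller file.
        handle.close()
        if os.path.getsize(handle.name) > 0:
            upload(handle.name)
    finally:
        handle.close()
        if os.path.exists(handle.name):
            os.remove(handle.name)
```

With file_size=10 and a process_func that emits 5 bytes per item, five items would produce two 10-byte files followed by one 5-byte file.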

Users can also specify filtering criteria through dynamodb_scan_kwargs so that only records satisfying the criteria are replicated.
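As a rough illustration of such criteria, the helper below builds scan keyword arguments for a simple equality filter. The helper name equals_filter and the attribute name status are hypothetical, and the sketch assumes the underlying scan call accepts the string form of FilterExpression together with name and value placeholders.

```python
from typing import Any, Dict


def equals_filter(attribute: str, value: Any) -> Dict[str, Any]:
    """Build scan kwargs that keep only items where `attribute` equals `value`."""
    return {
        # Placeholders avoid clashes between attribute names and reserved words.
        "FilterExpression": "#attr = :val",
        "ExpressionAttributeNames": {"#attr": attribute},
        "ExpressionAttributeValues": {":val": value},
    }


# Hypothetical usage: replicate only items whose "status" is "active".
scan_kwargs = equals_filter("status", "active")
```

The resulting dictionary would be passed as dynamodb_scan_kwargs when constructing the operator.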

To parallelize the replication, users can create multiple DynamoDBToS3Operator tasks. For instance, to replicate with a parallelism of 2, create two tasks like:

op1 = DynamoDBToS3Operator(
    task_id="replicator-1",
    dynamodb_table_name="hello",
    dynamodb_scan_kwargs={
        "TotalSegments": 2,
        "Segment": 0,
    },
    ...,
)

op2 = DynamoDBToS3Operator(
    task_id="replicator-2",
    dynamodb_table_name="hello",
    dynamodb_scan_kwargs={
        "TotalSegments": 2,
        "Segment": 1,
    },
    ...,
)
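
For higher parallelism, writing each task by hand becomes repetitive. The sketch below generates one set of scan kwargs per segment; the helper name segment_scan_kwargs is hypothetical and not part of the provider.

```python
from typing import Any, Dict, List


def segment_scan_kwargs(total_segments: int) -> List[Dict[str, Any]]:
    """Return one scan-kwargs dict per segment, numbered 0..total_segments-1."""
    if total_segments < 1:
        raise ValueError("total_segments must be at least 1")
    return [
        {"TotalSegments": total_segments, "Segment": segment}
        for segment in range(total_segments)
    ]


# Each dict would be passed as dynamodb_scan_kwargs to its own task, e.g.
# with task_id=f"replicator-{index + 1}" inside a loop over the list.
all_kwargs = segment_scan_kwargs(2)
```

Every segment index must be covered by exactly one task, otherwise part of the table is skipped or replicated twice.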

Parameters
  • dynamodb_table_name (str) -- DynamoDB table to replicate data from

  • s3_bucket_name (str) -- S3 bucket to replicate data to

  • file_size (int) -- Flush the file to S3 once its size is >= file_size

  • dynamodb_scan_kwargs (Optional[Dict[str, Any]]) -- kwargs passed to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>

  • s3_key_prefix (Optional[str]) -- Prefix of the S3 object key

  • process_func (Callable[[Dict[str, Any]], bytes]) -- How a DynamoDB item is transformed to bytes. By default, the item is dumped as JSON

  • aws_conn_id (str) -- The Airflow connection used for AWS credentials. If this is None or empty, the default boto3 behaviour is used. When Airflow runs in a distributed manner, that default boto3 configuration must be maintained on each worker node.
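
A custom process_func only needs to accept one item dictionary and return bytes. The sketch below is one possible replacement that writes one JSON document per line and converts Decimal values, which is how the boto3 resource layer represents DynamoDB numbers. The function names are hypothetical, and this is not the provider's default implementation.

```python
import json
from decimal import Decimal
from typing import Any, Dict


def _json_default(value: Any) -> Any:
    """Convert Decimal values to int or float; reject anything else."""
    if isinstance(value, Decimal):
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def item_to_json_line(item: Dict[str, Any]) -> bytes:
    """Serialize one item as a UTF-8 JSON line with sorted keys."""
    text = json.dumps(item, default=_json_default, sort_keys=True)
    return (text + "\n").encode("utf-8")
```

Such a function would be passed as process_func=item_to_json_line. Converting Decimal to float can lose precision, so a string conversion may be preferable for exact values.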

execute(self, context: airflow.utils.context.Context) -> None

This is the main method to derive when creating an operator. Context is the same dictionary that is used when rendering Jinja templates.

Refer to get_template_context for more context.
