AWS DataSync

AWS DataSync is a data-transfer service that simplifies, automates, and accelerates moving and replicating data between on-premises storage systems and AWS storage services over the internet or AWS Direct Connect.

Prerequisite Tasks

To use these operators, you must do a few things:

Operators

Interact with AWS DataSync Tasks

You can use DataSyncOperator to find, create, update, execute and delete AWS DataSync tasks.

Once the DataSyncOperator has identified the correct TaskArn to run (either because you specified it, or because it was found), the operator executes that task. Each time an AWS DataSync Task is executed, it creates an AWS DataSync TaskExecution, identified by a TaskExecutionArn.

The TaskExecutionArn will be monitored until completion (success / failure), and its status will be periodically written to the Airflow task log.
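The monitoring described above can be pictured as a simple polling loop. The following is a simplified sketch, not the operator's actual implementation: wait_for_task_execution, FakeDataSyncClient, the poll interval, and the ARN value are hypothetical names chosen for illustration. Only describe_task_execution() and its Status field are taken from the boto3 DataSync client.

```python
import time

# Terminal states reported in describe_task_execution()["Status"].
SUCCESS_STATES = {"SUCCESS"}
FAILURE_STATES = {"ERROR"}


def wait_for_task_execution(client, task_execution_arn, poll_seconds=30, log=print):
    """Poll a TaskExecution until it reaches a terminal state.

    Returns True on success and False on failure. Each observed status is
    passed to `log`, mirroring how the status ends up in the Airflow task log.
    """
    while True:
        response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
        status = response["Status"]
        log(f"TaskExecution {task_execution_arn} status: {status}")
        if status in SUCCESS_STATES:
            return True
        if status in FAILURE_STATES:
            return False
        time.sleep(poll_seconds)


class FakeDataSyncClient:
    """Hypothetical stand-in for a boto3 DataSync client (no AWS access needed)."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_task_execution(self, TaskExecutionArn):
        return {"TaskExecutionArn": TaskExecutionArn, "Status": next(self._statuses)}


# Placeholder ARN used only for this illustration.
EXAMPLE_ARN = "arn:aws:datasync:us-east-1:111122223333:task/task-example/execution/exec-example"
fake_client = FakeDataSyncClient(["LAUNCHING", "TRANSFERRING", "SUCCESS"])
succeeded = wait_for_task_execution(fake_client, EXAMPLE_ARN, poll_seconds=0)
```

With real credentials, the fake client would be replaced by boto3.client("datasync"); the loop itself would stay the same.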

The DataSyncOperator can optionally pass additional kwargs to the underlying boto3 DataSync client's start_task_execution() API through the task_execution_kwargs parameter. This is useful, for example, to limit bandwidth or to filter the files included in a transfer. See the boto3 DataSync documentation for more details.
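As an illustration of what such a dictionary might look like, the sketch below assembles a filter and a bandwidth limit. build_task_execution_kwargs is a hypothetical helper, not part of Airflow or boto3. The Includes and OverrideOptions keys (with BytesPerSecond) are, to the best of our understanding, parameters of start_task_execution(); check the boto3 DataSync documentation before relying on them.

```python
def build_task_execution_kwargs(include_patterns=None, bytes_per_second=None):
    """Assemble optional arguments to forward to start_task_execution()."""
    kwargs = {}
    if include_patterns:
        # DataSync expects one filter string; multiple patterns are joined with "|".
        kwargs["Includes"] = [
            {"FilterType": "SIMPLE_PATTERN", "Value": "|".join(include_patterns)}
        ]
    if bytes_per_second is not None:
        # OverrideOptions applies to this execution only, not to the stored task.
        kwargs["OverrideOptions"] = {"BytesPerSecond": bytes_per_second}
    return kwargs


task_execution_kwargs = build_task_execution_kwargs(
    include_patterns=["/test/subdir"],
    bytes_per_second=10 * 1024 * 1024,  # roughly 10 MiB per second
)
```

The resulting dictionary can be passed as task_execution_kwargs when constructing the operator.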

Execute a task

To execute a specific task, you can pass the task_arn to the operator.

tests/system/providers/amazon/aws/example_datasync.py

# Execute a specific task
execute_task_by_arn = DataSyncOperator(
    task_id="execute_task_by_arn",
    task_arn=created_task_arn,
)

Search and execute a task

To search for a task, pass source_location_uri and destination_location_uri to the operator. If exactly one task is found, it is executed. If more than one task is found, the operator raises an exception. To avoid this, set allow_random_task_choice to True to choose randomly from the candidate tasks.

tests/system/providers/amazon/aws/example_datasync.py

# Search and execute a task
execute_task_by_locations = DataSyncOperator(
    task_id="execute_task_by_locations",
    source_location_uri=f"s3://{s3_bucket_source}/test",
    destination_location_uri=f"s3://{s3_bucket_destination}/test",
    # Only transfer files from /test/subdir folder
    task_execution_kwargs={
        "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
    },
)
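The selection rule for candidate tasks can be summarised in a few lines. This is a minimal sketch of the behaviour described in this section, not the operator's source code: choose_task here is a standalone illustrative function, RuntimeError stands in for whatever exception the operator raises, and the ARNs in the usage note are placeholders.

```python
import random


def choose_task(candidate_task_arns, allow_random_task_choice=False):
    """Pick the TaskArn to execute from the tasks matching both location URIs."""
    if not candidate_task_arns:
        # Nothing matched; the caller may decide to create a task instead.
        return None
    if len(candidate_task_arns) == 1:
        # Exactly one match: use it.
        return candidate_task_arns[0]
    if allow_random_task_choice:
        # Several matches and the caller accepts any of them.
        return random.choice(candidate_task_arns)
    raise RuntimeError(f"Unable to choose a Task from {candidate_task_arns}")
```

For example, choose_task(["task-a"]) returns "task-a", while choose_task(["task-a", "task-b"]) raises unless allow_random_task_choice=True is passed.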

Create and execute a task

When searching for a task, if no task is found you have the option to create one before executing it. In order to do that, you need to provide the extra parameters create_task_kwargs, create_source_location_kwargs and create_destination_location_kwargs.

These extra parameters let the operator automatically create a Task and/or Locations if no suitable existing Task was found. If they are left at their default value (None), no creation is attempted.

If delete_task_after_execution is set to True, the task is deleted from AWS DataSync after it completes successfully. The example below sets it to False, so the created task is kept.

tests/system/providers/amazon/aws/example_datasync.py

# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
    task_id="create_and_execute_task",
    source_location_uri=f"s3://{s3_bucket_source}/test_create",
    destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
    create_task_kwargs={"Name": "Created by Airflow"},
    create_source_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    create_destination_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    delete_task_after_execution=False,
)

When creating a Task, the DataSyncOperator tries to find and reuse existing LocationArns rather than creating new ones. If multiple LocationArns match the specified URIs, the operator must choose one, and it behaves much as it does when choosing a single Task from many: it raises an exception. To avoid this, set allow_random_location_choice to True to choose randomly from the candidate Locations.
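The reuse-or-create decision for a single Location can be sketched as follows. This is an illustration of the behaviour described in this section under stated assumptions, not the operator's real code: resolve_location, its return values, and the use of RuntimeError are all hypothetical.

```python
import random


def resolve_location(candidate_arns, create_kwargs=None, allow_random_location_choice=False):
    """Decide whether to reuse an existing Location or create a new one.

    Returns ("reuse", arn) or ("create", create_kwargs); raises if neither applies.
    """
    if len(candidate_arns) == 1:
        # A single existing Location matches the URI: reuse it.
        return ("reuse", candidate_arns[0])
    if len(candidate_arns) > 1:
        if allow_random_location_choice:
            return ("reuse", random.choice(candidate_arns))
        raise RuntimeError(f"Unable to choose a Location from {candidate_arns}")
    if create_kwargs is not None:
        # No match, but the caller supplied kwargs for creating a Location.
        return ("create", create_kwargs)
    # No match and create kwargs left at the default (None): nothing to do.
    raise RuntimeError("No matching Location found and no create kwargs provided")
```

The same function would be applied once to the source URI (with create_source_location_kwargs) and once to the destination URI (with create_destination_location_kwargs).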
