AWS DataSync Operator

Overview

Two example_dags are provided which showcase the AWSDataSyncOperator in action.

  • example_datasync_1.py

  • example_datasync_2.py

Both examples use the AWSDataSyncHook to create a boto3 DataSync client. This hook in turn uses the AwsBaseHook

Note this guide differentiates between an Airflow task (identified by a task_id on Airflow), and an AWS DataSync Task (identified by a TaskArn on AWS).

example_datasync_1.py

Purpose

With this DAG we show approaches catering for two simple use cases.

1.1 Specify a TaskARN to be executed. 1.2 Find an AWS DataSync TaskArn based on source and destination URIs, and execute it.

Environment variables

These examples rely on the following variables, which can be passed via OS environment variables.

airflow/providers/amazon/aws/example_dags/example_datasync_1.pyView Source

TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")

airflow/providers/amazon/aws/example_dags/example_datasync_1.pyView Source

SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")

DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")

Get DataSync Tasks

The AWSDataSyncOperator can execute a specific TaskArn by specifying the task_arn parameter. This is useful when you know the TaskArn you want to execute.

airflow/providers/amazon/aws/example_dags/example_datasync_1.pyView Source

    datasync_task_1 = AWSDataSyncOperator(
        aws_conn_id="aws_default", task_id="datasync_task_1", task_arn=TASK_ARN
    )

Alternatively, the operator can search in AWS DataSync for a Task based on source_location_uri and destination_location_uri. For example, your source_location_uri might point to your on-premises SMB / NFS share, and your destination_location_uri might be an S3 bucket.

In AWS, DataSync Tasks are linked to source and destination Locations. A location has a LocationURI and is referenced by a LocationArn much like other AWS resources. The AWSDataSyncOperator can iterate all DataSync Tasks for their source and destination LocationArns. Then it checks each LocationArn to see if its the URIs match the desired source / destination URI.

To perform a search based on the Location URIs, define the task as follows

airflow/providers/amazon/aws/example_dags/example_datasync_1.pyView Source

    datasync_task_2 = AWSDataSyncOperator(
        aws_conn_id="aws_default",
        task_id="datasync_task_2",
        source_location_uri=SOURCE_LOCATION_URI,
        destination_location_uri=DESTINATION_LOCATION_URI,
    )

Note: The above configuration assumes there is always exactly one DataSync TaskArn in AWS that matches. It will fail if either there were no matching TaskArns or if there were more than one matching TaskArn defined already in AWS DataSync. You may want to add additional logic to handle other cases - see example_datasync_2 and the Operator behaviour section.

example_datasync_2.py

Purpose

Show how DataSync Tasks and Locations can be automatically created, deleted and updated using the AWSDataSyncOperator.

Find and update a DataSync Task, or create one if it doesn’t exist. Update the Task, then execute it. Finally, delete it.

Environment variables

This example relies on the following variables, which can be passed via OS environment variables.

airflow/providers/amazon/aws/example_dags/example_datasync_2.pyView Source

SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")

DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")

default_create_task_kwargs = '{"Name": "Created by Airflow"}'
CREATE_TASK_KWARGS = json.loads(getenv("CREATE_TASK_KWARGS", default_create_task_kwargs))

default_create_source_location_kwargs = "{}"
CREATE_SOURCE_LOCATION_KWARGS = json.loads(
    getenv("CREATE_SOURCE_LOCATION_KWARGS", default_create_source_location_kwargs)
)

bucket_access_role_arn = "arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"
default_destination_location_kwargs = """\
{"S3BucketArn": "arn:aws:s3:::mybucket",
    "S3Config": {"BucketAccessRoleArn":
    "arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"}
}"""
CREATE_DESTINATION_LOCATION_KWARGS = json.loads(
    getenv("CREATE_DESTINATION_LOCATION_KWARGS", re.sub(r"[\s+]", '', default_destination_location_kwargs))
)

default_update_task_kwargs = '{"Name": "Updated by Airflow"}'
UPDATE_TASK_KWARGS = json.loads(getenv("UPDATE_TASK_KWARGS", default_update_task_kwargs))

Get, Create, Update, Run and Delete DataSync Tasks

The AWSDataSyncOperator is used as before but with some extra arguments.

Most of the arguments (CREATE_*_KWARGS) provide a way for the operator to automatically create a Task and/or Locations if no suitable existing Task was found. If these are left to their default value (None) then no create will be attempted.

airflow/providers/amazon/aws/example_dags/example_datasync_2.pyView Source

    datasync_task = AWSDataSyncOperator(
        aws_conn_id="aws_default",
        task_id="datasync_task",
        source_location_uri=SOURCE_LOCATION_URI,
        destination_location_uri=DESTINATION_LOCATION_URI,
        create_task_kwargs=CREATE_TASK_KWARGS,
        create_source_location_kwargs=CREATE_SOURCE_LOCATION_KWARGS,
        create_destination_location_kwargs=CREATE_DESTINATION_LOCATION_KWARGS,
        update_task_kwargs=UPDATE_TASK_KWARGS,
        delete_task_after_execution=True,
    )

Note also the addition of UPDATE_TASK_KWARGS; if this is not None then it will be used to do an update of the Task properties on AWS prior to the Task being executed.

Otherwise the behaviour is very similar to the first examples above. We want to identify a suitable TaskArn based on some criteria (specified task_arn or source and dest URIs) and execute it. In this example, the main differences are that we provide a way to create Tasks/Locations if none are found.

Also, because we specified delete_task_after_execution=True, the TaskArn will be deleted from AWS DataSync after it completes successfully.

Operator behaviour

DataSync Task execution behaviour

Once the AWSDataSyncOperator has identified the correct TaskArn to run (either because you specified it, or because it was found), it will then be executed. Whenever an AWS DataSync Task is executed it creates an AWS DataSync TaskExecution, identified by a TaskExecutionArn.

The TaskExecutionArn will be monitored until completion (success / failure), and its status will be periodically written to the Airflow task log.

After completion, the TaskExecution description is retrieved from AWS and dumped to the Airflow task log for inspection.

Finally, both the TaskArn and the TaskExecutionArn are returned from the execute() method, and pushed to an XCom automatically if do_xcom_push=True.

The AWSDataSyncOperator supports optional passing of additional kwargs to the underlying boto3.start_task_execution() API. This is done with the task_execution_kwargs parameter. This is useful for example to limit bandwidth or filter included files - refer to the boto3 Datasync documentation for more details.

TaskArn selection behaviour

The AWSDataSyncOperator may find 0, 1, or many AWS DataSync Tasks with a matching source_location_uri and destination_location_uri. The operator must decide what to do in each of these scenarios.

To override the default behaviour, simply create an operator which inherits AWSDataSyncOperator and re-implement the choose_task and choose_location methods to suit your use case.

Scenarios and behaviours:

  • No suitable AWS DataSync Tasks found

If there were 0 suitable AWS DataSync Tasks found, the operator will try to create one. This operator will use existing Locations if any are found which match the source or destination location uri that were specified. Or, if either location has no matching LocationArn in AWS then the operator will attempt to create new Location/s if suitable kwargs were provided to do so.

  • 1 AWS DataSync Task found

This is the simplest scenario - just use the one DataSync Task that was found :).

  • More than one AWS DataSync Tasks found

The operator will raise an Exception. To avoid this, you can set allow_random_task_choice=True to randomly choose from candidate Tasks. Alternatively you can subclass this operator and re-implement the choose_task method with your own algorithm.

TaskArn creation behaviour

When creating a Task, the AWSDataSyncOperator will try to find and use existing LocationArns rather than creating new ones. If multiple LocationArns match the specified URIs then we need to choose one to use. In this scenario, the operator behaves similarly to how it chooses a single Task from many Tasks:

The operator will raise an Exception. To avoid this, you can set allow_random_location_choice=True to randomly choose from candidate Locations. Alternatively you can subclass this operator and re-implement the choose_location method with your own algorithm.

Reference

For further information, look at:

Was this entry helpful?