AWS DataSync Operator
Overview
Two example_dags are provided which showcase the DataSyncOperator in action.
example_datasync_1.py
example_datasync_2.py
Both examples use the DataSyncHook to create a boto3 DataSync client. This hook in turn uses the AwsBaseHook.
Note that this guide differentiates between an Airflow task (identified by a task_id on Airflow) and an AWS DataSync Task (identified by a TaskArn on AWS).
example_datasync_1.py
Purpose
With this DAG we show approaches catering for two simple use cases:
1.1 Specify a TaskArn to be executed.
1.2 Find an AWS DataSync TaskArn based on source and destination URIs, and execute it.
Environment variables
These examples rely on the following variables, which can be passed via OS environment variables.
from os import getenv

TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")
SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")
DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
Get DataSync Tasks
The DataSyncOperator can execute a specific TaskArn by specifying the task_arn parameter. This is useful when you know the TaskArn you want to execute.
datasync_task_1 = DataSyncOperator(task_id="datasync_task_1", task_arn=TASK_ARN)
Alternatively, the operator can search in AWS DataSync for a Task based on source_location_uri and destination_location_uri. For example, your source_location_uri might point to your on-premises SMB / NFS share, and your destination_location_uri might be an S3 bucket.
In AWS, DataSync Tasks are linked to source and destination Locations. A location has a LocationURI and
is referenced by a LocationArn much like other AWS resources.
The DataSyncOperator can iterate over all DataSync Tasks and their source and destination LocationArns. It then checks each LocationArn to see if its URI matches the desired source / destination URI.
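The matching step can be pictured with a small pure-Python sketch. It works on hypothetical in-memory dicts that only mimic the shape of DataSync data (LocationArn, LocationUri, SourceLocationArn, DestinationLocationArn); the helper names and the case-insensitive, trailing-slash-tolerant comparison are assumptions for illustration, not the operator's actual code.

```python
# Hypothetical, in-memory sketch: the dicts below only mimic the shape of
# DataSync data. No AWS calls are made, and none of these helpers belong to
# Airflow or boto3.


def normalise_uri(uri):
    # Assumption: URIs are compared case-insensitively, ignoring a trailing slash.
    return uri.lower().rstrip("/")


def location_arns_for_uri(locations, uri):
    """Return every LocationArn whose LocationUri matches ``uri``."""
    wanted = normalise_uri(uri)
    return [loc["LocationArn"] for loc in locations if normalise_uri(loc["LocationUri"]) == wanted]


def candidate_task_arns(tasks, locations, source_uri, destination_uri):
    """Return the TaskArns whose source AND destination Locations both match."""
    source_arns = set(location_arns_for_uri(locations, source_uri))
    destination_arns = set(location_arns_for_uri(locations, destination_uri))
    return [
        task["TaskArn"]
        for task in tasks
        if task["SourceLocationArn"] in source_arns
        and task["DestinationLocationArn"] in destination_arns
    ]


if __name__ == "__main__":
    locations = [
        {"LocationArn": "arn:location/src", "LocationUri": "smb://hostname/directory/"},
        {"LocationArn": "arn:location/dst", "LocationUri": "s3://mybucket/prefix/"},
    ]
    tasks = [
        {
            "TaskArn": "arn:task/1",
            "SourceLocationArn": "arn:location/src",
            "DestinationLocationArn": "arn:location/dst",
        }
    ]
    print(candidate_task_arns(tasks, locations, "smb://hostname/directory/", "s3://mybucket/prefix"))
    # prints ['arn:task/1']
```

The result is a list on purpose: depending on what exists in AWS it can hold 0, 1 or many TaskArns, which is exactly the situation the operator then has to resolve.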
To perform a search based on the Location URIs, define the task as follows:
datasync_task_2 = DataSyncOperator(
    task_id="datasync_task_2",
    source_location_uri=SOURCE_LOCATION_URI,
    destination_location_uri=DESTINATION_LOCATION_URI,
)
Note: The above configuration assumes there is always exactly one DataSync TaskArn in AWS that matches. It will fail if there are no matching TaskArns, or if more than one matching TaskArn is already defined in AWS DataSync. You may want to add additional logic to handle other cases; see example_datasync_2 and the Operator behaviour section.
example_datasync_2.py
Purpose
Show how DataSync Tasks and Locations can be automatically created, deleted and updated using the DataSyncOperator.
Find a DataSync Task, or create one if it doesn't exist. Update the Task, then execute it. Finally, delete it.
Environment variables
This example relies on the following variables, which can be passed via OS environment variables.
import json
from os import getenv

SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")
DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")

default_create_task_kwargs = '{"Name": "Created by Airflow"}'
CREATE_TASK_KWARGS = json.loads(getenv("CREATE_TASK_KWARGS", default_create_task_kwargs))

default_create_source_location_kwargs = "{}"
CREATE_SOURCE_LOCATION_KWARGS = json.loads(
    getenv("CREATE_SOURCE_LOCATION_KWARGS", default_create_source_location_kwargs)
)

# Placeholder IAM role that DataSync assumes to access the destination bucket.
bucket_access_role_arn = "arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"
# Build the default JSON from a dict instead of hand-writing (and then
# whitespace-stripping) a multi-line JSON string.
default_destination_location_kwargs = json.dumps(
    {"S3BucketArn": "arn:aws:s3:::mybucket", "S3Config": {"BucketAccessRoleArn": bucket_access_role_arn}}
)
CREATE_DESTINATION_LOCATION_KWARGS = json.loads(
    getenv("CREATE_DESTINATION_LOCATION_KWARGS", default_destination_location_kwargs)
)

default_update_task_kwargs = '{"Name": "Updated by Airflow"}'
UPDATE_TASK_KWARGS = json.loads(getenv("UPDATE_TASK_KWARGS", default_update_task_kwargs))
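Each of these variables is expected to hold a JSON object as a string, which is why the defaults are wrapped in json.loads. A minimal sketch of how an environment variable overrides such a default, using a hypothetical helper load_json_env and a made-up override value:

```python
import json
from os import environ, getenv


def load_json_env(name, default_json):
    """Hypothetical helper: parse a JSON object from env var ``name``, else ``default_json``."""
    return json.loads(getenv(name, default_json))


# With the variable unset, the default from the example is used.
environ.pop("UPDATE_TASK_KWARGS", None)
print(load_json_env("UPDATE_TASK_KWARGS", '{"Name": "Updated by Airflow"}'))  # {'Name': 'Updated by Airflow'}

# Setting the variable (normally in the shell or deployment config) overrides it.
environ["UPDATE_TASK_KWARGS"] = '{"Name": "Nightly sync"}'
print(load_json_env("UPDATE_TASK_KWARGS", '{"Name": "Updated by Airflow"}'))  # {'Name': 'Nightly sync'}
```

In a shell, the equivalent override would be export UPDATE_TASK_KWARGS='{"Name": "Nightly sync"}' before the DAG file is parsed. Invalid JSON in the variable makes json.loads raise at parse time, so typos surface early.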
Get, Create, Update, Run and Delete DataSync Tasks
The DataSyncOperator is used as before, but with some extra arguments.
Most of the arguments (CREATE_*_KWARGS) provide a way for the operator to automatically create a Task and/or Locations if no suitable existing Task was found. If these are left at their default value (None), the operator will not attempt to create anything.
datasync_task = DataSyncOperator(
    task_id="datasync_task",
    source_location_uri=SOURCE_LOCATION_URI,
    destination_location_uri=DESTINATION_LOCATION_URI,
    create_task_kwargs=CREATE_TASK_KWARGS,
    create_source_location_kwargs=CREATE_SOURCE_LOCATION_KWARGS,
    create_destination_location_kwargs=CREATE_DESTINATION_LOCATION_KWARGS,
    update_task_kwargs=UPDATE_TASK_KWARGS,
    delete_task_after_execution=True,
)
Note also the addition of UPDATE_TASK_KWARGS; if this is not None, it will be used to update the Task properties on AWS before the Task is executed.
Otherwise the behaviour is very similar to the first examples above. We want to identify a suitable TaskArn based on some criteria (a specified task_arn, or source and destination URIs) and execute it. The main difference in this example is that we provide a way to create Tasks/Locations if none are found.
Also, because we specified delete_task_after_execution=True, the TaskArn will be deleted from AWS DataSync after it completes successfully.
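The order of operations described for this example can be sketched against a made-up in-memory client. FakeDataSync and run_datasync_lifecycle are hypothetical names invented for this illustration; they are neither boto3 nor Airflow's DataSyncHook, and the returned dict only mirrors the TaskArn / TaskExecutionArn pair the operator is described as returning.

```python
# Hypothetical sketch of the order of operations in example_datasync_2.
# FakeDataSync is an in-memory stand-in with made-up method signatures.
from itertools import count


class FakeDataSync:
    def __init__(self):
        self.tasks = {}  # TaskArn -> Task properties
        self.calls = []  # names of the operations performed, in order
        self._ids = count(1)

    def create_task(self, **kwargs):
        arn = f"arn:fake:task/{next(self._ids)}"
        self.tasks[arn] = dict(kwargs)
        self.calls.append("create_task")
        return arn

    def update_task(self, task_arn, **kwargs):
        self.tasks[task_arn].update(kwargs)
        self.calls.append("update_task")

    def start_task_execution(self, task_arn):
        self.calls.append("start_task_execution")
        return f"{task_arn}/execution/1"

    def delete_task(self, task_arn):
        del self.tasks[task_arn]
        self.calls.append("delete_task")


def run_datasync_lifecycle(client, candidate_task_arns, create_task_kwargs=None,
                           update_task_kwargs=None, delete_task_after_execution=False):
    if len(candidate_task_arns) > 1:
        raise ValueError("More than one candidate Task found")
    task_arn = candidate_task_arns[0] if candidate_task_arns else None
    # Only create a Task when none was found AND create kwargs were supplied.
    if task_arn is None and create_task_kwargs:
        task_arn = client.create_task(**create_task_kwargs)
    if task_arn is None:
        raise ValueError("TaskArn could not be identified or created")
    # Update the Task properties before running it, if requested.
    if update_task_kwargs:
        client.update_task(task_arn, **update_task_kwargs)
    # The real operator also waits for the execution to finish and raises on
    # failure, so its delete step is only reached after a successful run.
    execution_arn = client.start_task_execution(task_arn)
    if delete_task_after_execution:
        client.delete_task(task_arn)
    return {"TaskArn": task_arn, "TaskExecutionArn": execution_arn}


if __name__ == "__main__":
    client = FakeDataSync()
    result = run_datasync_lifecycle(
        client,
        [],
        create_task_kwargs={"Name": "Created by Airflow"},
        update_task_kwargs={"Name": "Updated by Airflow"},
        delete_task_after_execution=True,
    )
    print(client.calls)  # ['create_task', 'update_task', 'start_task_execution', 'delete_task']
    print(result)
```

Passing an empty candidate list with no create_task_kwargs raises, which matches the rule that nothing is created when the CREATE_*_KWARGS are left as None.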
Operator behaviour
DataSync Task execution behaviour
Once the DataSyncOperator has identified the correct TaskArn to run (either because you specified it, or because it was found), it will then be executed. Whenever an AWS DataSync Task is executed, it creates an AWS DataSync TaskExecution, identified by a TaskExecutionArn.
The TaskExecutionArn will be monitored until completion (success / failure), and its status will be periodically written to the Airflow task log.
After completion, the TaskExecution description is retrieved from AWS and dumped to the Airflow task log for inspection.
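The monitoring described above is essentially a polling loop. Below is a minimal sketch of that idea, assuming a zero-argument describe callable that returns a dict with a "Status" key (for example, a lambda wrapping the boto3 DataSync client's describe_task_execution(TaskExecutionArn=...)). The function name, poll interval and poll limit are hypothetical, and the operator's own loop may differ.

```python
import time

# Assumption: terminal values of the TaskExecution "Status" field. Intermediate
# states such as QUEUED, LAUNCHING, PREPARING, TRANSFERRING and VERIFYING keep
# the loop polling.
SUCCESS_STATES = {"SUCCESS"}
FAILURE_STATES = {"ERROR"}


def wait_for_task_execution(describe, poll_interval=30.0, max_polls=120, log=print):
    """Poll ``describe()`` until the TaskExecution reaches a terminal status.

    Returns the final description so it can be written to the task log in full.
    """
    for _ in range(max_polls):
        description = describe()
        status = description["Status"]
        log(f"TaskExecution status: {status}")
        if status in SUCCESS_STATES:
            return description
        if status in FAILURE_STATES:
            raise RuntimeError(f"TaskExecution failed: {description}")
        time.sleep(poll_interval)
    raise TimeoutError(f"TaskExecution not finished after {max_polls} polls")


if __name__ == "__main__":
    fake_statuses = iter(["LAUNCHING", "TRANSFERRING", "SUCCESS"])
    final = wait_for_task_execution(lambda: {"Status": next(fake_statuses)}, poll_interval=0)
    print(final)  # {'Status': 'SUCCESS'}
```

Injecting describe keeps the loop testable without AWS credentials; against a real account you would pass a callable bound to a specific TaskExecutionArn.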
Finally, both the TaskArn and the TaskExecutionArn are returned from the execute() method, and pushed to an XCom automatically if do_xcom_push=True.
The DataSyncOperator supports optional passing of additional kwargs to the underlying start_task_execution() API of the boto3 DataSync client. This is done with the task_execution_kwargs parameter. This is useful, for example, to limit bandwidth or filter included files; refer to the boto3 DataSync documentation for more details.
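As an illustration, the dict below limits bandwidth and restricts the transfer to two folders. OverrideOptions.BytesPerSecond and Includes with a SIMPLE_PATTERN filter are start_task_execution() request fields; the specific values and folder names are hypothetical, so check the boto3 DataSync reference before relying on them.

```python
# Hypothetical values, passed through to the boto3 DataSync client's
# start_task_execution() call via the operator's task_execution_kwargs parameter.
task_execution_kwargs = {
    # Limit bandwidth to 10 MiB/s for this execution only.
    "OverrideOptions": {"BytesPerSecond": 10 * 1024 * 1024},
    # Only transfer these two (hypothetical) folders; "|" separates patterns.
    "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/reports|/exports"}],
}

# In a DAG this would be used roughly as:
# datasync_task = DataSyncOperator(
#     task_id="datasync_task",
#     task_arn=TASK_ARN,
#     task_execution_kwargs=task_execution_kwargs,
# )
```

Because these are per-execution overrides, the Task's stored settings on AWS are left unchanged; use update_task_kwargs if you want the change to persist.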
TaskArn selection behaviour
The DataSyncOperator may find 0, 1, or many AWS DataSync Tasks with a matching source_location_uri and destination_location_uri. The operator must decide what to do in each of these scenarios.
To override the default behaviour, simply create an operator which inherits from DataSyncOperator and re-implements the choose_task and choose_location methods to suit your use case.
Scenarios and behaviours:
No suitable AWS DataSync Tasks found
If there were 0 suitable AWS DataSync Tasks found, the operator will try to create one, provided create_task_kwargs was given. It will use existing Locations if any are found which match the specified source or destination location URI. If either location has no matching LocationArn in AWS, the operator will attempt to create new Location/s, if suitable kwargs were provided to do so.
1 AWS DataSync Task found
This is the simplest scenario - just use the one DataSync Task that was found :).
More than one AWS DataSync Task found
The operator will raise an Exception. To avoid this, you can set allow_random_task_choice=True to randomly choose from candidate Tasks. Alternatively, you can subclass this operator and re-implement the choose_task method with your own algorithm.
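The three scenarios can be summarised as a small decision function. This is a pure-Python sketch of the behaviour described above, not the operator's own choose_task: the real method raises an Airflow exception rather than ValueError, and its code may differ.

```python
import random
from typing import List, Optional


def choose_task(task_arn_list: List[str], allow_random_task_choice: bool = False) -> Optional[str]:
    """Pick one TaskArn from the candidates, following the three scenarios."""
    if not task_arn_list:
        # 0 found: return nothing so the caller can try to create a Task.
        return None
    if len(task_arn_list) == 1:
        # 1 found: the simplest case.
        return task_arn_list[0]
    if allow_random_task_choice:
        # Many found, and random choice explicitly allowed.
        return random.choice(task_arn_list)
    # Many found, random choice not allowed: refuse to guess.
    raise ValueError(f"Unable to choose a Task from {task_arn_list}")


if __name__ == "__main__":
    print(choose_task([]))  # None
    print(choose_task(["arn:task/1"]))  # arn:task/1
    print(choose_task(["arn:task/1", "arn:task/2"], allow_random_task_choice=True))
```

Raising by default is the conservative choice: silently picking one of several matching Tasks could run a transfer with settings you did not intend.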
TaskArn creation behaviour
When creating a Task, the DataSyncOperator will try to find and use existing LocationArns rather than creating new ones. If multiple LocationArns match the specified URIs, then we need to choose one to use. In this scenario, the operator behaves similarly to how it chooses a single Task from many Tasks:
The operator will raise an Exception. To avoid this, you can set allow_random_location_choice=True to randomly choose from candidate Locations. Alternatively, you can subclass this operator and re-implement the choose_location method with your own algorithm.
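As one possible "own algorithm", the sketch below always picks the same LocationArn by sorting the candidates, so repeated runs reuse the same Location instead of a random one. The function name and the sort-by-ARN policy are hypothetical; the subclass wiring is shown only as a comment because it needs Airflow installed, and the choose_location(self, location_arn_list) signature there is an assumption to verify against your Airflow version.

```python
from typing import List, Optional


def choose_location_deterministically(location_arn_list: Optional[List[str]]) -> Optional[str]:
    """Hypothetical alternative policy: always pick the same LocationArn.

    Sorting makes the choice repeatable across runs, unlike a random choice.
    """
    if not location_arn_list:
        return None
    return sorted(location_arn_list)[0]


# Assumed wiring in a subclass (requires Airflow, so shown as a comment):
# class DeterministicDataSyncOperator(DataSyncOperator):
#     def choose_location(self, location_arn_list):
#         return choose_location_deterministically(location_arn_list)

if __name__ == "__main__":
    print(choose_location_deterministically(["arn:loc/b", "arn:loc/a"]))  # arn:loc/a
```

Sorting by ARN is arbitrary but stable; any rule that maps the same candidate set to the same Location gives you that repeatability.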