airflow.providers.amazon.aws.operators.datasync

Create, get, update, execute and delete an AWS DataSync Task.

Module Contents

Classes

DataSyncOperator

Find, Create, Update, Execute and Delete AWS DataSync Tasks.

class airflow.providers.amazon.aws.operators.datasync.DataSyncOperator(*, aws_conn_id='aws_default', wait_interval_seconds=30, max_iterations=60, wait_for_completion=True, task_arn=None, source_location_uri=None, destination_location_uri=None, allow_random_task_choice=False, allow_random_location_choice=False, create_task_kwargs=None, create_source_location_kwargs=None, create_destination_location_kwargs=None, update_task_kwargs=None, task_execution_kwargs=None, delete_task_after_execution=False, **kwargs)

Bases: airflow.models.BaseOperator

Find, Create, Update, Execute and Delete AWS DataSync Tasks.

If do_xcom_push is True, then the DataSync TaskArn and TaskExecutionArn which were executed will be pushed to an XCom.

See also

For more information on how to use this operator, take a look at the guide: Interact with AWS DataSync Tasks

Note

There may be 0, 1, or many existing DataSync Tasks defined in your AWS environment. The default behavior is to create a new Task if there are none, to execute the Task if there is exactly one, and to fail if there are several.
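The default behavior described in this note can be sketched as a small decision function. This is an illustration only, not the operator's code: the function name plan_default_action, the returned strings and the use of ValueError are all hypothetical.

```python
def plan_default_action(matching_task_arns):
    """Return the default action for a list of matching TaskArns.

    Hypothetical helper: the names and return values are illustrative.
    """
    count = len(matching_task_arns)
    if count == 0:
        # No existing Task matches, so a new one would be created.
        return "create"
    if count == 1:
        # Exactly one candidate, so it would be executed.
        return "execute"
    # Several candidates and no rule to pick one: fail by default.
    raise ValueError(f"{count} candidate Tasks found; cannot pick one by default")
```

Setting allow_random_task_choice to True is the documented way to avoid the failure in the last branch.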

Parameters
  • aws_conn_id (str) – AWS connection to use.

  • wait_interval_seconds (int) – Time to wait between two consecutive calls to check TaskExecution status.

  • max_iterations (int) – Maximum number of consecutive calls to check TaskExecution status.

  • wait_for_completion (bool) – If True, wait for the task execution to reach a final state.

  • task_arn (str | None) – AWS DataSync TaskArn to use. If None, then this operator will attempt to either search for an existing Task or attempt to create a new Task.

  • source_location_uri (str | None) – Source location URI to search for. All DataSync Tasks with a LocationArn with this URI will be considered. Example: smb://server/subdir

  • destination_location_uri (str | None) – Destination location URI to search for. All DataSync Tasks with a LocationArn with this URI will be considered. Example: s3://airflow_bucket/stuff

  • allow_random_task_choice (bool) – If multiple Tasks match, one must be chosen to execute. If allow_random_task_choice is True then a random one is chosen.

  • allow_random_location_choice (bool) – If multiple Locations match, one must be chosen when creating a task. If allow_random_location_choice is True then a random one is chosen.

  • create_task_kwargs (dict | None) – If no suitable TaskArn is identified, it will be created if create_task_kwargs is defined. create_task_kwargs is then used internally like this: boto3.create_task(**create_task_kwargs). Example: {'Name': 'xyz', 'Options': ..., 'Excludes': ..., 'Tags': ...}

  • create_source_location_kwargs (dict | None) – If no suitable LocationArn is found, a Location will be created if create_source_location_kwargs is defined. create_source_location_kwargs is then used internally like this: boto3.create_location_xyz(**create_source_location_kwargs). The xyz is determined from the prefix of source_location_uri, e.g. smb:/... or s3:/... Example: {'Subdirectory': ..., 'ServerHostname': ..., ...}

  • create_destination_location_kwargs (dict | None) – If no suitable LocationArn is found, a Location will be created if create_destination_location_kwargs is defined. create_destination_location_kwargs is then used internally like this: boto3.create_location_xyz(**create_destination_location_kwargs). The xyz is determined from the prefix of destination_location_uri, e.g. smb:/... or s3:/... Example: {'S3BucketArn': ..., 'S3Config': {'BucketAccessRoleArn': ...}, ...}

  • update_task_kwargs (dict | None) – If a suitable TaskArn is found or created, it will be updated if update_task_kwargs is defined. update_task_kwargs is used internally like this: boto3.update_task(TaskArn=task_arn, **update_task_kwargs). Example: {'Name': 'xyz', 'Options': ..., 'Excludes': ...}

  • task_execution_kwargs (dict | None) – Additional kwargs passed directly when starting the Task execution, used internally like this: boto3.start_task_execution(TaskArn=task_arn, **task_execution_kwargs)

  • delete_task_after_execution (bool) – If True then the TaskArn which was executed will be deleted from AWS DataSync on successful completion.
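As a usage illustration, the parameters above might be combined as follows to search for a Task by its two location URIs and create one if none matches. This is a minimal sketch under stated assumptions: the helper names build_datasync_kwargs and make_operator, the task_id, the Task name and the VerifyMode option are chosen for the example and do not come from this page.

```python
def build_datasync_kwargs(source_uri, destination_uri):
    """Collect DataSyncOperator arguments for a search-or-create run.

    Hypothetical helper; all literal values below are illustrative.
    """
    return {
        "task_id": "sync_smb_to_s3",  # hypothetical Airflow task id
        "source_location_uri": source_uri,
        "destination_location_uri": destination_uri,
        # Used only if no existing Task matches both locations.
        "create_task_kwargs": {"Name": "airflow-created-task"},
        # Applied to the Task that was found or created.
        "update_task_kwargs": {"Options": {"VerifyMode": "NONE"}},
        "wait_for_completion": True,
        "delete_task_after_execution": False,
    }


def make_operator(source_uri, destination_uri):
    """Instantiate the operator; needs the Amazon provider package installed."""
    # Imported lazily so the kwargs helper can be used without Airflow.
    from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator

    return DataSyncOperator(**build_datasync_kwargs(source_uri, destination_uri))
```

Calling make_operator("smb://server/subdir", "s3://airflow_bucket/stuff") inside a DAG definition would attach the operator to that DAG. Because task_arn is left unset, the operator has to search for a matching Task or create one.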

Raises
  • AirflowException – If neither task_arn nor both source_location_uri and destination_location_uri were specified.

  • AirflowException – If the source or destination Location was not found and could not be created.

  • AirflowException – If choose_task or choose_location fails.

  • AirflowException – If Task creation, update, execution or delete fails.

template_fields: Sequence[str] = ['task_arn', 'source_location_uri', 'destination_location_uri', 'create_task_kwargs',...

template_fields_renderers

ui_color = #44b5e2

get_hook()

Create and return DataSyncHook.

Returns

A DataSyncHook instance.

Return type

airflow.providers.amazon.aws.hooks.datasync.DataSyncHook

execute(context)

This is the main method to derive when creating an operator. Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
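When wait_for_completion is True, the wait_interval_seconds and max_iterations parameters bound how long the status of the TaskExecution is polled. The loop below is a hedged sketch of that kind of bounded polling, not the provider's implementation: the function name, the get_status callable, the injectable sleep argument and the set of final states ("SUCCESS", "ERROR") are assumptions made for the example.

```python
import time


def wait_for_final_state(get_status, wait_interval_seconds=30,
                         max_iterations=60, sleep=time.sleep):
    """Poll get_status() until a final state is seen or the budget runs out.

    Hypothetical helper: get_status is any zero-argument callable that
    returns the current status string.
    """
    final_states = {"SUCCESS", "ERROR"}  # assumed final states
    for _ in range(max_iterations):
        status = get_status()
        if status in final_states:
            return status
        # Not finished yet: wait before the next status check.
        sleep(wait_interval_seconds)
    raise TimeoutError(
        f"No final state after {max_iterations} checks "
        f"spaced {wait_interval_seconds} seconds apart"
    )
```

With the default values (30 seconds, 60 iterations), such a loop would give up after roughly 30 minutes.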

choose_task(task_arn_list)

Select 1 DataSync TaskArn from a list.

choose_location(location_arn_list)

Select 1 DataSync LocationArn from a list.
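Both choose_task and choose_location reduce a list of ARNs to one, and the allow_random_task_choice and allow_random_location_choice parameters control what happens when several candidates match. The following is a sketch of that selection rule under assumptions: the function name choose_one, the None result for an empty list and the ValueError are illustrative and not taken from the operator.

```python
import random


def choose_one(arn_list, allow_random_choice=False):
    """Pick a single ARN from arn_list (hypothetical helper)."""
    if not arn_list:
        # Nothing to choose from; the caller may decide to create one.
        return None
    if len(arn_list) == 1:
        return arn_list[0]
    if allow_random_choice:
        # Several candidates and random choice is permitted.
        return random.choice(arn_list)
    raise ValueError(f"Unable to choose one ARN from {len(arn_list)} candidates")
```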

on_kill()

Override this method to clean up subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up, or it will leave ghost processes behind.
