AWS Database Migration Service Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create the necessary resources using the AWS Console or the AWS CLI.
Install the API libraries via pip:
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation.
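To confirm that the install step took effect before running a DAG, a quick check along these lines may help. This is only a sketch: it assumes the `amazon` extra installs a distribution named `apache-airflow-providers-amazon`, and `is_installed` is a hypothetical helper, not part of Airflow.

```python
from importlib import metadata


def is_installed(dist_name: str) -> bool:
    """Return True if a distribution with this name is installed."""
    try:
        metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return False
    return True


if __name__ == "__main__":
    # Assumed distribution name for the Amazon provider package.
    name = "apache-airflow-providers-amazon"
    print(f"{name} installed: {is_installed(name)}")
```

If the check reports that the package is missing, rerunning the pip command in the same environment that Airflow uses is the usual first step.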
Overview¶
The Airflow integration with AWS Database Migration Service (DMS) provides several operators to create and interact with DMS replication tasks.
One example DAG is provided, which showcases some of these operators in action:
example_dms_full_load_task.py
Create a replication task, wait for its completion, and delete it.¶
Purpose¶
This example DAG, example_dms_full_load_task.py, uses DmsCreateTaskOperator, DmsStartTaskOperator, DmsTaskCompletedSensor, and DmsDeleteTaskOperator to create a replication task, start it, wait for it to complete, and then delete it.
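The "wait for completion" step is conceptually a polling loop. The sketch below illustrates that idea with plain Python and a fake status source. It is not the provider's sensor code: `wait_until_done`, the `max_pokes` limit, and the choice to treat "stopped" as done and "failed" as an error are all assumptions made for illustration.

```python
import time
from typing import Callable

# Assumption: "stopped" means the task finished, "failed" means it did not.
DONE_STATUSES = {"stopped"}
FAILED_STATUSES = {"failed"}


def wait_until_done(
    get_status: Callable[[], str],
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> int:
    """Poll get_status until it reports a done status.

    Returns the number of pokes used. Raises RuntimeError on a failed
    status and TimeoutError if max_pokes is exhausted.
    """
    for poke in range(1, max_pokes + 1):
        status = get_status()
        if status in DONE_STATUSES:
            return poke
        if status in FAILED_STATUSES:
            raise RuntimeError(f"replication task ended with status {status!r}")
        time.sleep(poke_interval)
    raise TimeoutError(f"task not done after {max_pokes} pokes")


# A fake status source standing in for a real DMS status lookup.
fake_statuses = iter(["starting", "running", "running", "stopped"])
pokes_used = wait_until_done(lambda: next(fake_statuses))
print(pokes_used)  # 4
```

In a real DAG the sensor handles this polling, so the loop above is only meant to show why the delete step has to run after the sensor succeeds.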
Defining tasks¶
The following code creates a new replication task, starts it, waits for it to complete, and then deletes it.
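# Create the replication task; its ARN is pushed to XCom as the return value.
create_task = DmsCreateTaskOperator(
    task_id='create_task',
    replication_task_id=REPLICATION_TASK_ID,
    source_endpoint_arn=SOURCE_ENDPOINT_ARN,
    target_endpoint_arn=TARGET_ENDPOINT_ARN,
    replication_instance_arn=REPLICATION_INSTANCE_ARN,
    table_mappings=TABLE_MAPPINGS,
)

# Start the task, pulling its ARN from the XCom of create_task.
start_task = DmsStartTaskOperator(
    task_id='start_task',
    replication_task_arn='{{ task_instance.xcom_pull(task_ids="create_task", key="return_value") }}',
)

# Block until the replication task has completed.
wait_for_completion = DmsTaskCompletedSensor(
    task_id='wait_for_completion',
    replication_task_arn='{{ task_instance.xcom_pull(task_ids="create_task", key="return_value") }}',
)

# Clean up the replication task once it is done.
delete_task = DmsDeleteTaskOperator(
    task_id='delete_task',
    replication_task_arn='{{ task_instance.xcom_pull(task_ids="create_task", key="return_value") }}',
)

# Run the tasks in order; the XCom templates alone do not create dependencies.
create_task >> start_task >> wait_for_completion >> delete_task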
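The task definitions above rely on constants that are not shown on this page. A possible way to define them is sketched below. Every value is a placeholder: the ARNs, the task identifier, and the `test` schema name are hypothetical, and `TASK_ARN_TEMPLATE` is an illustrative name rather than something the example DAG is known to define. The selection rule follows the DMS table-mapping JSON layout, and the operator is assumed to accept the mapping as a Python dict.

```python
import json

# Placeholder identifiers; replace them with values from your own AWS account.
REPLICATION_TASK_ID = "example-full-load-task"
SOURCE_ENDPOINT_ARN = "arn:aws:dms:us-east-1:123456789012:endpoint:SOURCE-PLACEHOLDER"
TARGET_ENDPOINT_ARN = "arn:aws:dms:us-east-1:123456789012:endpoint:TARGET-PLACEHOLDER"
REPLICATION_INSTANCE_ARN = "arn:aws:dms:us-east-1:123456789012:rep:INSTANCE-PLACEHOLDER"

# One selection rule that includes every table in the hypothetical "test" schema.
TABLE_MAPPINGS = {
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {"schema-name": "test", "table-name": "%"},
            "rule-action": "include",
        }
    ]
}

# The same Jinja template is used by three tasks, so it can be named once.
TASK_ARN_TEMPLATE = (
    '{{ task_instance.xcom_pull(task_ids="create_task", key="return_value") }}'
)

# Serializing the mapping is a cheap way to catch non-JSON values early.
print(json.dumps(TABLE_MAPPINGS, indent=2))
```

Naming the template once keeps the three `replication_task_arn` arguments in sync if the upstream task ID ever changes.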