Source code for airflow.providers.amazon.aws.operators.dms
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromtypingimportTYPE_CHECKING,Sequencefromairflow.modelsimportBaseOperatorfromairflow.providers.amazon.aws.hooks.dmsimportDmsHookifTYPE_CHECKING:fromairflow.utils.contextimportContext
class DmsCreateTaskOperator(BaseOperator):
    """
    Creates AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsCreateTaskOperator`

    :param replication_task_id: Replication task id
    :param source_endpoint_arn: Source endpoint ARN
    :param target_endpoint_arn: Target endpoint ARN
    :param replication_instance_arn: Replication instance ARN
    :param table_mappings: Table mappings
    :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'),
        full-load by default.
    :param create_task_kwargs: Extra arguments for DMS replication task creation.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    # NOTE(review): the attributes read by ``execute`` are expected to be set
    # by the constructor, which is not visible in this excerpt -- confirm.

    def execute(self, context: Context):
        """
        Create the AWS DMS replication task described by this operator.

        A ``DmsHook`` is built from ``aws_conn_id`` and asked to create the
        task; the operator's stored settings are forwarded as keyword
        arguments, followed by everything in ``create_task_kwargs``.

        :param context: Airflow task context (not used by this method).
        :return: replication task arn, as returned by the hook
        """
        hook = DmsHook(aws_conn_id=self.aws_conn_id)
        arn = hook.create_replication_task(
            replication_task_id=self.replication_task_id,
            source_endpoint_arn=self.source_endpoint_arn,
            target_endpoint_arn=self.target_endpoint_arn,
            replication_instance_arn=self.replication_instance_arn,
            migration_type=self.migration_type,
            table_mappings=self.table_mappings,
            **self.create_task_kwargs,
        )
        self.log.info("DMS replication task(%s) is ready.", self.replication_task_id)
        return arn
class DmsDeleteTaskOperator(BaseOperator):
    """
    Deletes AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDeleteTaskOperator`

    :param replication_task_arn: Replication task ARN
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    # NOTE(review): the attributes read by ``execute`` are expected to be set
    # by the constructor, which is not visible in this excerpt -- confirm.

    def execute(self, context: Context) -> None:
        """
        Delete AWS DMS replication task from Airflow.

        Builds a ``DmsHook`` from ``aws_conn_id``, asks it to delete the task
        identified by ``replication_task_arn`` and logs the deletion.

        :param context: Airflow task context (not used by this method).
        :return: None. The method has no ``return`` statement, so the task
            ARN is *not* handed back to the caller.
        """
        hook = DmsHook(aws_conn_id=self.aws_conn_id)
        hook.delete_replication_task(replication_task_arn=self.replication_task_arn)
        self.log.info("DMS replication task(%s) has been deleted.", self.replication_task_arn)
class DmsDescribeTasksOperator(BaseOperator):
    """
    Describes AWS DMS replication tasks.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDescribeTasksOperator`

    :param describe_tasks_kwargs: Describe tasks command arguments
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    # NOTE(review): the attributes read by ``execute`` are expected to be set
    # by the constructor, which is not visible in this excerpt -- confirm.

    def execute(self, context: Context) -> tuple[str | None, list]:
        """
        Describe AWS DMS replication tasks from Airflow.

        The contents of ``describe_tasks_kwargs`` are unpacked into
        ``DmsHook.describe_replication_tasks`` and the hook's result is
        returned unchanged.

        :param context: Airflow task context (not used by this method).
        :return: Marker and list of replication tasks
        """
        return DmsHook(aws_conn_id=self.aws_conn_id).describe_replication_tasks(
            **self.describe_tasks_kwargs
        )
class DmsStartTaskOperator(BaseOperator):
    """
    Starts AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsStartTaskOperator`

    :param replication_task_arn: Replication task ARN
    :param start_replication_task_type: Replication task start type (default='start-replication')
        ('start-replication'|'resume-processing'|'reload-target')
    :param start_task_kwargs: Extra start replication task arguments
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    # NOTE(review): the attributes read by ``execute`` are expected to be set
    # by the constructor, which is not visible in this excerpt -- confirm.

    def execute(self, context: Context) -> None:
        """
        Start AWS DMS replication task from Airflow.

        Builds a ``DmsHook`` from ``aws_conn_id`` and asks it to start the
        task identified by ``replication_task_arn`` using
        ``start_replication_task_type``; everything in ``start_task_kwargs``
        is forwarded as additional keyword arguments. The log line is emitted
        right after the hook call returns.

        :param context: Airflow task context (not used by this method).
        :return: None. The method has no ``return`` statement, so the task
            ARN is *not* handed back to the caller.
        """
        hook = DmsHook(aws_conn_id=self.aws_conn_id)
        hook.start_replication_task(
            replication_task_arn=self.replication_task_arn,
            start_replication_task_type=self.start_replication_task_type,
            **self.start_task_kwargs,
        )
        self.log.info("DMS replication task(%s) is starting.", self.replication_task_arn)
[docs]classDmsStopTaskOperator(BaseOperator):""" Stops AWS DMS replication task. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:DmsStopTaskOperator` :param replication_task_arn: Replication task ARN :param aws_conn_id: The Airflow connection used for AWS credentials. If this is None or empty then the default boto3 behaviour is used. If running Airflow in a distributed manner and aws_conn_id is None or empty, then default boto3 configuration would be used (and must be maintained on each worker node). """