Source code for tests.system.amazon.aws.example_dms_serverless
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Note: DMS requires you to configure specific IAM roles/permissions.  For more information, see
https://docs.aws.amazon.com/dms/latest/userguide/security-iam.html#CHAP_Security.APIRole
"""

from __future__ import annotations

import json
from datetime import datetime

import boto3
from sqlalchemy import Column, MetaData, String, Table, create_engine

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.dms import (
    DmsCreateReplicationConfigOperator,
    DmsDeleteReplicationConfigOperator,
    DmsDescribeReplicationConfigsOperator,
    DmsDescribeReplicationsOperator,
)
from airflow.providers.amazon.aws.operators.rds import (
    RdsCreateDbInstanceOperator,
    RdsDeleteDbInstanceOperator,
)
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule

from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

"""
This example demonstrates how to use the DMS operators to create a serverless replication task to
replicate data from a PostgreSQL database to Amazon S3.

The IAM role used for the replication must have the permissions defined in the
[Amazon S3 target](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.S3.html#CHAP_Target.S3.Prerequisites)
documentation.
"""
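# NOTE: the rendered page drops the module-level constants and the endpoint helper that the
# code below references (DAG_ID, ROLE_ARN_KEY, sys_test_context_task, the RDS_* values,
# TABLE_HEADERS, SAMPLE_DATA, _get_rds_instance_endpoint). What follows is a minimal, hedged
# sketch so the listing reads end to end; the names match the references below, but the
# concrete values are placeholders and may differ from the upstream file.
DAG_ID = "example_dms_serverless"
ROLE_ARN_KEY = "ROLE_ARN"
sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()

RDS_ENGINE = "postgres"
RDS_PROTOCOL = "postgresql"
RDS_USERNAME = "username"
# NEVER store a production password in plain text like this; a throwaway value is only
# acceptable here because the database lives for the duration of a system test.
RDS_PASSWORD = "rds_password"

TABLE_HEADERS = ["apache_project", "release_year"]
SAMPLE_DATA = [
    ("Airflow", "2015"),
    ("OpenOffice", "2012"),
    ("Scout", "2010"),
]


def _get_rds_instance_endpoint(instance_name: str):
    # Look up the hostname/port of the RDS instance so the sample table can be created
    # and the DMS source endpoint can point at it.
    print("Retrieving RDS instance endpoint.")
    rds_client = boto3.client("rds")
    response = rds_client.describe_db_instances(DBInstanceIdentifier=instance_name)
    return response["DBInstances"][0]["Endpoint"]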
@task
def create_sample_table(instance_name: str, db_name: str, table_name: str):
    print("Creating sample table.")
    rds_endpoint = _get_rds_instance_endpoint(instance_name)
    hostname = rds_endpoint["Address"]
    port = rds_endpoint["Port"]
    rds_url = f"{RDS_PROTOCOL}://{RDS_USERNAME}:{RDS_PASSWORD}@{hostname}:{port}/{db_name}"
    engine = create_engine(rds_url)

    table = Table(
        table_name,
        MetaData(engine),
        Column(TABLE_HEADERS[0], String, primary_key=True),
        Column(TABLE_HEADERS[1], String),
    )
    with engine.connect() as connection:
        # Create the Table.
        table.create()
        load_data = table.insert().values(SAMPLE_DATA)
        connection.execute(load_data)
        # Read the data back to verify everything is working.
        connection.execute(table.select())
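
# The rendered page also omits the two @task callables referenced below
# (create_dms_assets / delete_dms_assets) and the DAG declaration itself. This is a hedged
# reconstruction inferred from how their results are consumed further down (the returned
# "source_endpoint_arn"/"target_endpoint_arn" keys); the exact endpoint settings and DAG
# arguments in the upstream file may differ.
@task
def create_dms_assets(
    db_name: str,
    instance_name: str,
    bucket_name: str,
    role_arn: str,
    source_endpoint_identifier: str,
    target_endpoint_identifier: str,
    table_definition: dict,
):
    print("Creating DMS assets.")
    dms_client = boto3.client("dms")
    rds_instance_endpoint = _get_rds_instance_endpoint(instance_name)

    print("Creating DMS source endpoint.")
    source_endpoint_arn = dms_client.create_endpoint(
        EndpointIdentifier=source_endpoint_identifier,
        EndpointType="source",
        EngineName=RDS_ENGINE,
        Username=RDS_USERNAME,
        Password=RDS_PASSWORD,
        ServerName=rds_instance_endpoint["Address"],
        Port=rds_instance_endpoint["Port"],
        DatabaseName=db_name,
    )["Endpoint"]["EndpointArn"]

    print("Creating DMS target endpoint.")
    target_endpoint_arn = dms_client.create_endpoint(
        EndpointIdentifier=target_endpoint_identifier,
        EndpointType="target",
        EngineName="s3",
        S3Settings={
            "BucketName": bucket_name,
            "ServiceAccessRoleArn": role_arn,
            "ExternalTableDefinition": json.dumps(table_definition),
        },
    )["Endpoint"]["EndpointArn"]

    return {
        "source_endpoint_arn": source_endpoint_arn,
        "target_endpoint_arn": target_endpoint_arn,
    }


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_dms_assets(
    source_endpoint_arn: str,
    target_endpoint_arn: str,
    source_endpoint_identifier: str,
    target_endpoint_identifier: str,
):
    print("Deleting DMS endpoints.")
    dms_client = boto3.client("dms")
    dms_client.delete_endpoint(EndpointArn=source_endpoint_arn)
    dms_client.delete_endpoint(EndpointArn=target_endpoint_arn)

    print("Awaiting DMS endpoint deletion.")
    dms_client.get_waiter("endpoint_deleted").wait(
        Filters=[
            {
                "Name": "endpoint-id",
                "Values": [source_endpoint_identifier, target_endpoint_identifier],
            }
        ]
    )


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    test_context = sys_test_context_task()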
    env_id = test_context[ENV_ID_KEY]
    role_arn = test_context[ROLE_ARN_KEY]

    bucket_name = f"{env_id}-dms-serverless-bucket"
    rds_instance_name = f"{env_id}-instance"
    rds_db_name = f"{env_id}_source_database"  # dashes are not allowed in db name
    rds_table_name = f"{env_id}-table"
    source_endpoint_identifier = f"{env_id}-source-endpoint"
    target_endpoint_identifier = f"{env_id}-target-endpoint"
    replication_id = f"{env_id}-replication-id"

    create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", bucket_name=bucket_name)

    create_db_instance = RdsCreateDbInstanceOperator(
        task_id="create_db_instance",
        db_instance_identifier=rds_instance_name,
        db_instance_class="db.t3.micro",
        engine=RDS_ENGINE,
        rds_kwargs={
            "DBName": rds_db_name,
            "AllocatedStorage": 20,
            "MasterUsername": RDS_USERNAME,
            "MasterUserPassword": RDS_PASSWORD,
            "PubliclyAccessible": True,
        },
    )

    # Sample data.
    table_definition = {
        "TableCount": "1",
        "Tables": [
            {
                "TableName": rds_table_name,
                "TableColumns": [
                    {
                        "ColumnName": TABLE_HEADERS[0],
                        "ColumnType": "STRING",
                        "ColumnNullable": "false",
                        "ColumnIsPk": "true",
                    },
                    {"ColumnName": TABLE_HEADERS[1], "ColumnType": "STRING", "ColumnLength": "4"},
                ],
                "TableColumnsTotal": "2",
            }
        ],
    }
    table_mappings = {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    "schema-name": "public",
                    "table-name": rds_table_name,
                },
                "rule-action": "include",
            }
        ]
    }

    create_assets = create_dms_assets(
        db_name=rds_db_name,
        instance_name=rds_instance_name,
        bucket_name=bucket_name,
        role_arn=role_arn,
        source_endpoint_identifier=source_endpoint_identifier,
        target_endpoint_identifier=target_endpoint_identifier,
        table_definition=table_definition,
    )

    # [START howto_operator_dms_create_replication_config]
    create_replication_config = DmsCreateReplicationConfigOperator(
        task_id="create_replication_config",
        replication_config_id=replication_id,
        source_endpoint_arn=create_assets["source_endpoint_arn"],
        target_endpoint_arn=create_assets["target_endpoint_arn"],
        compute_config={
            "MaxCapacityUnits": 4,
            "MinCapacityUnits": 1,
            "MultiAZ": False,
            "ReplicationSubnetGroupId": "default",
        },
        replication_type="full-load",
        table_mappings=json.dumps(table_mappings),
    )
    # [END howto_operator_dms_create_replication_config]

    # [START howto_operator_dms_describe_replication_config]
    describe_replication_configs = DmsDescribeReplicationConfigsOperator(
        task_id="describe_replication_configs",
    )
    # [END howto_operator_dms_describe_replication_config]

    # [START howto_operator_dms_serverless_describe_replication]
    describe_replications = DmsDescribeReplicationsOperator(
        task_id="describe_replications",
    )
    # [END howto_operator_dms_serverless_describe_replication]

    # The next two tasks are commented out because they take too long to run in CI;
    # they are kept for documentation purposes.
    """
    # [START howto_operator_dms_serverless_start_replication]
    replicate = DmsStartReplicationOperator(
        task_id="replicate",
        replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
        replication_start_type="start-replication",
        wait_for_completion=True,
        waiter_delay=60,
        waiter_max_attempts=200,
    )
    # [END howto_operator_dms_serverless_start_replication]

    # [START howto_operator_dms_serverless_stop_replication]
    stop_replication = DmsStopReplicationOperator(
        task_id="stop_replication",
        replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
        wait_for_completion=True,
        waiter_delay=120,
        waiter_max_attempts=200,
    )
    # [END howto_operator_dms_serverless_stop_replication]
    """

    # [START howto_operator_dms_serverless_delete_replication_config]
    delete_replication_config = DmsDeleteReplicationConfigOperator(
        task_id="delete_replication_config",
        waiter_max_attempts=200,
        replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
    )
    # [END howto_operator_dms_serverless_delete_replication_config]
    delete_replication_config.trigger_rule = TriggerRule.ALL_DONE

    delete_assets = delete_dms_assets(
        source_endpoint_arn=create_assets["source_endpoint_arn"],
        target_endpoint_arn=create_assets["target_endpoint_arn"],
        source_endpoint_identifier=source_endpoint_identifier,
        target_endpoint_identifier=target_endpoint_identifier,
    )

    delete_db_instance = RdsDeleteDbInstanceOperator(
        task_id="delete_db_instance",
        db_instance_identifier=rds_instance_name,
        rds_kwargs={
            "SkipFinalSnapshot": True,
        },
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_s3_bucket = S3DeleteBucketOperator(
        task_id="delete_s3_bucket",
        bucket_name=bucket_name,
        force_delete=True,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    chain(
        # TEST SETUP
        test_context,
        create_s3_bucket,
        create_db_instance,
        create_sample_table(rds_instance_name, rds_db_name, rds_table_name),
        create_assets,
        # TEST BODY
        create_replication_config,
        describe_replication_configs,
        describe_replications,
        delete_replication_config,
        # TEST TEARDOWN
        delete_assets,
        delete_db_instance,
        delete_s3_bucket,
    )

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)