Source code for tests.system.providers.amazon.aws.example_dms
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Note: DMS requires you to configure specific IAM roles/permissions. For more information, seehttps://docs.aws.amazon.com/dms/latest/userguide/CHAP_Security.html#CHAP_Security.APIRole"""from__future__importannotationsimportjsonfromdatetimeimportdatetimefromtypingimportcastimportboto3fromsqlalchemyimportColumn,MetaData,String,Table,create_enginefromairflowimportDAGfromairflow.decoratorsimporttaskfromairflow.models.baseoperatorimportchainfromairflow.providers.amazon.aws.operators.dmsimport(DmsCreateTaskOperator,DmsDeleteTaskOperator,DmsDescribeTasksOperator,DmsStartTaskOperator,DmsStopTaskOperator,)fromairflow.providers.amazon.aws.operators.rdsimport(RdsCreateDbInstanceOperator,RdsDeleteDbInstanceOperator,)fromairflow.providers.amazon.aws.operators.s3importS3CreateBucketOperator,S3DeleteBucketOperatorfromairflow.providers.amazon.aws.sensors.dmsimportDmsTaskBaseSensor,DmsTaskCompletedSensorfromairflow.utils.trigger_ruleimportTriggerRulefromtests.system.providers.amazon.aws.utilsimportENV_ID_KEY,SystemTestContextBuilder
[docs]defcreate_security_group(security_group_name:str,vpc_id:str):client=boto3.client('ec2')security_group=client.create_security_group(GroupName=security_group_name,Description='Created for DMS system test',VpcId=vpc_id,)client.get_waiter('security_group_exists').wait(GroupIds=[security_group['GroupId']],GroupNames=[security_group_name],WaiterConfig={'Delay':15,'MaxAttempts':4},)client.authorize_security_group_ingress(GroupId=security_group['GroupId'],GroupName=security_group_name,IpPermissions=[SG_IP_PERMISSION],)returnsecurity_group['GroupId']
@task
[docs]defcreate_sample_table(instance_name:str,db_name:str,table_name:str):print('Creating sample table.')rds_endpoint=_get_rds_instance_endpoint(instance_name)hostname=rds_endpoint['Address']port=rds_endpoint['Port']rds_url=f'{RDS_PROTOCOL}://{RDS_USERNAME}:{RDS_PASSWORD}@{hostname}:{port}/{db_name}'engine=create_engine(rds_url)table=Table(table_name,MetaData(engine),Column(TABLE_HEADERS[0],String,primary_key=True),Column(TABLE_HEADERS[1],String),)withengine.connect()asconnection:# Create the Table.table.create()load_data=table.insert().values(SAMPLE_DATA)connection.execute(load_data)# Read the data back to verify everything is working.connection.execute(table.select())
env_id=test_context[ENV_ID_KEY]role_arn=test_context[ROLE_ARN_KEY]vpc_id=test_context[VPC_ID_KEY]bucket_name=f'{env_id}-dms-bucket'rds_instance_name=f'{env_id}-instance'rds_db_name=f'{env_id}_source_database'# dashes are not allowed in db namerds_table_name=f'{env_id}-table'dms_replication_instance_name=f'{env_id}-replication-instance'dms_replication_task_id=f'{env_id}-replication-task'source_endpoint_identifier=f'{env_id}-source-endpoint'target_endpoint_identifier=f'{env_id}-target-endpoint'security_group_name=f'{env_id}-dms-security-group'# Sample data.table_definition={'TableCount':'1','Tables':[{'TableName':rds_table_name,'TableColumns':[{'ColumnName':TABLE_HEADERS[0],'ColumnType':'STRING','ColumnNullable':'false','ColumnIsPk':'true',},{"ColumnName":TABLE_HEADERS[1],"ColumnType":'STRING',"ColumnLength":"4"},],'TableColumnsTotal':'2',}],}table_mappings={'rules':[{'rule-type':'selection','rule-id':'1','rule-name':'1','object-locator':{'schema-name':'public','table-name':rds_table_name,},'rule-action':'include',}]}create_s3_bucket=S3CreateBucketOperator(task_id='create_s3_bucket',bucket_name=bucket_name)create_sg=create_security_group(security_group_name,vpc_id)create_db_instance=RdsCreateDbInstanceOperator(task_id="create_db_instance",db_instance_identifier=rds_instance_name,db_instance_class='db.t3.micro',engine=RDS_ENGINE,rds_kwargs={"DBName":rds_db_name,"AllocatedStorage":20,"MasterUsername":RDS_USERNAME,"MasterUserPassword":RDS_PASSWORD,"PubliclyAccessible":True,"VpcSecurityGroupIds":[create_sg,],},)create_assets=create_dms_assets(db_name=rds_db_name,instance_name=rds_instance_name,replication_instance_name=dms_replication_instance_name,bucket_name=bucket_name,role_arn=role_arn,source_endpoint_identifier=source_endpoint_identifier,target_endpoint_identifier=target_endpoint_identifier,table_definition=table_definition,)# [START howto_operator_dms_create_task]create_task=DmsCreateTaskOperator(task_id='create_task',replication_task_id=dms_replication_task_id,source_endpoint_arn=create_assets['source_endpoint_arn'],target_endpoint_arn=create_assets['target_endpoint_arn'],replication_instance_arn=create_assets['replication_instance_arn'],table_mappings=table_mappings,)# [END howto_operator_dms_create_task]task_arn=cast(str,create_task.output)# [START howto_operator_dms_start_task]start_task=DmsStartTaskOperator(task_id='start_task',replication_task_arn=task_arn,)# [END howto_operator_dms_start_task]# [START howto_operator_dms_describe_tasks]describe_tasks=DmsDescribeTasksOperator(task_id='describe_tasks',describe_tasks_kwargs={'Filters':[{'Name':'replication-instance-arn','Values':[create_assets['replication_instance_arn']],}]},do_xcom_push=False,)# [END howto_operator_dms_describe_tasks]await_task_start=DmsTaskBaseSensor(task_id='await_task_start',replication_task_arn=task_arn,target_statuses=['running'],termination_statuses=['stopped','deleting','failed'],)# [START howto_operator_dms_stop_task]stop_task=DmsStopTaskOperator(task_id='stop_task',replication_task_arn=task_arn,)# [END howto_operator_dms_stop_task]# TaskCompletedSensor actually waits until task reaches the "Stopped" state, so it will work here.# [START howto_sensor_dms_task_completed]await_task_stop=DmsTaskCompletedSensor(task_id='await_task_stop',replication_task_arn=task_arn,)# [END howto_sensor_dms_task_completed]# [START howto_operator_dms_delete_task]delete_task=DmsDeleteTaskOperator(task_id='delete_task',replication_task_arn=task_arn,)# [END howto_operator_dms_delete_task]delete_task.trigger_rule=TriggerRule.ALL_DONEdelete_assets=delete_dms_assets(replication_instance_arn=create_assets['replication_instance_arn'],source_endpoint_arn=create_assets['source_endpoint_arn'],target_endpoint_arn=create_assets['target_endpoint_arn'],source_endpoint_identifier=source_endpoint_identifier,target_endpoint_identifier=target_endpoint_identifier,replication_instance_name=dms_replication_instance_name,)delete_db_instance=RdsDeleteDbInstanceOperator(task_id='delete_db_instance',db_instance_identifier=rds_instance_name,rds_kwargs={"SkipFinalSnapshot":True,},trigger_rule=TriggerRule.ALL_DONE,)delete_s3_bucket=S3DeleteBucketOperator(task_id='delete_s3_bucket',bucket_name=bucket_name,force_delete=True,trigger_rule=TriggerRule.ALL_DONE,)chain(# TEST SETUPtest_context,create_s3_bucket,create_sg,create_db_instance,create_sample_table(rds_instance_name,rds_db_name,rds_table_name),create_assets,# TEST BODYcreate_task,start_task,describe_tasks,await_task_start,stop_task,await_task_stop,# TEST TEARDOWNdelete_task,delete_assets,delete_db_instance,delete_security_group(create_sg,security_group_name),delete_s3_bucket,)fromtests.system.utils.watcherimportwatcher# This test needs watcher in order to properly mark success/failure# when "tearDown" task with trigger rule is part of the DAGlist(dag.tasks)>>watcher()fromtests.system.utilsimportget_test_run# noqa: E402# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)