Source code for airflow.providers.amazon.aws.example_dags.example_dms
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Note: DMS requires you to configure specific IAM roles/permissions.  For more information, see
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Security.html#CHAP_Security.APIRole
"""

import json
import os
from datetime import datetime

import boto3
from sqlalchemy import Column, MetaData, String, Table, create_engine

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.operators.dms import (
    DmsCreateTaskOperator,
    DmsDeleteTaskOperator,
    DmsDescribeTasksOperator,
    DmsStartTaskOperator,
    DmsStopTaskOperator,
)
from airflow.providers.amazon.aws.operators.rds import (
    RdsCreateDbInstanceOperator,
    RdsDeleteDbInstanceOperator,
)
from airflow.providers.amazon.aws.sensors.dms import DmsTaskBaseSensor, DmsTaskCompletedSensor
# The project name will be used as a prefix for various entity names.
# Use either PascalCase or camelCase.  While some names require kebab-case
# and others require snake_case, they all accept mixedCase strings.
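
# NOTE: The constant definitions below are illustrative placeholders, not the values
# from the original example.  The original assignments were lost when this page was
# extracted, but create_sample_table() further down references these names, so plausible
# values are sketched here; adjust them to your own environment before running.
RDS_PROTOCOL = 'postgresql'                 # assumed protocol/driver prefix for the SQLAlchemy URL
RDS_USERNAME = 'username'                   # placeholder credentials; do not hard-code real secrets
RDS_PASSWORD = 'rds_password'
RDS_DB_NAME = 'rds_db'
RDS_INSTANCE_NAME = 'example-dms-instance'  # hypothetical RDS DB instance identifier

TABLE_NAME = 'example_table'                # placeholder source table name
TABLE_HEADERS = ['name', 'value']           # placeholder column names; first column is the primary key
SAMPLE_DATA = [
    ('sample-key-1', 'sample-value-1'),
    ('sample-key-2', 'sample-value-2'),
]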
def create_sample_table():
    print('Creating sample table.')

    rds_endpoint = _get_rds_instance_endpoint()
    hostname = rds_endpoint['Address']
    port = rds_endpoint['Port']
    rds_url = f'{RDS_PROTOCOL}://{RDS_USERNAME}:{RDS_PASSWORD}@{hostname}:{port}/{RDS_DB_NAME}'
    engine = create_engine(rds_url)

    table = Table(
        TABLE_NAME,
        MetaData(engine),
        Column(TABLE_HEADERS[0], String, primary_key=True),
        Column(TABLE_HEADERS[1], String),
    )

    with engine.connect() as connection:
        # Create the Table.
        table.create()
        load_data = table.insert().values(SAMPLE_DATA)
        connection.execute(load_data)

        # Read the data back to verify everything is working.
        connection.execute(table.select())
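
# `_get_rds_instance_endpoint()` is called by create_sample_table() above, but its
# definition is not part of this excerpt.  The sketch below is a minimal assumption of
# what it does: it looks the instance up with boto3 using the hypothetical
# RDS_INSTANCE_NAME constant and returns the endpoint mapping
# ({'Address': ..., 'Port': ...}) consumed above.
def _get_rds_instance_endpoint():
    print('Retrieving RDS instance endpoint.')
    rds_client = boto3.client('rds')
    response = rds_client.describe_db_instances(DBInstanceIdentifier=RDS_INSTANCE_NAME)
    return response['DBInstances'][0]['Endpoint']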