RDS management operators

Create DB snapshot

To create a snapshot of AWS RDS DB instance or DB cluster snapshot you can use RDSCreateDBSnapshotOperator. The source DB instance must be in the available or storage-optimization state.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

with DAG(
    dag_id='rds_snapshots', start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False
) as dag:
    create_snapshot = RdsCreateDbSnapshotOperator(
        task_id='create_snapshot',
        db_type='instance',
        db_identifier='auth-db',
        db_snapshot_identifier='auth-db-snap',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )
    copy_snapshot = RdsCopyDbSnapshotOperator(
        task_id='copy_snapshot',
        db_type='instance',
        target_db_snapshot_identifier='auth-db-snap-backup',
        source_db_snapshot_identifier='auth-db-snap',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )
    delete_snapshot = RdsDeleteDbSnapshotOperator(
        task_id='delete_snapshot',
        db_type='instance',
        db_snapshot_identifier='auth-db-snap-backup',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

    create_snapshot >> copy_snapshot >> delete_snapshot

This Operator leverages the AWS CLI create-db-snapshot API create-db-cluster-snapshot API

Copy DB snapshot

To copy AWS RDS DB instance or DB cluster snapshot you can use RDSCopyDBSnapshotOperator. The source DB snapshot must be in the available state.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    copy_snapshot = RdsCopyDbSnapshotOperator(
        task_id='copy_snapshot',
        db_type='instance',
        target_db_snapshot_identifier='auth-db-snap-backup',
        source_db_snapshot_identifier='auth-db-snap',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

This Operator leverages the AWS CLI copy-db-snapshot API copy-db-cluster-snapshot API

Delete DB snapshot

To delete AWS RDS DB instance or DB cluster snapshot you can use RDSDeleteDBSnapshotOperator. The DB snapshot must be in the available state to be deleted.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    delete_snapshot = RdsDeleteDbSnapshotOperator(
        task_id='delete_snapshot',
        db_type='instance',
        db_snapshot_identifier='auth-db-snap-backup',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

This Operator leverages the AWS CLI delete-db-snapshot API delete-db-cluster-snapshot API

Start export task

To start task that exports RDS snapshot to S3 you can use RDSStartExportTaskOperator. The provided IAM role must have access to the S3 bucket.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    start_export = RdsStartExportTaskOperator(
        task_id='start_export',
        export_task_identifier='export-auth-db-snap-{{ ds }}',
        source_arn='arn:aws:rds:<region>:<account number>:snapshot:auth-db-snap',
        s3_bucket_name='my_s3_bucket',
        s3_prefix='some/prefix',
        iam_role_arn='arn:aws:iam:<region>:<account number>:role/MyRole',
        kms_key_id='arn:aws:kms:<region>:<account number>:key/*****-****-****-****-********',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

This Operator leverages the AWS CLI start-export-task API

Cancel export task

To cancel task that exports RDS snapshot to S3 you can use RDSCancelExportTaskOperator. Any data that has already been written to the S3 bucket isn’t removed.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    cancel_export = RdsCancelExportTaskOperator(
        task_id='cancel_export',
        export_task_identifier='export-auth-db-snap-{{ ds }}',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

This Operator leverages the AWS CLI cancel-export-task API

Create event subscription

To create event subscription you can use RDSCreateEventSubscriptionOperator. This action requires a topic Amazon Resource Name (ARN) created by either the RDS console, the SNS console, or the SNS API. To obtain an ARN with SNS, you must create a topic in Amazon SNS and subscribe to the topic. RDS event notification is only available for not encrypted SNS topics. If you specify an encrypted SNS topic, event notifications are not sent for the topic.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    create_subscription = RdsCreateEventSubscriptionOperator(
        task_id='create_subscription',
        subscription_name='my-topic-subscription',
        sns_topic_arn='arn:aws:sns:<region>:<account number>:MyTopic',
        source_type='db-instance',
        source_ids=['auth-db'],
        event_categories=['Availability', 'Backup'],
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

This Operator leverages the AWS CLI create-event-subscription API

Delete event subscription

To delete event subscription you can use RDSDeleteEventSubscriptionOperator

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    delete_subscription = RdsDeleteEventSubscriptionOperator(
        task_id='delete_subscription',
        subscription_name='my-topic-subscription',
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

This Operator leverages the AWS CLI delete-event-subscription API

RDS management sensors

DB snapshot sensor

To wait a snapshot with certain statuses of AWS RDS DB instance or DB cluster snapshot you can use RdsSnapshotExistenceSensor. By default, sensor waits existence of snapshot with status available.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    snapshot_sensor = RdsSnapshotExistenceSensor(
        task_id='snapshot_sensor',
        db_type='instance',
        db_snapshot_identifier='auth-db-snap-{{ ds }}',
        target_statuses=['available'],
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

Export task sensor

To wait a snapshot export task with certain statuses you can use RdsExportTaskExistenceSensor. By default, sensor waits existence of export task with status available.

airflow/providers/amazon/aws/example_dags/example_rds.py[source]

    export_sensor = RdsExportTaskExistenceSensor(
        task_id='export_sensor',
        export_task_identifier='export-auth-db-snap-{{ ds }}',
        target_statuses=['starting', 'in_progress', 'complete', 'canceling', 'canceled'],
        aws_conn_id='aws_default',
        hook_params={'region_name': 'us-east-1'},
    )

Reference

For further information, look at:

Was this entry helpful?