Source code for airflow.providers.amazon.aws.operators.rds
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportwarningsfromdatetimeimporttimedeltafromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Any,Sequencefromairflow.configurationimportconffromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarningfromairflow.modelsimportBaseOperatorfromairflow.providers.amazon.aws.hooks.rdsimportRdsHookfromairflow.providers.amazon.aws.triggers.rdsimport(RdsDbAvailableTrigger,RdsDbDeletedTrigger,RdsDbStoppedTrigger,)fromairflow.providers.amazon.aws.utils.rdsimportRdsDbTypefromairflow.providers.amazon.aws.utils.tagsimportformat_tagsfromairflow.providers.amazon.aws.utils.waiter_with_loggingimportwaitifTYPE_CHECKING:frommypy_boto3_rds.type_defsimportTagTypeDeffromairflow.utils.contextimportContextclassRdsBaseOperator(BaseOperator):"""Base operator that implements common functions for all operators."""ui_color="#eeaa88"ui_fgcolor="#ffffff"def__init__(self,*args,aws_conn_id:str="aws_conn_id",region_name:str|None=None,hook_params:dict|None=None,**kwargs,):ifhook_paramsisnotNone:warnings.warn("The parameter hook_params is deprecated and will be removed. ""Note that it is also incompatible with deferrable mode. ""You can use the region_name parameter to specify the region. ""If you were using hook_params for other purposes, please get in touch either on ""airflow slack, or by opening a github issue on the project. ""You can mention https://github.com/apache/airflow/pull/32352",AirflowProviderDeprecationWarning,stacklevel=3,# 2 is in the operator's init, 3 is in the user code creating the operator)self.hook_params=hook_paramsor{}self.aws_conn_id=aws_conn_idself.region_name=region_nameorself.hook_params.pop("region_name",None)super().__init__(*args,**kwargs)self._await_interval=60# seconds@cached_propertydefhook(self)->RdsHook:returnRdsHook(aws_conn_id=self.aws_conn_id,region_name=self.region_name,**self.hook_params)defexecute(self,context:Context)->str:"""Different implementations for snapshots, tasks and events."""raiseNotImplementedErrordefon_kill(self)->None:"""Different implementations for snapshots, tasks and events."""raiseNotImplementedError
[docs]classRdsCreateDbSnapshotOperator(RdsBaseOperator):""" Creates a snapshot of a DB instance or DB cluster. The source DB instance or cluster must be in the available or storage-optimization state. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCreateDbSnapshotOperator` :param db_type: Type of the DB - either "instance" or "cluster" :param db_identifier: The identifier of the instance or cluster that you want to create the snapshot of :param db_snapshot_identifier: The identifier for the DB snapshot :param tags: A dictionary of tags or a list of tags in format `[{"Key": "...", "Value": "..."},]` `USER Tagging <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Tagging.html>`__ :param wait_for_completion: If True, waits for creation of the DB snapshot to complete. (default: True) """
[docs]defexecute(self,context:Context)->str:self.log.info("Starting to create snapshot of RDS %s '%s': %s",self.db_type,self.db_identifier,self.db_snapshot_identifier,)formatted_tags=format_tags(self.tags)ifself.db_type.value=="instance":create_instance_snap=self.hook.conn.create_db_snapshot(DBInstanceIdentifier=self.db_identifier,DBSnapshotIdentifier=self.db_snapshot_identifier,Tags=formatted_tags,)create_response=json.dumps(create_instance_snap,default=str)ifself.wait_for_completion:self.hook.wait_for_db_snapshot_state(self.db_snapshot_identifier,target_state="available")else:create_cluster_snap=self.hook.conn.create_db_cluster_snapshot(DBClusterIdentifier=self.db_identifier,DBClusterSnapshotIdentifier=self.db_snapshot_identifier,Tags=formatted_tags,)create_response=json.dumps(create_cluster_snap,default=str)ifself.wait_for_completion:self.hook.wait_for_db_cluster_snapshot_state(self.db_snapshot_identifier,target_state="available")returncreate_response
[docs]classRdsCopyDbSnapshotOperator(RdsBaseOperator):""" Copies the specified DB instance or DB cluster snapshot. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCopyDbSnapshotOperator` :param db_type: Type of the DB - either "instance" or "cluster" :param source_db_snapshot_identifier: The identifier of the source snapshot :param target_db_snapshot_identifier: The identifier of the target snapshot :param kms_key_id: The AWS KMS key identifier for an encrypted DB snapshot :param tags: A dictionary of tags or a list of tags in format `[{"Key": "...", "Value": "..."},]` `USER Tagging <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Tagging.html>`__ :param copy_tags: Whether to copy all tags from the source snapshot to the target snapshot (default False) :param pre_signed_url: The URL that contains a Signature Version 4 signed request :param option_group_name: The name of an option group to associate with the copy of the snapshot Only when db_type='instance' :param target_custom_availability_zone: The external custom Availability Zone identifier for the target Only when db_type='instance' :param source_region: The ID of the region that contains the snapshot to be copied :param wait_for_completion: If True, waits for snapshot copy to complete. (default: True) """
[docs]defexecute(self,context:Context)->str:self.log.info("Starting to copy snapshot '%s' as '%s'",self.source_db_snapshot_identifier,self.target_db_snapshot_identifier,)formatted_tags=format_tags(self.tags)ifself.db_type.value=="instance":copy_instance_snap=self.hook.conn.copy_db_snapshot(SourceDBSnapshotIdentifier=self.source_db_snapshot_identifier,TargetDBSnapshotIdentifier=self.target_db_snapshot_identifier,KmsKeyId=self.kms_key_id,Tags=formatted_tags,CopyTags=self.copy_tags,PreSignedUrl=self.pre_signed_url,OptionGroupName=self.option_group_name,TargetCustomAvailabilityZone=self.target_custom_availability_zone,SourceRegion=self.source_region,)copy_response=json.dumps(copy_instance_snap,default=str)ifself.wait_for_completion:self.hook.wait_for_db_snapshot_state(self.target_db_snapshot_identifier,target_state="available")else:copy_cluster_snap=self.hook.conn.copy_db_cluster_snapshot(SourceDBClusterSnapshotIdentifier=self.source_db_snapshot_identifier,TargetDBClusterSnapshotIdentifier=self.target_db_snapshot_identifier,KmsKeyId=self.kms_key_id,Tags=formatted_tags,CopyTags=self.copy_tags,PreSignedUrl=self.pre_signed_url,SourceRegion=self.source_region,)copy_response=json.dumps(copy_cluster_snap,default=str)ifself.wait_for_completion:self.hook.wait_for_db_cluster_snapshot_state(self.target_db_snapshot_identifier,target_state="available")returncopy_response
[docs]classRdsDeleteDbSnapshotOperator(RdsBaseOperator):""" Deletes a DB instance or cluster snapshot or terminating the copy operation. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsDeleteDbSnapshotOperator` :param db_type: Type of the DB - either "instance" or "cluster" :param db_snapshot_identifier: The identifier for the DB instance or DB cluster snapshot """
[docs]defexecute(self,context:Context)->str:self.log.info("Starting to delete snapshot '%s'",self.db_snapshot_identifier)ifself.db_type.value=="instance":delete_instance_snap=self.hook.conn.delete_db_snapshot(DBSnapshotIdentifier=self.db_snapshot_identifier,)delete_response=json.dumps(delete_instance_snap,default=str)ifself.wait_for_completion:self.hook.wait_for_db_snapshot_state(self.db_snapshot_identifier,target_state="deleted")else:delete_cluster_snap=self.hook.conn.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier=self.db_snapshot_identifier,)delete_response=json.dumps(delete_cluster_snap,default=str)ifself.wait_for_completion:self.hook.wait_for_db_cluster_snapshot_state(self.db_snapshot_identifier,target_state="deleted")returndelete_response
[docs]classRdsStartExportTaskOperator(RdsBaseOperator):""" Starts an export of a snapshot to Amazon S3. The provided IAM role must have access to the S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsStartExportTaskOperator` :param export_task_identifier: A unique identifier for the snapshot export task. :param source_arn: The Amazon Resource Name (ARN) of the snapshot to export to Amazon S3. :param s3_bucket_name: The name of the Amazon S3 bucket to export the snapshot to. :param iam_role_arn: The name of the IAM role to use for writing to the Amazon S3 bucket. :param kms_key_id: The ID of the Amazon Web Services KMS key to use to encrypt the snapshot. :param s3_prefix: The Amazon S3 bucket prefix to use as the file name and path of the exported snapshot. :param export_only: The data to be exported from the snapshot. :param wait_for_completion: If True, waits for the DB snapshot export to complete. (default: True) :param waiter_interval: The number of seconds to wait before checking the export status. (default: 30) :param waiter_max_attempts: The number of attempts to make before failing. (default: 40) """
[docs]defexecute(self,context:Context)->str:self.log.info("Starting export task %s for snapshot %s",self.export_task_identifier,self.source_arn)start_export=self.hook.conn.start_export_task(ExportTaskIdentifier=self.export_task_identifier,SourceArn=self.source_arn,S3BucketName=self.s3_bucket_name,IamRoleArn=self.iam_role_arn,KmsKeyId=self.kms_key_id,S3Prefix=self.s3_prefix,ExportOnly=self.export_only,)ifself.wait_for_completion:self.hook.wait_for_export_task_state(export_task_id=self.export_task_identifier,target_state="complete",check_interval=self.waiter_interval,max_attempts=self.waiter_max_attempts,)returnjson.dumps(start_export,default=str)
[docs]classRdsCancelExportTaskOperator(RdsBaseOperator):""" Cancels an export task in progress that is exporting a snapshot to Amazon S3. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCancelExportTaskOperator` :param export_task_identifier: The identifier of the snapshot export task to cancel :param wait_for_completion: If True, waits for DB snapshot export to cancel. (default: True) :param check_interval: The amount of time in seconds to wait between attempts :param max_attempts: The maximum number of attempts to be made """
[docs]classRdsCreateEventSubscriptionOperator(RdsBaseOperator):""" Creates an RDS event notification subscription. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCreateEventSubscriptionOperator` :param subscription_name: The name of the subscription (must be less than 255 characters) :param sns_topic_arn: The ARN of the SNS topic created for event notification :param source_type: The type of source that is generating the events. Valid values: db-instance | db-cluster | db-parameter-group | db-security-group | db-snapshot | db-cluster-snapshot | db-proxy :param event_categories: A list of event categories for a source type that you want to subscribe to `USER Events <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Events.Messages.html>`__ :param source_ids: The list of identifiers of the event sources for which events are returned :param enabled: A value that indicates whether to activate the subscription (default True)l :param tags: A dictionary of tags or a list of tags in format `[{"Key": "...", "Value": "..."},]` `USER Tagging <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Tagging.html>`__ :param wait_for_completion: If True, waits for creation of the subscription to complete. (default: True) """
[docs]defexecute(self,context:Context)->str:self.log.info("Creating event subscription '%s' to '%s'",self.subscription_name,self.sns_topic_arn)formatted_tags=format_tags(self.tags)create_subscription=self.hook.conn.create_event_subscription(SubscriptionName=self.subscription_name,SnsTopicArn=self.sns_topic_arn,SourceType=self.source_type,EventCategories=self.event_categories,SourceIds=self.source_ids,Enabled=self.enabled,Tags=formatted_tags,)ifself.wait_for_completion:self.hook.wait_for_event_subscription_state(self.subscription_name,target_state="active")returnjson.dumps(create_subscription,default=str)
[docs]classRdsDeleteEventSubscriptionOperator(RdsBaseOperator):""" Deletes an RDS event notification subscription. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsDeleteEventSubscriptionOperator` :param subscription_name: The name of the RDS event notification subscription you want to delete """
[docs]classRdsCreateDbInstanceOperator(RdsBaseOperator):""" Creates an RDS DB instance. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCreateDbInstanceOperator` :param db_instance_identifier: The DB instance identifier, must start with a letter and contain from 1 to 63 letters, numbers, or hyphens :param db_instance_class: The compute and memory capacity of the DB instance, for example db.m5.large :param engine: The name of the database engine to be used for this instance :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance :param wait_for_completion: If True, waits for creation of the DB instance to complete. (default: True) :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check DB instance state :param waiter_max_attempts: The maximum number of attempts to check DB instance state :param deferrable: If True, the operator will wait asynchronously for the DB instance to be created. This implies waiting for completion. This mode requires aiobotocore module to be installed. (default: False) """
[docs]defexecute(self,context:Context)->str:self.log.info("Creating new DB instance %s",self.db_instance_identifier)create_db_instance=self.hook.conn.create_db_instance(DBInstanceIdentifier=self.db_instance_identifier,DBInstanceClass=self.db_instance_class,Engine=self.engine,**self.rds_kwargs,)ifself.deferrable:self.defer(trigger=RdsDbAvailableTrigger(db_identifier=self.db_instance_identifier,waiter_delay=self.waiter_delay,waiter_max_attempts=self.waiter_max_attempts,aws_conn_id=self.aws_conn_id,region_name=self.region_name,# ignoring type because create_db_instance is a dictresponse=create_db_instance,# type: ignore[arg-type]db_type=RdsDbType.INSTANCE,),method_name="execute_complete",timeout=timedelta(seconds=self.waiter_delay*self.waiter_max_attempts),)ifself.wait_for_completion:waiter=self.hook.conn.get_waiter("db_instance_available")wait(waiter=waiter,waiter_delay=self.waiter_delay,waiter_max_attempts=self.waiter_max_attempts,args={"DBInstanceIdentifier":self.db_instance_identifier},failure_message="DB instance creation failed",status_message="DB Instance status is",status_args=["DBInstances[0].DBInstanceStatus"],)returnjson.dumps(create_db_instance,default=str)
[docs]classRdsDeleteDbInstanceOperator(RdsBaseOperator):""" Deletes an RDS DB Instance. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsDeleteDbInstanceOperator` :param db_instance_identifier: The DB instance identifier for the DB instance to be deleted :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.delete_db_instance :param wait_for_completion: If True, waits for deletion of the DB instance to complete. (default: True) :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check DB instance state :param waiter_max_attempts: The maximum number of attempts to check DB instance state :param deferrable: If True, the operator will wait asynchronously for the DB instance to be created. This implies waiting for completion. This mode requires aiobotocore module to be installed. (default: False) """
[docs]defexecute(self,context:Context)->str:self.log.info("Deleting DB instance %s",self.db_instance_identifier)delete_db_instance=self.hook.conn.delete_db_instance(DBInstanceIdentifier=self.db_instance_identifier,**self.rds_kwargs,)ifself.deferrable:self.defer(trigger=RdsDbDeletedTrigger(db_identifier=self.db_instance_identifier,waiter_delay=self.waiter_delay,waiter_max_attempts=self.waiter_max_attempts,aws_conn_id=self.aws_conn_id,region_name=self.region_name,# ignoring type because delete_db_instance is a dictresponse=delete_db_instance,# type: ignore[arg-type]db_type=RdsDbType.INSTANCE,),method_name="execute_complete",timeout=timedelta(seconds=self.waiter_delay*self.waiter_max_attempts),)ifself.wait_for_completion:waiter=self.hook.conn.get_waiter("db_instance_deleted")wait(waiter=waiter,waiter_delay=self.waiter_delay,waiter_max_attempts=self.waiter_max_attempts,args={"DBInstanceIdentifier":self.db_instance_identifier},failure_message="DB instance deletion failed",status_message="DB Instance status is",status_args=["DBInstances[0].DBInstanceStatus"],)returnjson.dumps(delete_db_instance,default=str)
[docs]classRdsStartDbOperator(RdsBaseOperator):""" Starts an RDS DB instance / cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsStartDbOperator` :param db_identifier: The AWS identifier of the DB to start :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance") :param wait_for_completion: If True, waits for DB to start. (default: True) :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check DB instance state :param waiter_max_attempts: The maximum number of attempts to check DB instance state :param deferrable: If True, the operator will wait asynchronously for the DB instance to be created. This implies waiting for completion. This mode requires aiobotocore module to be installed. """
[docs]defexecute_complete(self,context:Context,event:dict[str,Any]|None=None)->str:ifeventisNoneorevent["status"]!="success":raiseAirflowException(f"Failed to start DB: {event}")else:returnjson.dumps(event["response"],default=str)
def_start_db(self):self.log.info("Starting DB %s '%s'",self.db_type.value,self.db_identifier)ifself.db_type==RdsDbType.INSTANCE:response=self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier,)else:response=self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)returnresponsedef_wait_until_db_available(self):self.log.info("Waiting for DB %s to reach 'available' state",self.db_type.value)ifself.db_type==RdsDbType.INSTANCE:self.hook.wait_for_db_instance_state(self.db_identifier,target_state="available",check_interval=self.waiter_delay,max_attempts=self.waiter_max_attempts,)else:self.hook.wait_for_db_cluster_state(self.db_identifier,target_state="available",check_interval=self.waiter_delay,max_attempts=self.waiter_max_attempts,)
[docs]classRdsStopDbOperator(RdsBaseOperator):""" Stops an RDS DB instance / cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsStopDbOperator` :param db_identifier: The AWS identifier of the DB to stop :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance") :param db_snapshot_identifier: The instance identifier of the DB Snapshot to create before stopping the DB instance. The default value (None) skips snapshot creation. This parameter is ignored when ``db_type`` is "cluster" :param wait_for_completion: If True, waits for DB to stop. (default: True) :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check DB instance state :param waiter_max_attempts: The maximum number of attempts to check DB instance state :param deferrable: If True, the operator will wait asynchronously for the DB instance to be created. This implies waiting for completion. This mode requires aiobotocore module to be installed. """
[docs]defexecute_complete(self,context:Context,event:dict[str,Any]|None=None)->str:ifeventisNoneorevent["status"]!="success":raiseAirflowException(f"Failed to start DB: {event}")else:returnjson.dumps(event["response"],default=str)
def_stop_db(self):self.log.info("Stopping DB %s '%s'",self.db_type.value,self.db_identifier)ifself.db_type==RdsDbType.INSTANCE:conn_params={"DBInstanceIdentifier":self.db_identifier}# The db snapshot parameter is optional, but the AWS SDK raises an exception# if passed a null value. Only set snapshot id if value is present.ifself.db_snapshot_identifier:conn_params["DBSnapshotIdentifier"]=self.db_snapshot_identifierresponse=self.hook.conn.stop_db_instance(**conn_params)else:ifself.db_snapshot_identifier:self.log.warning("'db_snapshot_identifier' does not apply to db clusters. ""Remove it to silence this warning.")response=self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)returnresponsedef_wait_until_db_stopped(self):self.log.info("Waiting for DB %s to reach 'stopped' state",self.db_type.value)ifself.db_type==RdsDbType.INSTANCE:self.hook.wait_for_db_instance_state(self.db_identifier,target_state="stopped",check_interval=self.waiter_delay,max_attempts=self.waiter_max_attempts,)else:self.hook.wait_for_db_cluster_state(self.db_identifier,target_state="stopped",check_interval=self.waiter_delay,max_attempts=self.waiter_max_attempts,)