Source code for airflow.providers.amazon.aws.operators.rds
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importjsonimporttimefromtypingimportTYPE_CHECKING,List,Optional,Sequencefrommypy_boto3_rds.type_defsimportTagTypeDeffromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.amazon.aws.hooks.rdsimportRdsHookfromairflow.providers.amazon.aws.utils.rdsimportRdsDbTypeifTYPE_CHECKING:fromairflow.utils.contextimportContextclassRdsBaseOperator(BaseOperator):"""Base operator that implements common functions for all operators"""ui_color="#eeaa88"ui_fgcolor="#ffffff"def__init__(self,*args,aws_conn_id:str="aws_conn_id",hook_params:Optional[dict]=None,**kwargs):hook_params=hook_paramsor{}self.hook=RdsHook(aws_conn_id=aws_conn_id,**hook_params)super().__init__(*args,**kwargs)self._await_interval=60# secondsdef_describe_item(self,item_type:str,item_name:str)->list:ifitem_type=='instance_snapshot':db_snaps=self.hook.conn.describe_db_snapshots(DBSnapshotIdentifier=item_name)returndb_snaps['DBSnapshots']elifitem_type=='cluster_snapshot':cl_snaps=self.hook.conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier=item_name)returncl_snaps['DBClusterSnapshots']elifitem_type=='export_task':exports=self.hook.conn.describe_export_tasks(ExportTaskIdentifier=item_name)returnexports['ExportTasks']elifitem_type=='event_subscription':subscriptions=self.hook.conn.describe_event_subscriptions(SubscriptionName=item_name)returnsubscriptions['EventSubscriptionsList']else:raiseAirflowException(f"Method for {item_type} is not implemented")def_await_status(self,item_type:str,item_name:str,wait_statuses:Optional[List[str]]=None,ok_statuses:Optional[List[str]]=None,error_statuses:Optional[List[str]]=None,)->None:""" Continuously gets item description from `_describe_item()` and waits until: - status is in `wait_statuses` - status not in `ok_statuses` and `error_statuses` """whileTrue:items=self._describe_item(item_type,item_name)iflen(items)==0:raiseAirflowException(f"There is no {item_type} with identifier {item_name}")iflen(items)>1:raiseAirflowException(f"There are {len(items)}{item_type} with identifier {item_name}")ifwait_statusesanditems[0]['Status'].lower()inwait_statuses:time.sleep(self._await_interval)continueelifok_statusesanditems[0]['Status'].lower()inok_statuses:breakeliferror_statusesanditems[0]['Status'].lower()inerror_statuses:raiseAirflowException(f"Item has error status ({error_statuses}): {items[0]}")else:raiseAirflowException(f"Item has uncertain status: {items[0]}")returnNonedefexecute(self,context:'Context')->str:"""Different implementations for snapshots, tasks and events"""raiseNotImplementedErrordefon_kill(self)->None:"""Different implementations for snapshots, tasks and events"""raiseNotImplementedError
[docs]classRdsCreateDbSnapshotOperator(RdsBaseOperator):""" Creates a snapshot of a DB instance or DB cluster. The source DB instance or cluster must be in the available or storage-optimization state. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCreateDbSnapshotOperator` :param db_type: Type of the DB - either "instance" or "cluster" :param db_identifier: The identifier of the instance or cluster that you want to create the snapshot of :param db_snapshot_identifier: The identifier for the DB snapshot :param tags: A list of tags in format `[{"Key": "something", "Value": "something"},] `USER Tagging <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Tagging.html>`__ """
[docs]defexecute(self,context:'Context')->str:self.log.info("Starting to create snapshot of RDS %s '%s': %s",self.db_type,self.db_identifier,self.db_snapshot_identifier,)ifself.db_type.value=="instance":create_instance_snap=self.hook.conn.create_db_snapshot(DBInstanceIdentifier=self.db_identifier,DBSnapshotIdentifier=self.db_snapshot_identifier,Tags=self.tags,)create_response=json.dumps(create_instance_snap,default=str)self._await_status('instance_snapshot',self.db_snapshot_identifier,wait_statuses=['creating'],ok_statuses=['available'],)else:create_cluster_snap=self.hook.conn.create_db_cluster_snapshot(DBClusterIdentifier=self.db_identifier,DBClusterSnapshotIdentifier=self.db_snapshot_identifier,Tags=self.tags,)create_response=json.dumps(create_cluster_snap,default=str)self._await_status('cluster_snapshot',self.db_snapshot_identifier,wait_statuses=['creating'],ok_statuses=['available'],)returncreate_response
[docs]classRdsCopyDbSnapshotOperator(RdsBaseOperator):""" Copies the specified DB instance or DB cluster snapshot .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCopyDbSnapshotOperator` :param db_type: Type of the DB - either "instance" or "cluster" :param source_db_snapshot_identifier: The identifier of the source snapshot :param target_db_snapshot_identifier: The identifier of the target snapshot :param kms_key_id: The AWS KMS key identifier for an encrypted DB snapshot :param tags: A list of tags in format `[{"Key": "something", "Value": "something"},] `USER Tagging <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Tagging.html>`__ :param copy_tags: Whether to copy all tags from the source snapshot to the target snapshot (default False) :param pre_signed_url: The URL that contains a Signature Version 4 signed request :param option_group_name: The name of an option group to associate with the copy of the snapshot Only when db_type='instance' :param target_custom_availability_zone: The external custom Availability Zone identifier for the target Only when db_type='instance' :param source_region: The ID of the region that contains the snapshot to be copied """
[docs]defexecute(self,context:'Context')->str:self.log.info("Starting to copy snapshot '%s' as '%s'",self.source_db_snapshot_identifier,self.target_db_snapshot_identifier,)ifself.db_type.value=="instance":copy_instance_snap=self.hook.conn.copy_db_snapshot(SourceDBSnapshotIdentifier=self.source_db_snapshot_identifier,TargetDBSnapshotIdentifier=self.target_db_snapshot_identifier,KmsKeyId=self.kms_key_id,Tags=self.tags,CopyTags=self.copy_tags,PreSignedUrl=self.pre_signed_url,OptionGroupName=self.option_group_name,TargetCustomAvailabilityZone=self.target_custom_availability_zone,SourceRegion=self.source_region,)copy_response=json.dumps(copy_instance_snap,default=str)self._await_status('instance_snapshot',self.target_db_snapshot_identifier,wait_statuses=['creating'],ok_statuses=['available'],)else:copy_cluster_snap=self.hook.conn.copy_db_cluster_snapshot(SourceDBClusterSnapshotIdentifier=self.source_db_snapshot_identifier,TargetDBClusterSnapshotIdentifier=self.target_db_snapshot_identifier,KmsKeyId=self.kms_key_id,Tags=self.tags,CopyTags=self.copy_tags,PreSignedUrl=self.pre_signed_url,SourceRegion=self.source_region,)copy_response=json.dumps(copy_cluster_snap,default=str)self._await_status('cluster_snapshot',self.target_db_snapshot_identifier,wait_statuses=['copying'],ok_statuses=['available'],)returncopy_response
[docs]classRdsDeleteDbSnapshotOperator(RdsBaseOperator):""" Deletes a DB instance or cluster snapshot or terminating the copy operation .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsDeleteDbSnapshotOperator` :param db_type: Type of the DB - either "instance" or "cluster" :param db_snapshot_identifier: The identifier for the DB instance or DB cluster snapshot """
[docs]defexecute(self,context:'Context')->str:self.log.info("Starting to delete snapshot '%s'",self.db_snapshot_identifier)ifself.db_type.value=="instance":delete_instance_snap=self.hook.conn.delete_db_snapshot(DBSnapshotIdentifier=self.db_snapshot_identifier,)delete_response=json.dumps(delete_instance_snap,default=str)else:delete_cluster_snap=self.hook.conn.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier=self.db_snapshot_identifier,)delete_response=json.dumps(delete_cluster_snap,default=str)returndelete_response
[docs]classRdsStartExportTaskOperator(RdsBaseOperator):""" Starts an export of a snapshot to Amazon S3. The provided IAM role must have access to the S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsStartExportTaskOperator` :param export_task_identifier: A unique identifier for the snapshot export task. :param source_arn: The Amazon Resource Name (ARN) of the snapshot to export to Amazon S3. :param s3_bucket_name: The name of the Amazon S3 bucket to export the snapshot to. :param iam_role_arn: The name of the IAM role to use for writing to the Amazon S3 bucket. :param kms_key_id: The ID of the Amazon Web Services KMS key to use to encrypt the snapshot. :param s3_prefix: The Amazon S3 bucket prefix to use as the file name and path of the exported snapshot. :param export_only: The data to be exported from the snapshot. """
[docs]defexecute(self,context:'Context')->str:self.log.info("Starting export task %s for snapshot %s",self.export_task_identifier,self.source_arn)start_export=self.hook.conn.start_export_task(ExportTaskIdentifier=self.export_task_identifier,SourceArn=self.source_arn,S3BucketName=self.s3_bucket_name,IamRoleArn=self.iam_role_arn,KmsKeyId=self.kms_key_id,S3Prefix=self.s3_prefix,ExportOnly=self.export_only,)self._await_status('export_task',self.export_task_identifier,wait_statuses=['starting','in_progress'],ok_statuses=['complete'],error_statuses=['canceling','canceled'],)returnjson.dumps(start_export,default=str)
[docs]classRdsCancelExportTaskOperator(RdsBaseOperator):""" Cancels an export task in progress that is exporting a snapshot to Amazon S3 .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCancelExportTaskOperator` :param export_task_identifier: The identifier of the snapshot export task to cancel """
[docs]classRdsCreateEventSubscriptionOperator(RdsBaseOperator):""" Creates an RDS event notification subscription .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsCreateEventSubscriptionOperator` :param subscription_name: The name of the subscription (must be less than 255 characters) :param sns_topic_arn: The ARN of the SNS topic created for event notification :param source_type: The type of source that is generating the events. Valid values: db-instance | db-cluster | db-parameter-group | db-security-group | db-snapshot | db-cluster-snapshot | db-proxy :param event_categories: A list of event categories for a source type that you want to subscribe to `USER Events <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Events.Messages.html>`__ :param source_ids: The list of identifiers of the event sources for which events are returned :param enabled: A value that indicates whether to activate the subscription (default True)l :param tags: A list of tags in format `[{"Key": "something", "Value": "something"},] `USER Tagging <https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Tagging.html>`__ """
[docs]defexecute(self,context:'Context')->str:self.log.info("Creating event subscription '%s' to '%s'",self.subscription_name,self.sns_topic_arn)create_subscription=self.hook.conn.create_event_subscription(SubscriptionName=self.subscription_name,SnsTopicArn=self.sns_topic_arn,SourceType=self.source_type,EventCategories=self.event_categories,SourceIds=self.source_ids,Enabled=self.enabled,Tags=self.tags,)self._await_status('event_subscription',self.subscription_name,wait_statuses=['creating'],ok_statuses=['active'],)returnjson.dumps(create_subscription,default=str)
[docs]classRdsDeleteEventSubscriptionOperator(RdsBaseOperator):""" Deletes an RDS event notification subscription .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RdsDeleteEventSubscriptionOperator` :param subscription_name: The name of the RDS event notification subscription you want to delete """