Source code for airflow.providers.amazon.aws.operators.redshift_cluster
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from collections.abc import Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
from airflow.providers.amazon.aws.triggers.redshift_cluster import (
    RedshiftCreateClusterSnapshotTrigger,
    RedshiftCreateClusterTrigger,
    RedshiftDeleteClusterTrigger,
    RedshiftPauseClusterTrigger,
    RedshiftResumeClusterTrigger,
)
from airflow.providers.amazon.aws.utils import validate_execute_complete_event
from airflow.utils.helpers import prune_dict

if TYPE_CHECKING:
    from airflow.utils.context import Context
class RedshiftCreateClusterOperator(BaseOperator):
    """
    Creates a new cluster with the specified parameters.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftCreateClusterOperator`

    :param cluster_identifier: A unique identifier for the cluster.
    :param node_type: The node type to be provisioned for the cluster.
        Valid values: ``ds2.xlarge``, ``ds2.8xlarge``, ``dc1.large``, ``dc1.8xlarge``,
        ``dc2.large``, ``dc2.8xlarge``, ``ra3.xlplus``, ``ra3.4xlarge``, and ``ra3.16xlarge``.
    :param master_username: The username associated with the admin user account
        for the cluster that is being created.
    :param master_user_password: The password associated with the admin user account
        for the cluster that is being created.
    :param cluster_type: The type of the cluster: ``single-node`` or ``multi-node``.
        The default value is ``multi-node``.
    :param db_name: The name of the first database to be created when the cluster is created.
    :param number_of_nodes: The number of compute nodes in the cluster.
        This parameter is required when ``cluster_type`` is ``multi-node``.
    :param cluster_security_groups: A list of security groups to be associated with this cluster.
    :param vpc_security_group_ids: A list of VPC security groups to be associated with the cluster.
    :param cluster_subnet_group_name: The name of a cluster subnet group to be
        associated with this cluster.
    :param availability_zone: The EC2 Availability Zone (AZ).
    :param preferred_maintenance_window: The time range (in UTC) during which automated
        cluster maintenance can occur.
    :param cluster_parameter_group_name: The name of the parameter group to be
        associated with this cluster.
    :param automated_snapshot_retention_period: The number of days that automated snapshots
        are retained. The default value is ``1``.
    :param manual_snapshot_retention_period: The default number of days to retain a manual snapshot.
    :param port: The port number on which the cluster accepts incoming connections.
        The default value is ``5439``.
    :param cluster_version: The version of the Redshift engine software that you want
        to deploy on the cluster.
    :param allow_version_upgrade: Whether major version upgrades can be applied during
        the maintenance window. The default value is ``True``.
    :param publicly_accessible: Whether the cluster can be accessed from a public network.
    :param encrypted: Whether data in the cluster is encrypted at rest.
        The default value is ``False``.
    :param hsm_client_certificate_identifier: Name of the HSM client certificate that
        the Amazon Redshift cluster uses to retrieve the data.
    :param hsm_configuration_identifier: Name of the HSM configuration.
    :param elastic_ip: The Elastic IP (EIP) address for the cluster.
    :param tags: A list of tag instances.
    :param kms_key_id: The KMS key ID of the encryption key.
    :param enhanced_vpc_routing: Whether to create the cluster with enhanced VPC routing enabled.
        The default value is ``False``.
    :param additional_info: Reserved.
    :param iam_roles: A list of IAM roles that can be used by the cluster to access
        other AWS services.
    :param maintenance_track_name: Name of the maintenance track for the cluster.
    :param snapshot_schedule_identifier: A unique identifier for the snapshot schedule.
    :param availability_zone_relocation: Enable relocation for a Redshift cluster between
        Availability Zones after the cluster is created.
    :param aqua_configuration_status: Whether the cluster is configured to use AQUA
        (Advanced Query Accelerator).
    :param default_iam_role_arn: ARN of the IAM role set as the default for the cluster.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``.
    :param wait_for_completion: Whether to wait for the cluster to be in the ``available`` state.
    :param max_attempt: The maximum number of attempts to be made. Default: 5
    :param poll_interval: The amount of time in seconds to wait between attempts. Default: 60
    :param deferrable: If True, the operator will run in deferrable mode.
    """
    def execute(self, context: Context):
        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
        self.log.info("Creating Redshift cluster %s", self.cluster_identifier)
        params: dict[str, Any] = {}
        if self.db_name:
            params["DBName"] = self.db_name
        if self.cluster_type:
            params["ClusterType"] = self.cluster_type
            if self.cluster_type == "multi-node":
                params["NumberOfNodes"] = self.number_of_nodes
        if self.cluster_security_groups:
            params["ClusterSecurityGroups"] = self.cluster_security_groups
        if self.vpc_security_group_ids:
            params["VpcSecurityGroupIds"] = self.vpc_security_group_ids
        if self.cluster_subnet_group_name:
            params["ClusterSubnetGroupName"] = self.cluster_subnet_group_name
        if self.availability_zone:
            params["AvailabilityZone"] = self.availability_zone
        if self.preferred_maintenance_window:
            params["PreferredMaintenanceWindow"] = self.preferred_maintenance_window
        if self.cluster_parameter_group_name:
            params["ClusterParameterGroupName"] = self.cluster_parameter_group_name
        if self.automated_snapshot_retention_period:
            params["AutomatedSnapshotRetentionPeriod"] = self.automated_snapshot_retention_period
        if self.manual_snapshot_retention_period:
            params["ManualSnapshotRetentionPeriod"] = self.manual_snapshot_retention_period
        if self.port:
            params["Port"] = self.port
        if self.cluster_version:
            params["ClusterVersion"] = self.cluster_version
        if self.allow_version_upgrade:
            params["AllowVersionUpgrade"] = self.allow_version_upgrade
        if self.encrypted:
            params["Encrypted"] = self.encrypted
        if self.hsm_client_certificate_identifier:
            params["HsmClientCertificateIdentifier"] = self.hsm_client_certificate_identifier
        if self.hsm_configuration_identifier:
            params["HsmConfigurationIdentifier"] = self.hsm_configuration_identifier
        if self.elastic_ip:
            params["ElasticIp"] = self.elastic_ip
        if self.tags:
            params["Tags"] = self.tags
        if self.kms_key_id:
            params["KmsKeyId"] = self.kms_key_id
        if self.enhanced_vpc_routing:
            params["EnhancedVpcRouting"] = self.enhanced_vpc_routing
        if self.additional_info:
            params["AdditionalInfo"] = self.additional_info
        if self.iam_roles:
            params["IamRoles"] = self.iam_roles
        if self.maintenance_track_name:
            params["MaintenanceTrackName"] = self.maintenance_track_name
        if self.snapshot_schedule_identifier:
            params["SnapshotScheduleIdentifier"] = self.snapshot_schedule_identifier
        if self.availability_zone_relocation:
            params["AvailabilityZoneRelocation"] = self.availability_zone_relocation
        if self.aqua_configuration_status:
            params["AquaConfigurationStatus"] = self.aqua_configuration_status
        if self.default_iam_role_arn:
            params["DefaultIamRoleArn"] = self.default_iam_role_arn

        # PubliclyAccessible is True by default on the Redshift side, hence we should always
        # set it regardless of its value.
        params["PubliclyAccessible"] = self.publicly_accessible

        cluster = redshift_hook.create_cluster(
            self.cluster_identifier,
            self.node_type,
            self.master_username,
            self.master_user_password,
            params,
        )
        if self.deferrable:
            self.defer(
                trigger=RedshiftCreateClusterTrigger(
                    cluster_identifier=self.cluster_identifier,
                    waiter_delay=self.poll_interval,
                    waiter_max_attempts=self.max_attempt,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
            )
        if self.wait_for_completion:
            redshift_hook.get_conn().get_waiter("cluster_available").wait(
                ClusterIdentifier=self.cluster_identifier,
                WaiterConfig={
                    "Delay": self.poll_interval,
                    "MaxAttempts": self.max_attempt,
                },
            )

        self.log.info("Created Redshift cluster %s", self.cluster_identifier)
        self.log.info(cluster)
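# --- Usage sketch (not part of this module) ---------------------------------------------
# A minimal DAG snippet showing how RedshiftCreateClusterOperator might be instantiated.
# It belongs in a separate DAG file; the dag_id, task_id, cluster values, and connection
# id below are hypothetical placeholders, not values from the Airflow source.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftCreateClusterOperator

with DAG(dag_id="example_redshift_create", start_date=datetime(2024, 1, 1), schedule=None):
    create_cluster = RedshiftCreateClusterOperator(
        task_id="create_cluster",
        cluster_identifier="my-redshift-cluster",  # hypothetical identifier
        node_type="ra3.xlplus",
        master_username="awsuser",
        master_user_password="********",  # fetch from a secrets backend in practice
        cluster_type="single-node",       # no NumberOfNodes needed for single-node
        wait_for_completion=True,         # block until the cluster reaches "available"
        aws_conn_id="aws_default",
    )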
class RedshiftCreateClusterSnapshotOperator(BaseOperator):
    """
    Creates a manual snapshot of the specified cluster. The cluster must be in the available state.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftCreateClusterSnapshotOperator`

    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting.
    :param cluster_identifier: The cluster identifier for which you want a snapshot.
    :param retention_period: The number of days that a manual snapshot is retained.
        If the value is -1, the manual snapshot is retained indefinitely.
    :param tags: A list of tag instances.
    :param wait_for_completion: Whether to wait for the cluster snapshot to be in the
        ``available`` state.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check state.
    :param max_attempt: The maximum number of attempts to be made to check the state.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``.
    :param deferrable: If True, the operator will run as a deferrable operator.
    """
    def execute(self, context: Context) -> Any:
        cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
        if cluster_state != "available":
            raise AirflowException(
                "Redshift cluster must be in available state. "
                f"Redshift cluster current state is {cluster_state}"
            )

        self.redshift_hook.create_cluster_snapshot(
            cluster_identifier=self.cluster_identifier,
            snapshot_identifier=self.snapshot_identifier,
            retention_period=self.retention_period,
            tags=self.tags,
        )

        if self.deferrable:
            self.defer(
                trigger=RedshiftCreateClusterSnapshotTrigger(
                    cluster_identifier=self.cluster_identifier,
                    waiter_delay=self.poll_interval,
                    waiter_max_attempts=self.max_attempt,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
                # timeout is set to ensure that if a trigger dies, the timeout does not restart
                # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
                timeout=timedelta(seconds=self.max_attempt * self.poll_interval + 60),
            )

        if self.wait_for_completion:
            self.redshift_hook.get_conn().get_waiter("snapshot_available").wait(
                ClusterIdentifier=self.cluster_identifier,
                WaiterConfig={
                    "Delay": self.poll_interval,
                    "MaxAttempts": self.max_attempt,
                },
            )
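# Usage sketch (not part of this module, hypothetical identifiers): snapshotting a cluster
# that is already "available", which execute() above requires before calling the hook.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftCreateClusterSnapshotOperator,
)

with DAG(dag_id="example_redshift_snapshot", start_date=datetime(2024, 1, 1), schedule=None):
    create_snapshot = RedshiftCreateClusterSnapshotOperator(
        task_id="create_snapshot",
        cluster_identifier="my-redshift-cluster",  # hypothetical identifier
        snapshot_identifier="my-snapshot",         # hypothetical identifier
        retention_period=7,  # days; -1 would retain the manual snapshot indefinitely
        wait_for_completion=True,
    )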
class RedshiftDeleteClusterSnapshotOperator(BaseOperator):
    """
    Deletes the specified manual snapshot.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDeleteClusterSnapshotOperator`

    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting.
    :param cluster_identifier: The unique identifier of the cluster the snapshot was created from.
    :param wait_for_completion: Whether to wait for the snapshot to be deleted.
        The default value is ``True``.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check
        snapshot state.
    """
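# Usage sketch (not part of this module, hypothetical identifiers): deleting the manual
# snapshot created earlier and waiting until deletion finishes (the default behaviour).
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftDeleteClusterSnapshotOperator,
)

with DAG(dag_id="example_redshift_delete_snapshot", start_date=datetime(2024, 1, 1), schedule=None):
    delete_snapshot = RedshiftDeleteClusterSnapshotOperator(
        task_id="delete_snapshot",
        cluster_identifier="my-redshift-cluster",
        snapshot_identifier="my-snapshot",
        wait_for_completion=True,  # default; poll until the snapshot is gone
    )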
class RedshiftResumeClusterOperator(BaseOperator):
    """
    Resume a paused AWS Redshift cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftResumeClusterOperator`

    :param cluster_identifier: Unique identifier of the AWS Redshift cluster.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check
        cluster state.
    :param max_attempts: The maximum number of attempts to check the state of the cluster.
    :param wait_for_completion: If True, the operator will wait for the cluster to be in the
        ``resumed`` state. Default is False.
    :param deferrable: If True, the operator will run as a deferrable operator.
    """
        # These parameters are used to address an issue with the boto3 API where the API
        # prematurely reports the cluster as available to receive requests. This causes the cluster
        # to reject initial attempts to resume the cluster despite reporting the correct state.
        self._remaining_attempts = 10
        self._attempt_interval = 15
    def execute(self, context: Context):
        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
        self.log.info("Starting resume cluster")
        while self._remaining_attempts:
            try:
                redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
                break
            except redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
                self._remaining_attempts -= 1
                if self._remaining_attempts:
                    self.log.error(
                        "Unable to resume cluster. %d attempts remaining.", self._remaining_attempts
                    )
                    time.sleep(self._attempt_interval)
                else:
                    raise error

        if self.wait_for_completion:
            if self.deferrable:
                cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
                if cluster_state == "available":
                    self.log.info("Resumed cluster successfully")
                elif cluster_state == "deleting":
                    raise AirflowException(
                        f"Unable to resume cluster since cluster is currently in status: {cluster_state}"
                    )
                else:
                    self.defer(
                        trigger=RedshiftResumeClusterTrigger(
                            cluster_identifier=self.cluster_identifier,
                            waiter_delay=self.poll_interval,
                            waiter_max_attempts=self.max_attempts,
                            aws_conn_id=self.aws_conn_id,
                        ),
                        method_name="execute_complete",
                        # timeout is set to ensure that if a trigger dies, the timeout does not restart
                        # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
                        timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60),
                    )
            else:
                waiter = redshift_hook.get_waiter("cluster_resumed")
                waiter.wait(
                    ClusterIdentifier=self.cluster_identifier,
                    WaiterConfig={
                        "Delay": self.poll_interval,
                        "MaxAttempts": self.max_attempts,
                    },
                )
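# Usage sketch (not part of this module, hypothetical identifiers): resuming a paused
# cluster in deferrable mode, which frees the worker slot while a trigger polls the state.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftResumeClusterOperator

with DAG(dag_id="example_redshift_resume", start_date=datetime(2024, 1, 1), schedule=None):
    resume_cluster = RedshiftResumeClusterOperator(
        task_id="resume_cluster",
        cluster_identifier="my-redshift-cluster",
        wait_for_completion=True,  # deferral only kicks in when waiting is requested
        deferrable=True,           # hand the wait off to a trigger instead of blocking a worker
        poll_interval=30,          # seconds between status checks
        max_attempts=20,           # defer timeout becomes 20 * 30 + 60 seconds, as computed above
    )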
class RedshiftPauseClusterOperator(BaseOperator):
    """
    Pause an AWS Redshift cluster if it has status ``available``.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftPauseClusterOperator`

    :param cluster_identifier: ID of the AWS Redshift cluster.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param wait_for_completion: If True, waits for the cluster to be paused. (default: False)
    :param deferrable: Run operator in the deferrable mode.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check
        cluster state.
    :param max_attempts: Maximum number of attempts to poll the cluster.
    """
        # These parameters are used to address an issue with the boto3 API where the API
        # prematurely reports the cluster as available to receive requests. This causes the cluster
        # to reject initial attempts to pause the cluster despite reporting the correct state.
        self._remaining_attempts = 10
        self._attempt_interval = 15
    def execute(self, context: Context):
        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
        while self._remaining_attempts:
            try:
                redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
                break
            except redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
                self._remaining_attempts -= 1
                if self._remaining_attempts:
                    self.log.error(
                        "Unable to pause cluster. %d attempts remaining.", self._remaining_attempts
                    )
                    time.sleep(self._attempt_interval)
                else:
                    raise error

        if self.wait_for_completion:
            if self.deferrable:
                cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
                if cluster_state == "paused":
                    self.log.info("Paused cluster successfully")
                elif cluster_state == "deleting":
                    raise AirflowException(
                        f"Unable to pause cluster since cluster is currently in status: {cluster_state}"
                    )
                else:
                    self.defer(
                        trigger=RedshiftPauseClusterTrigger(
                            cluster_identifier=self.cluster_identifier,
                            waiter_delay=self.poll_interval,
                            waiter_max_attempts=self.max_attempts,
                            aws_conn_id=self.aws_conn_id,
                        ),
                        method_name="execute_complete",
                        # timeout is set to ensure that if a trigger dies, the timeout does not restart
                        # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
                        timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60),
                    )
            else:
                waiter = redshift_hook.get_waiter("cluster_paused")
                waiter.wait(
                    ClusterIdentifier=self.cluster_identifier,
                    WaiterConfig=prune_dict(
                        {
                            "Delay": self.poll_interval,
                            "MaxAttempts": self.max_attempts,
                        }
                    ),
                )
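# Usage sketch (not part of this module, hypothetical identifiers): pausing the cluster
# and waiting synchronously via the "cluster_paused" waiter used above.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftPauseClusterOperator

with DAG(dag_id="example_redshift_pause", start_date=datetime(2024, 1, 1), schedule=None):
    pause_cluster = RedshiftPauseClusterOperator(
        task_id="pause_cluster",
        cluster_identifier="my-redshift-cluster",
        wait_for_completion=True,  # poll with the waiter until status is "paused"
    )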
class RedshiftDeleteClusterOperator(BaseOperator):
    """
    Delete an AWS Redshift cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDeleteClusterOperator`

    :param cluster_identifier: Unique identifier of a cluster.
    :param skip_final_cluster_snapshot: Determines cluster snapshot creation.
    :param final_cluster_snapshot_identifier: Name of the final cluster snapshot.
    :param wait_for_completion: Whether to wait for cluster deletion.
        The default value is ``True``.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check
        cluster state.
    :param deferrable: Run operator in the deferrable mode.
    :param max_attempts: (Deferrable mode only) The maximum number of attempts to be made.
    """
        # These parameters are added to keep trying if there is a running operation in the cluster.
        # If there is a running operation in the cluster while trying to delete it, an
        # InvalidClusterStateFault is thrown. In such a case, the deletion is retried a few
        # times before giving up.
        self._attempts = 10
        self._attempt_interval = 15
    def execute(self, context: Context):
        while self._attempts:
            try:
                self.redshift_hook.delete_cluster(
                    cluster_identifier=self.cluster_identifier,
                    skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
                    final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
                )
                break
            except self.redshift_hook.get_conn().exceptions.InvalidClusterStateFault:
                self._attempts -= 1
                if self._attempts:
                    self.log.error("Unable to delete cluster. %d attempts remaining.", self._attempts)
                    time.sleep(self._attempt_interval)
                else:
                    raise

        if self.deferrable:
            cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
            if cluster_state == "cluster_not_found":
                self.log.info("Cluster deleted successfully")
            elif cluster_state in ("creating", "modifying"):
                raise AirflowException(
                    f"Unable to delete cluster since cluster is currently in status: {cluster_state}"
                )
            else:
                self.defer(
                    timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60),
                    trigger=RedshiftDeleteClusterTrigger(
                        cluster_identifier=self.cluster_identifier,
                        waiter_delay=self.poll_interval,
                        waiter_max_attempts=self.max_attempts,
                        aws_conn_id=self.aws_conn_id,
                    ),
                    method_name="execute_complete",
                )
        elif self.wait_for_completion:
            waiter = self.redshift_hook.get_conn().get_waiter("cluster_deleted")
            waiter.wait(
                ClusterIdentifier=self.cluster_identifier,
                WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": self.max_attempts},
            )
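# Usage sketch (not part of this module, hypothetical identifiers): deleting a cluster
# after taking a final snapshot, then waiting for the deletion to complete.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftDeleteClusterOperator

with DAG(dag_id="example_redshift_delete", start_date=datetime(2024, 1, 1), schedule=None):
    delete_cluster = RedshiftDeleteClusterOperator(
        task_id="delete_cluster",
        cluster_identifier="my-redshift-cluster",
        skip_final_cluster_snapshot=False,                      # take one last snapshot first
        final_cluster_snapshot_identifier="my-final-snapshot",  # hypothetical identifier
        wait_for_completion=True,  # default; uses the "cluster_deleted" waiter
    )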