Source code for airflow.providers.amazon.aws.operators.redshift_cluster
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromtypingimportTYPE_CHECKING,Any,Sequencefromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.amazon.aws.hooks.redshift_clusterimportRedshiftHookifTYPE_CHECKING:fromairflow.utils.contextimportContext
class RedshiftCreateClusterOperator(BaseOperator):
    """Creates a new cluster with the specified parameters.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftCreateClusterOperator`

    :param cluster_identifier: A unique identifier for the cluster.
    :param node_type: The node type to be provisioned for the cluster.
        Valid Values: ``ds2.xlarge``, ``ds2.8xlarge``, ``dc1.large``,
        ``dc1.8xlarge``, ``dc2.large``, ``dc2.8xlarge``, ``ra3.xlplus``,
        ``ra3.4xlarge``, and ``ra3.16xlarge``.
    :param master_username: The username associated with the admin user account for
        the cluster that is being created.
    :param master_user_password: The password associated with the admin user account for
        the cluster that is being created.
    :param cluster_type: The type of the cluster ``single-node`` or ``multi-node``.
        The default value is ``multi-node``.
    :param db_name: The name of the first database to be created when the cluster is created.
    :param number_of_nodes: The number of compute nodes in the cluster.
        This parameter is required when ``cluster_type`` is ``multi-node``.
    :param cluster_security_groups: A list of security groups to be associated with this cluster.
    :param vpc_security_group_ids: A list of VPC security groups to be associated with the cluster.
    :param cluster_subnet_group_name: The name of a cluster subnet group to be associated with this cluster.
    :param availability_zone: The EC2 Availability Zone (AZ).
    :param preferred_maintenance_window: The time range (in UTC) during which automated cluster
        maintenance can occur.
    :param cluster_parameter_group_name: The name of the parameter group to be associated with this cluster.
    :param automated_snapshot_retention_period: The number of days that automated snapshots are retained.
        The default value is ``1``.
    :param manual_snapshot_retention_period: The default number of days to retain a manual snapshot.
    :param port: The port number on which the cluster accepts incoming connections.
        The default value is ``5439``.
    :param cluster_version: The version of a Redshift engine software that you want to deploy on the cluster.
    :param allow_version_upgrade: Whether major version upgrades can be applied during the maintenance window.
        The default value is ``True``.
    :param publicly_accessible: Whether cluster can be accessed from a public network.
    :param encrypted: Whether data in the cluster is encrypted at rest.
        The default value is ``False``.
    :param hsm_client_certificate_identifier: Name of the HSM client certificate
        the Amazon Redshift cluster uses to retrieve the data.
    :param hsm_configuration_identifier: Name of the HSM configuration.
    :param elastic_ip: The Elastic IP (EIP) address for the cluster.
    :param tags: A list of tag instances.
    :param kms_key_id: KMS key id of encryption key.
    :param enhanced_vpc_routing: Whether to create the cluster with enhanced VPC routing enabled.
        The default value is ``False``.
    :param additional_info: Reserved.
    :param iam_roles: A list of IAM roles that can be used by the cluster to access other AWS services.
    :param maintenance_track_name: Name of the maintenance track for the cluster.
    :param snapshot_schedule_identifier: A unique identifier for the snapshot schedule.
    :param availability_zone_relocation: Enable relocation for a Redshift cluster
        between Availability Zones after the cluster is created.
    :param aqua_configuration_status: The cluster is configured to use AQUA.
    :param default_iam_role_arn: ARN for the IAM role.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``.
    """
class RedshiftCreateClusterSnapshotOperator(BaseOperator):
    """
    Creates a manual snapshot of the specified cluster. The cluster must be in the available state.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftCreateClusterSnapshotOperator`

    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
    :param cluster_identifier: The cluster identifier for which you want a snapshot
    :param retention_period: The number of days that a manual snapshot is retained.
        If the value is -1, the manual snapshot is retained indefinitely.
    :param wait_for_completion: Whether to wait for the cluster snapshot to be in ``available`` state
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check state
    :param max_attempt: The maximum number of attempts to be made to check the state
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``
    """

    def execute(self, context: Context) -> Any:
        """
        Request the snapshot and, when asked to, block until the waiter reports it available.

        :param context: Airflow task context; not read by this method.
        :raises AirflowException: if the hook reports a cluster status other than ``available``.
        """
        current_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
        if current_state != "available":
            # The snapshot request is only issued against a cluster reported as available.
            message = (
                "Redshift cluster must be in available state. "
                f"Redshift cluster current state is {current_state}"
            )
            raise AirflowException(message)

        self.redshift_hook.create_cluster_snapshot(
            cluster_identifier=self.cluster_identifier,
            snapshot_identifier=self.snapshot_identifier,
            retention_period=self.retention_period,
        )

        # Fire-and-forget mode: the snapshot has been requested, nothing left to do.
        if not self.wait_for_completion:
            return

        snapshot_waiter = self.redshift_hook.get_conn().get_waiter("snapshot_available")
        waiter_config = {
            "Delay": self.poll_interval,
            "MaxAttempts": self.max_attempt,
        }
        snapshot_waiter.wait(
            ClusterIdentifier=self.cluster_identifier,
            WaiterConfig=waiter_config,
        )
class RedshiftDeleteClusterSnapshotOperator(BaseOperator):
    """
    Deletes the specified manual snapshot.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDeleteClusterSnapshotOperator`

    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
    :param cluster_identifier: The unique identifier of the cluster the snapshot was created from
    :param wait_for_completion: Whether to wait for cluster deletion or not.
        The default value is ``True``
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        The default connection id is ``aws_default``
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check snapshot state
    """
class RedshiftResumeClusterOperator(BaseOperator):
    """
    Resume a paused AWS Redshift Cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftResumeClusterOperator`

    :param cluster_identifier: id of the AWS Redshift Cluster
    :param aws_conn_id: aws connection to use
    """

    def __init__(
        self,
        *,
        cluster_identifier: str,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.cluster_identifier = cluster_identifier
        self.aws_conn_id = aws_conn_id
        # These parameters are added to address an issue with the boto3 API where the API
        # prematurely reports the cluster as available to receive requests. This causes the cluster
        # to reject initial attempts to resume the cluster despite reporting the correct state.
        self._attempts = 10
        self._attempt_interval = 15

    def execute(self, context: Context):
        """
        Call ``resume_cluster`` for the cluster, retrying on ``InvalidClusterStateFault``.

        Up to ``self._attempts`` calls are made, sleeping ``self._attempt_interval`` seconds
        between failed attempts.

        :param context: Airflow task context; not read by this method.
        :raises InvalidClusterStateFault: re-raised from the client once every attempt has failed.
        """
        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
        # Resolve the client once so the call and the exception class come from the same object.
        client = redshift_hook.get_conn()

        # Count down a local copy: decrementing ``self._attempts`` would permanently consume the
        # retry budget, and a later ``execute`` on the same instance would skip the loop and
        # return without ever calling ``resume_cluster``.
        remaining_attempts = self._attempts
        while remaining_attempts >= 1:
            try:
                client.resume_cluster(ClusterIdentifier=self.cluster_identifier)
                return
            except client.exceptions.InvalidClusterStateFault:
                remaining_attempts -= 1
                if remaining_attempts <= 0:
                    # Budget exhausted: surface the last fault with its original traceback.
                    raise
                self.log.error("Unable to resume cluster. %d attempts remaining.", remaining_attempts)
                time.sleep(self._attempt_interval)
class RedshiftPauseClusterOperator(BaseOperator):
    """
    Pause an AWS Redshift Cluster if it has status `available`.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftPauseClusterOperator`

    :param cluster_identifier: id of the AWS Redshift Cluster
    :param aws_conn_id: aws connection to use
    """

    def __init__(
        self,
        *,
        cluster_identifier: str,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.cluster_identifier = cluster_identifier
        self.aws_conn_id = aws_conn_id
        # These parameters are added to address an issue with the boto3 API where the API
        # prematurely reports the cluster as available to receive requests. This causes the cluster
        # to reject initial attempts to pause the cluster despite reporting the correct state.
        self._attempts = 10
        self._attempt_interval = 15

    def execute(self, context: Context):
        """
        Call ``pause_cluster`` for the cluster, retrying on ``InvalidClusterStateFault``.

        Up to ``self._attempts`` calls are made, sleeping ``self._attempt_interval`` seconds
        between failed attempts.

        :param context: Airflow task context; not read by this method.
        :raises InvalidClusterStateFault: re-raised from the client once every attempt has failed.
        """
        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
        # Resolve the client once so the call and the exception class come from the same object.
        client = redshift_hook.get_conn()

        # Count down a local copy: decrementing ``self._attempts`` would permanently consume the
        # retry budget, and a later ``execute`` on the same instance would skip the loop and
        # return without ever calling ``pause_cluster``.
        remaining_attempts = self._attempts
        while remaining_attempts >= 1:
            try:
                client.pause_cluster(ClusterIdentifier=self.cluster_identifier)
                return
            except client.exceptions.InvalidClusterStateFault:
                remaining_attempts -= 1
                if remaining_attempts <= 0:
                    # Budget exhausted: surface the last fault with its original traceback.
                    raise
                self.log.error("Unable to pause cluster. %d attempts remaining.", remaining_attempts)
                time.sleep(self._attempt_interval)
[docs]classRedshiftDeleteClusterOperator(BaseOperator):""" Delete an AWS Redshift cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:RedshiftDeleteClusterOperator` :param cluster_identifier: unique identifier of a cluster :param skip_final_cluster_snapshot: determines cluster snapshot creation :param final_cluster_snapshot_identifier: name of final cluster snapshot :param wait_for_completion: Whether wait for cluster deletion or not The default value is ``True`` :param aws_conn_id: aws connection to use :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state """
def __init__(
    self,
    *,
    cluster_identifier: str,
    skip_final_cluster_snapshot: bool = True,
    final_cluster_snapshot_identifier: str | None = None,
    wait_for_completion: bool = True,
    aws_conn_id: str = "aws_default",
    poll_interval: float = 30.0,
    **kwargs,
):
    """Store the deletion settings and build the Redshift hook for ``aws_conn_id``."""
    super().__init__(**kwargs)

    # Retry budget: if there is a running operation in the cluster while trying to delete it,
    # an InvalidClusterStateFault is thrown. In that case the deletion is retried.
    self._attempt_interval = 15
    self._attempts = 10

    # Which cluster to delete, and whether a final snapshot is taken.
    self.cluster_identifier = cluster_identifier
    self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
    self.skip_final_cluster_snapshot = skip_final_cluster_snapshot

    # Waiting behaviour once the deletion has been requested.
    self.poll_interval = poll_interval
    self.wait_for_completion = wait_for_completion

    self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
[docs]defexecute(self,context:Context):whileself._attempts>=1:try:self.redshift_hook.delete_cluster(cluster_identifier=self.cluster_identifier,skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,)breakexceptself.redshift_hook.get_conn().exceptions.InvalidClusterStateFault:self._attempts=self._attempts-1ifself._attempts>0:self.log.error("Unable to delete cluster. %d attempts remaining.",self._attempts)time.sleep(self._attempt_interval)else:raiseifself.wait_for_completion:waiter=self.redshift_hook.get_conn().get_waiter("cluster_deleted")waiter.wait(ClusterIdentifier=self.cluster_identifier,WaiterConfig={"Delay":self.poll_interval,"MaxAttempts":30},