Source code for airflow.providers.amazon.aws.hooks.elasticache_replication_group

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from time import sleep

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class ElastiCacheReplicationGroupHook(AwsBaseHook):
    """
    Interact with Amazon ElastiCache.

    Provide a thick wrapper around
    :external+boto3:py:class:`boto3.client("elasticache") <ElastiCache.Client>`.

    :param max_retries: Max retries for checking availability of and deleting replication group
        If this is not supplied then this is defaulted to 10
    :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
        If this is not supplied then this is defaulted to 1
    :param initial_poke_interval: Initial sleep time in seconds
        If this is not supplied then this is defaulted to 60 seconds

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    TERMINAL_STATES = frozenset({"available", "create-failed", "deleting"})

    def __init__(
        self,
        max_retries: int = 10,
        exponential_back_off_factor: float = 1,
        initial_poke_interval: float = 60,
        *args,
        **kwargs,
    ):
        self.max_retries = max_retries
        self.exponential_back_off_factor = exponential_back_off_factor
        self.initial_poke_interval = initial_poke_interval

        kwargs["client_type"] = "elasticache"
        super().__init__(*args, **kwargs)

    def create_replication_group(self, config: dict) -> dict:
        """
        Creates a Redis (cluster mode disabled) or a Redis (cluster mode enabled) replication group.

        .. seealso::
            - :external+boto3:py:meth:`ElastiCache.Client.create_replication_group`

        :param config: Configuration for creating the replication group
        :return: Response from ElastiCache create replication group API
        """
        return self.conn.create_replication_group(**config)

    def delete_replication_group(self, replication_group_id: str) -> dict:
        """
        Deletes an existing replication group.

        .. seealso::
            - :external+boto3:py:meth:`ElastiCache.Client.delete_replication_group`

        :param replication_group_id: ID of replication group to delete
        :return: Response from ElastiCache delete replication group API
        """
        return self.conn.delete_replication_group(ReplicationGroupId=replication_group_id)

    def describe_replication_group(self, replication_group_id: str) -> dict:
        """
        Get information about a particular replication group.

        .. seealso::
            - :external+boto3:py:meth:`ElastiCache.Client.describe_replication_groups`

        :param replication_group_id: ID of replication group to describe
        :return: Response from ElastiCache describe replication group API
        """
        return self.conn.describe_replication_groups(ReplicationGroupId=replication_group_id)

    def get_replication_group_status(self, replication_group_id: str) -> str:
        """
        Get current status of replication group.

        .. seealso::
            - :external+boto3:py:meth:`ElastiCache.Client.describe_replication_groups`

        :param replication_group_id: ID of replication group to check for status
        :return: Current status of replication group
        """
        return self.describe_replication_group(replication_group_id)["ReplicationGroups"][0]["Status"]

    def is_replication_group_available(self, replication_group_id: str) -> bool:
        """
        Helper for checking if replication group is available or not.

        :param replication_group_id: ID of replication group to check for availability
        :return: True if available else False
        """
        return self.get_replication_group_status(replication_group_id) == "available"

    def wait_for_availability(
        self,
        replication_group_id: str,
        initial_sleep_time: float | None = None,
        exponential_back_off_factor: float | None = None,
        max_retries: int | None = None,
    ) -> bool:
        """
        Check if replication group is available or not by performing a describe over it.

        :param replication_group_id: ID of replication group to check for availability
        :param initial_sleep_time: Initial sleep time in seconds
            If this is not supplied then this is defaulted to class level value
        :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to class level value
        :param max_retries: Max retries for checking availability of replication group
            If this is not supplied then this is defaulted to class level value
        :return: True if replication group is available else False
        """
        sleep_time = initial_sleep_time or self.initial_poke_interval
        exponential_back_off_factor = exponential_back_off_factor or self.exponential_back_off_factor
        max_retries = max_retries or self.max_retries
        num_tries = 0
        status = "not-found"
        stop_poking = False

        while not stop_poking and num_tries <= max_retries:
            status = self.get_replication_group_status(replication_group_id=replication_group_id)
            stop_poking = status in self.TERMINAL_STATES

            self.log.info(
                "Current status of replication group with ID %s is %s", replication_group_id, status
            )

            if not stop_poking:
                num_tries += 1

                # No point in sleeping if all tries have exhausted
                if num_tries > max_retries:
                    break

                self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time)

                sleep(sleep_time)

                sleep_time *= exponential_back_off_factor

        if status != "available":
            self.log.warning('Replication group is not available. Current status is "%s"', status)

            return False

        return True

    def wait_for_deletion(
        self,
        replication_group_id: str,
        initial_sleep_time: float | None = None,
        exponential_back_off_factor: float | None = None,
        max_retries: int | None = None,
    ):
        """
        Helper for deleting a replication group, ensuring it is either deleted or can't be deleted.

        :param replication_group_id: ID of replication group to delete
        :param initial_sleep_time: Initial sleep time in seconds
            If this is not supplied then this is defaulted to class level value
        :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to class level value
        :param max_retries: Max retries for checking availability of replication group
            If this is not supplied then this is defaulted to class level value
        :return: Response from ElastiCache delete replication group API and flag to identify if deleted or not
        """
        deleted = False
        sleep_time = initial_sleep_time or self.initial_poke_interval
        exponential_back_off_factor = exponential_back_off_factor or self.exponential_back_off_factor
        max_retries = max_retries or self.max_retries
        num_tries = 0
        response = None

        while not deleted and num_tries <= max_retries:
            try:
                status = self.get_replication_group_status(replication_group_id=replication_group_id)

                self.log.info(
                    "Current status of replication group with ID %s is %s", replication_group_id, status
                )

                # Can only delete if status is `available`
                # Status becomes `deleting` after this call so this will only run once
                if status == "available":
                    self.log.info("Initiating delete and then wait for it to finish")

                    response = self.delete_replication_group(replication_group_id=replication_group_id)

            except self.conn.exceptions.ReplicationGroupNotFoundFault:
                self.log.info("Replication group with ID '%s' does not exist", replication_group_id)

                deleted = True

            # This should never occur as we only issue a delete request when status is `available`
            # which is a valid status for deletion. Still handling for safety.
            except self.conn.exceptions.InvalidReplicationGroupStateFault as exp:
                # status      Error Response
                # creating  - Cache cluster <cluster_id> is not in a valid state to be deleted.
                # deleting  - Replication group <replication_group_id> has status deleting which is not valid
                #             for deletion.
                # modifying - Replication group <replication_group_id> has status modifying which is not valid
                #             for deletion.
                message = exp.response["Error"]["Message"]

                self.log.warning("Received error message from AWS ElastiCache API : %s", message)

            if not deleted:
                num_tries += 1

                # No point in sleeping if all tries have exhausted
                if num_tries > max_retries:
                    break

                self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time)

                sleep(sleep_time)

                sleep_time *= exponential_back_off_factor

        return response, deleted

    def ensure_delete_replication_group(
        self,
        replication_group_id: str,
        initial_sleep_time: float | None = None,
        exponential_back_off_factor: float | None = None,
        max_retries: int | None = None,
    ) -> dict:
        """
        Delete a replication group, ensuring it is either deleted or can't be deleted.

        :param replication_group_id: ID of replication group to delete
        :param initial_sleep_time: Initial sleep time in seconds
            If this is not supplied then this is defaulted to class level value
        :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to class level value
        :param max_retries: Max retries for checking availability of replication group
            If this is not supplied then this is defaulted to class level value
        :return: Response from ElastiCache delete replication group API
        :raises AirflowException: If replication group is not deleted
        """
        self.log.info("Deleting replication group with ID %s", replication_group_id)

        response, deleted = self.wait_for_deletion(
            replication_group_id=replication_group_id,
            initial_sleep_time=initial_sleep_time,
            exponential_back_off_factor=exponential_back_off_factor,
            max_retries=max_retries,
        )

        if not deleted:
            raise AirflowException(f'Replication group could not be deleted. Response "{response}"')

        return response
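

# ----------------------------------------------------------------------------------------
# Illustrative usage sketch (not part of the hook itself). It assumes valid AWS
# credentials are available to boto3 and that default Airflow connection handling is
# acceptable; the replication group ID, node type, and other configuration values below
# are hypothetical and only demonstrate the typical call sequence:
# create_replication_group -> wait_for_availability -> ensure_delete_replication_group.
# ----------------------------------------------------------------------------------------
if __name__ == "__main__":
    hook = ElastiCacheReplicationGroupHook(max_retries=5, initial_poke_interval=30)

    # Hypothetical configuration; any valid CreateReplicationGroup parameters may be passed,
    # since the dict is forwarded to boto3 as keyword arguments.
    config = {
        "ReplicationGroupId": "example-redis-group",
        "ReplicationGroupDescription": "Example replication group",
        "Engine": "redis",
        "CacheNodeType": "cache.t3.micro",
        "NumCacheClusters": 1,
    }

    hook.create_replication_group(config=config)

    # Poll with exponential back-off until the group reaches a terminal state.
    if hook.wait_for_availability(replication_group_id="example-redis-group"):
        print(hook.get_replication_group_status(replication_group_id="example-redis-group"))

    # Clean up; raises AirflowException if the group could not be deleted.
    hook.ensure_delete_replication_group(replication_group_id="example-redis-group")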
