Source code for airflow.providers.amazon.aws.hooks.ec2

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import functools
import time
from typing import Callable, TypeVar

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.typing_compat import ParamSpec

[docs]PS = ParamSpec("PS")
[docs]RT = TypeVar("RT")
[docs]def only_client_type(func: Callable[PS, RT]) -> Callable[PS, RT]: @functools.wraps(func) def checker(*args, **kwargs) -> RT: if args[0]._api_type == "client_type": return func(*args, **kwargs) ec2_doc_link = "https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html" raise AirflowException( f""" This method is only callable when using client_type API for interacting with EC2. Create the EC2Hook object as follows to use this method ec2 = EC2Hook(api_type="client_type") Read following for details on client_type and resource_type APIs: 1. {ec2_doc_link}#client 2. {ec2_doc_link}#service-resource """ ) return checker
[docs]class EC2Hook(AwsBaseHook): """ Interact with Amazon Elastic Compute Cloud (EC2). Provide thick wrapper around :external+boto3:py:class:`boto3.client("ec2") <EC2.Client>` or :external+boto3:py:class:`boto3.resource("ec2") <EC2.ServiceResource>`. :param api_type: If set to ``client_type`` then hook use ``boto3.client("ec2")`` capabilities, If set to ``resource_type`` then hook use ``boto3.resource("ec2")`` capabilities. Additional arguments (such as ``aws_conn_id``) may be specified and are passed down to the underlying AwsBaseHook. .. seealso:: - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` """
[docs] API_TYPES = frozenset({"resource_type", "client_type"})
def __init__(self, api_type="resource_type", *args, **kwargs) -> None: if api_type not in self.API_TYPES: raise AirflowException("api_type can only be one of %s", self.API_TYPES) kwargs[api_type] = "ec2" self._api_type = api_type super().__init__(*args, **kwargs)
[docs] def get_instance(self, instance_id: str, filters: list | None = None): """ Get EC2 instance by id and return it. :param instance_id: id of the AWS EC2 instance :param filters: List of filters to specify instances to get :return: Instance object """ if self._api_type == "client_type": return self.get_instances(filters=filters, instance_ids=[instance_id])[0] return self.conn.Instance(id=instance_id)
@only_client_type
[docs] def stop_instances(self, instance_ids: list) -> dict: """ Stop instances with given ids. :param instance_ids: List of instance ids to stop :return: Dict with key `StoppingInstances` and value as list of instances being stopped """ self.log.info("Stopping instances: %s", instance_ids) return self.conn.stop_instances(InstanceIds=instance_ids)
@only_client_type
[docs] def start_instances(self, instance_ids: list) -> dict: """ Start instances with given ids. :param instance_ids: List of instance ids to start :return: Dict with key `StartingInstances` and value as list of instances being started """ self.log.info("Starting instances: %s", instance_ids) return self.conn.start_instances(InstanceIds=instance_ids)
@only_client_type
[docs] def terminate_instances(self, instance_ids: list) -> dict: """ Terminate instances with given ids. :param instance_ids: List of instance ids to terminate :return: Dict with key `TerminatingInstances` and value as list of instances being terminated """ self.log.info("Terminating instances: %s", instance_ids) return self.conn.terminate_instances(InstanceIds=instance_ids)
@only_client_type
[docs] def describe_instances(self, filters: list | None = None, instance_ids: list | None = None): """ Describe EC2 instances, optionally applying filters and selective instance ids. :param filters: List of filters to specify instances to describe :param instance_ids: List of instance IDs to describe :return: Response from EC2 describe_instances API """ filters = filters or [] instance_ids = instance_ids or [] self.log.info("Filters provided: %s", filters) self.log.info("Instance ids provided: %s", instance_ids) return self.conn.describe_instances(Filters=filters, InstanceIds=instance_ids)
@only_client_type
[docs] def get_instances(self, filters: list | None = None, instance_ids: list | None = None) -> list: """ Get list of instance details, optionally applying filters and selective instance ids. :param instance_ids: List of ids to get instances for :param filters: List of filters to specify instances to get :return: List of instances """ description = self.describe_instances(filters=filters, instance_ids=instance_ids) return [ instance for reservation in description["Reservations"] for instance in reservation["Instances"] ]
@only_client_type
[docs] def get_instance_ids(self, filters: list | None = None) -> list: """ Get list of instance ids, optionally applying filters to fetch selective instances. :param filters: List of filters to specify instances to get :return: List of instance ids """ return [instance["InstanceId"] for instance in self.get_instances(filters=filters)]
[docs] async def get_instance_state_async(self, instance_id: str) -> str: async with self.async_conn as client: response = await client.describe_instances(InstanceIds=[instance_id]) return response["Reservations"][0]["Instances"][0]["State"]["Name"]
[docs] def get_instance_state(self, instance_id: str) -> str: """ Get EC2 instance state by id and return it. :param instance_id: id of the AWS EC2 instance :return: current state of the instance """ if self._api_type == "client_type": return self.get_instances(instance_ids=[instance_id])[0]["State"]["Name"] return self.get_instance(instance_id=instance_id).state["Name"]
[docs] def wait_for_state(self, instance_id: str, target_state: str, check_interval: float) -> None: """ Wait EC2 instance until its state is equal to the target_state. :param instance_id: id of the AWS EC2 instance :param target_state: target state of instance :param check_interval: time in seconds that the job should wait in between each instance state checks until operation is completed :return: None """ instance_state = self.get_instance_state(instance_id=instance_id) while instance_state != target_state: time.sleep(check_interval) instance_state = self.get_instance_state(instance_id=instance_id) self.log.info( "instance state: %s. Same as target: %s", instance_state, instance_state == target_state )

Was this entry helpful?