Source code for airflow.providers.amazon.aws.hooks.ecs
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromcollectionsimportdequefromdatetimeimportdatetime,timedeltafromenumimportEnumfromloggingimportLoggerfromthreadingimportEvent,ThreadfromtypingimportGeneratorfrombotocore.exceptionsimportClientError,ConnectionClosedErrorfrombotocore.waiterimportWaiterfromairflow.providers.amazon.aws.exceptionsimportEcsOperatorError,EcsTaskFailToStartfromairflow.providers.amazon.aws.hooks.base_awsimportAwsGenericHookfromairflow.providers.amazon.aws.hooks.logsimportAwsLogsHookfromairflow.typing_compatimportProtocol,runtime_checkable
def should_retry(exception: Exception) -> bool:
    """
    Check if exception is related to ECS resource quota (CPU, MEM).

    :param exception: The exception to inspect.
    :return: ``True`` when ``exception`` is an ``EcsOperatorError`` and the ``reason`` of at
        least one entry in ``exception.failures`` contains ``RESOURCE:MEMORY`` or
        ``RESOURCE:CPU``; ``False`` in every other case.
    """
    if not isinstance(exception, EcsOperatorError):
        return False

    quota_reasons = ("RESOURCE:MEMORY", "RESOURCE:CPU")
    # A failure entry may come without a "reason" (or with an empty one); treat that as
    # "not a quota problem" instead of letting KeyError/TypeError escape from this check.
    return any(
        quota_reason in (failure.get("reason") or "")
        for failure in exception.failures
        for quota_reason in quota_reasons
    )
def should_retry_eni(exception: Exception):
    """
    Tell whether an exception points at an ENI (Elastic Network Interfaces) problem.

    Only ``EcsTaskFailToStart`` exceptions are considered; the result is ``True`` when
    the exception's ``message`` contains one of the known ENI-related markers.
    """
    if not isinstance(exception, EcsTaskFailToStart):
        return False

    for eni_marker in ("network interface provisioning", "ResourceInitializationError"):
        if eni_marker in exception.message:
            return True
    return False
class EcsClusterStates(Enum):
    """Enumerate the State values that an ECS Cluster can have."""
class EcsHook(AwsGenericHook):
    """
    Interact with Amazon Elastic Container Service (ECS).

    Provide thin wrapper around :external+boto3:py:class:`boto3.client("ecs") <ECS.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
        - `Amazon Elastic Container Service \
          <https://docs.aws.amazon.com/AmazonECS/latest/APIReference/Welcome.html>`__
    """

    def __init__(self, *args, **kwargs) -> None:
        # The client type is always "ecs"; any caller-supplied value is overwritten.
        kwargs["client_type"] = "ecs"
        super().__init__(*args, **kwargs)

    def get_cluster_state(self, cluster_name: str) -> str:
        """
        Get ECS Cluster state.

        .. seealso::
            - :external+boto3:py:meth:`ECS.Client.describe_clusters`

        :param cluster_name: ECS Cluster name or full cluster Amazon Resource Name (ARN) entry.
        :return: The ``status`` of the first cluster in the ``describe_clusters`` response.
        """
        response = self.conn.describe_clusters(clusters=[cluster_name])
        # Only one cluster is requested, so the first entry is used; an empty
        # "clusters" list is not handled here and surfaces as an IndexError.
        return response["clusters"][0]["status"]

    def get_task_definition_state(self, task_definition: str) -> str:
        """
        Get ECS Task Definition state.

        .. seealso::
            - :external+boto3:py:meth:`ECS.Client.describe_task_definition`

        :param task_definition: The family for the latest ACTIVE revision,
            family and revision ( family:revision ) for a specific revision in the family,
            or full Amazon Resource Name (ARN) of the task definition to describe.
        :return: The ``status`` of the ``taskDefinition`` in the response.
        """
        response = self.conn.describe_task_definition(taskDefinition=task_definition)
        return response["taskDefinition"]["status"]

    def get_task_state(self, cluster: str, task: str) -> str:
        """
        Get ECS Task state.

        .. seealso::
            - :external+boto3:py:meth:`ECS.Client.describe_tasks`

        :param cluster: The short name or full Amazon Resource Name (ARN)
            of the cluster that hosts the task or tasks to describe.
        :param task: Task ID or full ARN entry.
        :return: The ``lastStatus`` of the first task in the ``describe_tasks`` response.
        """
        response = self.conn.describe_tasks(cluster=cluster, tasks=[task])
        # Only one task is requested, so the first entry is used; an empty
        # "tasks" list is not handled here and surfaces as an IndexError.
        return response["tasks"][0]["lastStatus"]
class EcsTaskLogFetcher(Thread):
    """
    Thread that fetches Cloudwatch log events with a specific interval.

    The fetched log events are sent to the info channel of the provided logger.
    """

    def __init__(
        self,
        *,
        log_group: str,
        log_stream_name: str,
        fetch_interval: timedelta,
        logger: Logger,
        aws_conn_id: str | None = "aws_default",
        region_name: str | None = None,
    ):
        super().__init__()
        self._event = Event()

        # Where the log events come from.
        self.log_group = log_group
        self.log_stream_name = log_stream_name

        # How often to fetch, and where to send what was fetched.
        self.fetch_interval = fetch_interval
        self.logger = logger

        hook_options = {"aws_conn_id": aws_conn_id, "region_name": region_name}
        self.hook = AwsLogsHook(**hook_options)
class EcsProtocol(Protocol):
    """
    Structured Protocol describing ``boto3.client('ecs')``.

    It is used for type hints on :py:meth:`.EcsOperator.client`.

    .. seealso::

        - https://mypy.readthedocs.io/en/latest/protocols.html
        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html
    """