Source code for airflow.providers.amazon.aws.sensors.ecs
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromtypingimportTYPE_CHECKING,Sequenceimportboto3fromairflow.compat.functoolsimportcached_propertyfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.hooks.ecsimport(EcsClusterStates,EcsHook,EcsTaskDefinitionStates,EcsTaskStates,)fromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.utils.contextimportContext
def_check_failed(current_state,target_state,failure_states):if(current_state!=target_state)and(current_stateinfailure_states):raiseAirflowException(f"Terminal state reached. Current state: {current_state}, Expected state: {target_state}")
class EcsBaseSensor(BaseSensorOperator):
    """Contains general sensor behavior for Elastic Container Service."""

    def __init__(self, *, aws_conn_id: str | None = DEFAULT_CONN_ID, region: str | None = None, **kwargs):
        """
        Store the connection settings and initialise the base sensor.

        :param aws_conn_id: Connection id handed to :class:`EcsHook`.
        :param region: Region name handed to :class:`EcsHook` as ``region_name``.
        :param kwargs: Remaining keyword arguments, forwarded to the parent
            class constructor.
        """
        self.aws_conn_id = aws_conn_id
        self.region = region
        super().__init__(**kwargs)

    @cached_property
    def hook(self) -> EcsHook:
        """Create and return an EcsHook."""
        return EcsHook(aws_conn_id=self.aws_conn_id, region_name=self.region)

    @cached_property
    def client(self) -> boto3.client:
        """Create and return an EcsHook client."""
        # Reuses the hook built by ``self.hook`` and exposes its ``conn`` attribute.
        return self.hook.conn
# NOTE(review): only the class header and docstring are visible in this
# excerpt; the constructor and polling logic are not shown here.
class EcsClusterStateSensor(EcsBaseSensor):
    """
    Polls the cluster state until it reaches a terminal state.

    Raises an AirflowException with the failure reason if a failed state is reached.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/sensor:EcsClusterStateSensor`

    :param cluster_name: The name of your cluster.
    :param target_state: Success state to watch for. (Default: "ACTIVE")
    :param failure_states: Fail if any of these states are reached before the
        Success State. (Default: "FAILED" or "INACTIVE")
    """
class EcsTaskDefinitionStateSensor(EcsBaseSensor):
    """
    Polls the task definition state until it reaches a terminal state.

    Raises an AirflowException with the failure reason if a failed state is reached.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/sensor:EcsTaskDefinitionStateSensor`

    :param task_definition: The family for the latest ACTIVE revision,
        family and revision (family:revision) for a specific revision in the family,
        or full Amazon Resource Name (ARN) of the task definition.
    :param target_state: Success state to watch for. (Default: "ACTIVE")
    """

    def __init__(
        self,
        *,
        task_definition: str,
        target_state: EcsTaskDefinitionStates | None = EcsTaskDefinitionStates.ACTIVE,
        **kwargs,
    ):
        """
        Record the task definition to watch and derive its failure state.

        :param task_definition: Identifier of the task definition to poll.
        :param target_state: State that counts as success.
        :param kwargs: Remaining keyword arguments, forwarded to the parent
            class constructor.
        """
        super().__init__(**kwargs)
        self.task_definition = task_definition
        self.target_state = target_state
        # There are only two possible states, so set failure_state to whatever is not the target_state
        # Any target other than ACTIVE (including None) makes ACTIVE the failure state.
        self.failure_states = {
            (
                EcsTaskDefinitionStates.INACTIVE
                if target_state == EcsTaskDefinitionStates.ACTIVE
                else EcsTaskDefinitionStates.ACTIVE
            )
        }
# NOTE(review): only the class header and docstring are visible in this
# excerpt; the constructor and polling logic are not shown here.
# NOTE(review): the documented default for ``target_state`` ("ACTIVE") does not
# look like an ECS task status - confirm against the constructor's real default.
class EcsTaskStateSensor(EcsBaseSensor):
    """
    Polls the task state until it reaches a terminal state.

    Raises an AirflowException with the failure reason if a failed state is reached.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/sensor:EcsTaskStateSensor`

    :param cluster: The short name or full Amazon Resource Name (ARN) of the
        cluster that hosts the task.
    :param task: The task ID or full ARN of the task to poll.
    :param target_state: Success state to watch for. (Default: "ACTIVE")
    :param failure_states: Fail if any of these states are reached before the
        Success State. (Default: "STOPPED")
    """