Source code for
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#"""Tracking the state of Amazon EKS Clusters, Amazon EKS managed node groups, and AWS Fargate profiles.""",EKSHook,FargateProfileStates,NodegroupStates,)fromairflow.sensors.baseimportBaseSensorOperator
# Message template used when a watched resource is found in a terminal state
# other than the requested one. It is filled in with ``str.format`` using the
# ``current_state`` and ``target_state`` keyword arguments.
UNEXPECTED_TERMINAL_STATE_MSG = (
    "Terminal state reached. Current state: {current_state}, Expected state: {target_state}"
)
class EKSClusterStateSensor(BaseSensorOperator):
    """
    Check the state of an Amazon EKS Cluster until it reaches the target state or another terminal state.

    :param cluster_name: The name of the Cluster to watch. (templated)
    :type cluster_name: str
    :param target_state: Target state of the Cluster. (templated)
    :type target_state: ClusterStates
    :param region: Which AWS region the connection should use. (templated)
        If this is None or empty then the default boto3 behaviour is used.
    :type region: str
    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then the default boto3 configuration would be used (and must be
        maintained on each worker node).
    :type aws_conn_id: str
    """

    # NOTE(review): no constructor is visible in this extract. ``poke`` reads
    # ``self.cluster_name``, ``self.target_state``, ``self.region`` and
    # ``self.aws_conn_id``; presumably they are assigned elsewhere -- confirm.

    def poke(self, context):
        """
        Look up the Cluster's current state once and compare it with the target state.

        :param context: Supplied by the caller; not used by this method.
        :return: ``True`` if the current state equals ``self.target_state``, else ``False``.
        :raises AirflowException: If the current state is a member of
            ``CLUSTER_TERMINAL_STATES`` other than ``self.target_state``.
        """
        eks_hook = EKSHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )

        cluster_state = eks_hook.get_cluster_state(clusterName=self.cluster_name)
        # NOTE(review): only the arguments of this logging call survived in the
        # extract; ``self.log.info`` is the assumed callee -- confirm.
        self.log.info("Cluster state: %s", cluster_state)

        # The target state is removed from the terminal set so that a target
        # which is itself terminal does not trigger the error below.
        if cluster_state in (CLUSTER_TERMINAL_STATES - {self.target_state}):
            # If we reach a terminal state which is not the target state:
            raise AirflowException(
                UNEXPECTED_TERMINAL_STATE_MSG.format(
                    current_state=cluster_state, target_state=self.target_state
                )
            )
        return cluster_state == self.target_state
class EKSFargateProfileStateSensor(BaseSensorOperator):
    """
    Check the state of an AWS Fargate profile until it reaches the target state or another terminal state.

    :param cluster_name: The name of the Cluster which the AWS Fargate profile is attached to. (templated)
    :type cluster_name: str
    :param fargate_profile_name: The name of the Fargate profile to watch. (templated)
    :type fargate_profile_name: str
    :param target_state: Target state of the Fargate profile. (templated)
    :type target_state: FargateProfileStates
    :param region: Which AWS region the connection should use. (templated)
        If this is None or empty then the default boto3 behaviour is used.
    :type region: str
    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then the default boto3 configuration would be used (and must be
        maintained on each worker node).
    :type aws_conn_id: str
    """

    # NOTE(review): no constructor is visible in this extract. ``poke`` reads
    # ``self.cluster_name``, ``self.fargate_profile_name``,
    # ``self.target_state``, ``self.region`` and ``self.aws_conn_id``;
    # presumably they are assigned elsewhere -- confirm.

    def poke(self, context):
        """
        Look up the Fargate profile's current state once and compare it with the target state.

        :param context: Supplied by the caller; not used by this method.
        :return: ``True`` if the current state equals ``self.target_state``, else ``False``.
        :raises AirflowException: If the current state is a member of
            ``FARGATE_TERMINAL_STATES`` other than ``self.target_state``.
        """
        eks_hook = EKSHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )

        fargate_profile_state = eks_hook.get_fargate_profile_state(
            clusterName=self.cluster_name, fargateProfileName=self.fargate_profile_name
        )
        # NOTE(review): only the arguments of this logging call survived in the
        # extract; ``self.log.info`` is the assumed callee -- confirm.
        self.log.info("Fargate profile state: %s", fargate_profile_state)

        # The target state is removed from the terminal set so that a target
        # which is itself terminal does not trigger the error below.
        if fargate_profile_state in (FARGATE_TERMINAL_STATES - {self.target_state}):
            # If we reach a terminal state which is not the target state:
            raise AirflowException(
                UNEXPECTED_TERMINAL_STATE_MSG.format(
                    current_state=fargate_profile_state, target_state=self.target_state
                )
            )
        return fargate_profile_state == self.target_state
class EKSNodegroupStateSensor(BaseSensorOperator):
    """
    Check the state of an EKS managed node group until it reaches the target state or another terminal state.

    :param cluster_name: The name of the Cluster which the Nodegroup is attached to. (templated)
    :type cluster_name: str
    :param nodegroup_name: The name of the Nodegroup to watch. (templated)
    :type nodegroup_name: str
    :param target_state: Target state of the Nodegroup. (templated)
    :type target_state: NodegroupStates
    :param region: Which AWS region the connection should use. (templated)
        If this is None or empty then the default boto3 behaviour is used.
    :type region: str
    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then the default boto3 configuration would be used (and must be
        maintained on each worker node).
    :type aws_conn_id: str
    """

    # NOTE(review): no constructor is visible in this extract. ``poke`` reads
    # ``self.cluster_name``, ``self.nodegroup_name``, ``self.target_state``,
    # ``self.region`` and ``self.aws_conn_id``; presumably they are assigned
    # elsewhere -- confirm.

    def poke(self, context):
        """
        Look up the Nodegroup's current state once and compare it with the target state.

        :param context: Supplied by the caller; not used by this method.
        :return: ``True`` if the current state equals ``self.target_state``, else ``False``.
        :raises AirflowException: If the current state is a member of
            ``NODEGROUP_TERMINAL_STATES`` other than ``self.target_state``.
        """
        eks_hook = EKSHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )

        nodegroup_state = eks_hook.get_nodegroup_state(
            clusterName=self.cluster_name, nodegroupName=self.nodegroup_name
        )
        # NOTE(review): only the arguments of this logging call survived in the
        # extract; ``self.log.info`` is the assumed callee -- confirm.
        self.log.info("Nodegroup state: %s", nodegroup_state)

        # The target state is removed from the terminal set so that a target
        # which is itself terminal does not trigger the error below.
        if nodegroup_state in (NODEGROUP_TERMINAL_STATES - {self.target_state}):
            # If we reach a terminal state which is not the target state:
            raise AirflowException(
                UNEXPECTED_TERMINAL_STATE_MSG.format(
                    current_state=nodegroup_state, target_state=self.target_state
                )
            )
        return nodegroup_state == self.target_state