Source code for airflow.providers.amazon.aws.triggers.eks
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasynciofromtypingimportAnyfrombotocore.exceptionsimportWaiterErrorfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.hooks.eksimportEksHookfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
class EksCreateFargateProfileTrigger(BaseTrigger):
    """
    Trigger for EksCreateFargateProfileOperator.

    The trigger will asynchronously wait for the fargate profile to be created.

    :param cluster_name: The name of the EKS cluster
    :param fargate_profile_name: The name of the fargate profile
    :param poll_interval: The amount of time in seconds to wait between attempts.
    :param max_attempts: The maximum number of attempts to be made.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
    """

    def __init__(
        self,
        cluster_name: str,
        fargate_profile_name: str,
        poll_interval: int,
        max_attempts: int,
        aws_conn_id: str,
    ):
        # Let the base trigger initialise its own bookkeeping attributes.
        super().__init__()
        self.cluster_name = cluster_name
        self.fargate_profile_name = fargate_profile_name
        self.poll_interval = poll_interval
        self.max_attempts = max_attempts
        self.aws_conn_id = aws_conn_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Return the classpath of this trigger and the kwargs needed to rebuild it.

        Required by ``BaseTrigger``: the keys mirror the ``__init__`` parameters
        one-to-one so the trigger can be re-instantiated from the returned dict.
        """
        return (
            f"{self.__class__.__module__}.{self.__class__.__qualname__}",
            {
                "cluster_name": self.cluster_name,
                "fargate_profile_name": self.fargate_profile_name,
                "poll_interval": self.poll_interval,
                "max_attempts": self.max_attempts,
                "aws_conn_id": self.aws_conn_id,
            },
        )

    async def run(self):
        """
        Poll the ``fargate_profile_active`` waiter until the profile is active.

        Each loop iteration runs the waiter for exactly one attempt
        (``MaxAttempts=1``) so that the current status can be logged between
        attempts; the retry budget is tracked here rather than by the waiter.

        Yields a single success ``TriggerEvent`` once the waiter returns
        without error.

        :raises AirflowException: if the waiter reports a terminal failure, or
            if ``max_attempts`` attempts complete without the waiter succeeding.
        """
        # The values are coerced with int() exactly as the original code did,
        # in case they arrive as strings; do it once instead of on every use.
        max_attempts = int(self.max_attempts)
        poll_interval = int(self.poll_interval)

        self.hook = EksHook(aws_conn_id=self.aws_conn_id)
        async with self.hook.async_conn as client:
            waiter = client.get_waiter("fargate_profile_active")
            for attempt in range(1, max_attempts + 1):
                try:
                    await waiter.wait(
                        clusterName=self.cluster_name,
                        fargateProfileName=self.fargate_profile_name,
                        WaiterConfig={"Delay": poll_interval, "MaxAttempts": 1},
                    )
                except WaiterError as error:
                    if "terminal failure" in str(error):
                        raise AirflowException(f"Create Fargate Profile failed: {error}") from error
                    # The last response may not describe a fargate profile at
                    # all (e.g. an error payload), so never index it blindly.
                    status = error.last_response.get("fargateProfile", {}).get("status", "unknown")
                    self.log.info("Status of fargate profile is %s", status)
                    # No point in sleeping when there is no attempt left.
                    if attempt < max_attempts:
                        await asyncio.sleep(poll_interval)
                else:
                    # Waiter returned normally: the profile is active.
                    break
            else:
                # Reached only when the loop finished without ``break``, i.e.
                # every attempt failed (or max_attempts <= 0). A success on the
                # very last attempt must NOT land here.
                raise AirflowException(
                    f"Create Fargate Profile failed - max attempts reached: {self.max_attempts}"
                )

        yield TriggerEvent({"status": "success", "message": "Fargate Profile Created"})
[docs]classEksDeleteFargateProfileTrigger(BaseTrigger):""" Trigger for EksDeleteFargateProfileOperator. The trigger will asynchronously wait for the fargate profile to be deleted. :param cluster_name: The name of the EKS cluster :param fargate_profile_name: The name of the fargate profile :param poll_interval: The amount of time in seconds to wait between attempts. :param max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. """def__init__(self,cluster_name:str,fargate_profile_name:str,poll_interval:int,max_attempts:int,aws_conn_id:str,):self.cluster_name=cluster_nameself.fargate_profile_name=fargate_profile_nameself.poll_interval=poll_intervalself.max_attempts=max_attemptsself.aws_conn_id=aws_conn_id