Source code for airflow.providers.cncf.kubernetes.utils.delete_from
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files

from __future__ import annotations

import re

from kubernetes import client
from kubernetes.client import ApiClient
def _delete_from_yaml_single_item(
    *,
    k8s_client: ApiClient,
    yml_document: dict,
    verbose: bool = False,
    namespace: str = "default",
    body: dict | None = None,
    **kwargs,
):
    """
    Delete the single Kubernetes object described by one parsed YAML document.

    :param k8s_client: API client handed to the resolved ``kubernetes.client`` API class.
    :param yml_document: parsed manifest; ``apiVersion``, ``kind``,
        ``metadata["name"]`` and (optionally) ``metadata["namespace"]`` are read from it.
    :param verbose: when true, print the snake_cased kind and the response status.
    :param namespace: namespace used for namespaced objects unless the document's
        ``metadata`` has its own ``namespace`` key, which takes precedence.
    :param body: delete options sent with the request; ``DEFAULT_DELETION_BODY``
        is used when this is ``None``.
    :param kwargs: forwarded unchanged to the delete call.
    :return: whatever the resolved ``delete_*`` API method returns.
    """
    deletion_options = DEFAULT_DELETION_BODY if body is None else body

    # "<group>/<version>" -> (group, version); a bare "<version>" has no group
    # part and is mapped to the "core" group.
    api_group, _, api_version = yml_document["apiVersion"].partition("/")
    if not api_version:
        api_group, api_version = "core", api_group

    # Drop only the LAST ".k8s.io" occurrence (e.g. "apiextensions.k8s.io").
    before, _, after = api_group.rpartition(".k8s.io")
    api_group = before + after

    # DNS-subdomain style group name -> CamelCase prefix of the API class name.
    class_prefix = "".join(map(str.capitalize, api_group.split(".")))
    api_class = getattr(client, f"{class_prefix}{api_version.capitalize()}Api")
    k8s_api = api_class(k8s_client)

    # CamelCase kind -> snake_case, as used in the delete_* method names.
    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", yml_document["kind"])
    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()

    # IMPORTANT: a namespace given in the document wins over the ``namespace``
    # argument.
    metadata = yml_document["metadata"]
    target_namespace = metadata["namespace"] if "namespace" in metadata else namespace
    object_name = metadata["name"]

    resp: client.V1Status
    namespaced_deleter = f"delete_namespaced_{kind}"
    if not hasattr(k8s_api, namespaced_deleter):
        # No namespaced variant on this API class: use the plain delete_<kind> method.
        resp = getattr(k8s_api, f"delete_{kind}")(
            name=object_name,
            body=deletion_options,
            **kwargs,
        )
    else:
        resp = getattr(k8s_api, namespaced_deleter)(
            name=object_name,
            namespace=target_namespace,
            body=deletion_options,
            **kwargs,
        )

    if verbose:
        print(f"{kind} deleted. status='{str(resp.status)}'")
    return resp
class FailToDeleteError(Exception):
    """
    Aggregate the API errors collected while deleting resources described in a YAML file.

    :param api_exceptions: the collected exceptions. Each item must expose
        ``reason`` and ``body`` attributes, which ``__str__`` reads to render
        the message.
    """

    def __init__(self, api_exceptions: list):
        # Chain to Exception so ``args`` is set explicitly to the collected errors.
        super().__init__(api_exceptions)
        self.api_exceptions = api_exceptions

    def __str__(self):
        """Return one newline-terminated ``Error from server (...)`` line per collected exception."""
        # Join instead of repeated ``+=`` so the message is built in one pass.
        return "".join(
            f"Error from server ({api_exception.reason}):{api_exception.body}\n"
            for api_exception in self.api_exceptions
        )