Source code for airflow.providers.cncf.kubernetes.operators.resource
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Manage a Kubernetes Resource."""

from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING

import yaml
from kubernetes.utils import create_from_yaml

from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator

if TYPE_CHECKING:
    from kubernetes.client import ApiClient, CustomObjectsApi

__all__ = ["KubernetesCreateResourceOperator", "KubernetesDeleteResourceOperator"]


class KubernetesResourceBaseOperator(BaseOperator):
    """
    Abstract base class for all Kubernetes Resource operators.

    :param yaml_conf: string. Contains the kubernetes resources to Create or Delete
    :param namespace: string. Contains the namespace to create all resources inside.
        The namespace must preexist otherwise the resource creation will fail.
        If the API object in the yaml file already contains a namespace definition then
        this parameter has no effect.
    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
        for the Kubernetes cluster.
    :param custom_resource_definition: boolean flag stored on the operator. Presumably
        tells subclasses that ``yaml_conf`` describes custom resources that must go
        through ``custom_object_client`` -- confirm against the concrete operators.
    :param config_file: Forwarded unchanged to :class:`KubernetesHook` as its
        ``config_file`` argument (presumably the path to a kube config file).
    """

    template_fields = ("yaml_conf",)
    template_fields_renderers = {"yaml_conf": "yaml"}

    def __init__(
        self,
        *,
        yaml_conf: str,
        namespace: str | None = None,
        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
        custom_resource_definition: bool = False,
        config_file: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self._namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.yaml_conf = yaml_conf
        self.custom_resource_definition = custom_resource_definition
        self.config_file = config_file

    @cached_property
    def client(self) -> ApiClient:
        """Return the hook's ``api_client``, cached after the first access."""
        return self.hook.api_client

    @cached_property
    def custom_object_client(self) -> CustomObjectsApi:
        """Return the hook's ``custom_object_client``, cached after the first access."""
        return self.hook.custom_object_client

    @cached_property
    def hook(self) -> KubernetesHook:
        """Build the :class:`KubernetesHook` once from the connection id and config file."""
        return KubernetesHook(conn_id=self.kubernetes_conn_id, config_file=self.config_file)

    def get_namespace(self) -> str:
        """
        Return the namespace to operate in.

        Precedence: the ``namespace`` given to the operator, then the namespace
        reported by the hook, then the literal ``"default"``.
        """
        # ``or`` short-circuits, so the hook is only consulted when no
        # (non-empty) namespace was passed to the operator.
        return self._namespace or self.hook.get_namespace() or "default"

    def get_crd_fields(self, body: dict) -> tuple[str, str, str, str]:
        """
        Extract the fields that identify a custom resource from its manifest.

        :param body: A single parsed resource manifest. Must contain ``apiVersion``
            and ``kind``; ``metadata.namespace`` is optional.
        :return: ``(group, version, namespace, plural)``.
        :raises KeyError: If ``apiVersion`` or ``kind`` is missing from ``body``.
        """
        # ``apiVersion`` is "<group>/<version>". A value without a slash (the
        # core API group, e.g. "v1") has an empty group; slicing on
        # ``str.find`` would have returned -1 and chopped the last character.
        group, separator, version = body["apiVersion"].partition("/")
        if not separator:
            group, version = "", body["apiVersion"]

        # A namespace declared in the manifest wins over the operator's own.
        metadata: dict = body.get("metadata") or {}
        namespace = metadata.get("namespace")
        if namespace is None:
            namespace = self.get_namespace()

        # NOTE: naive pluralisation (lower-cased kind + "s"); kinds whose plural
        # is irregular (e.g. ``Ingress`` -> ``ingresses``) are not handled here.
        plural = body["kind"].lower() + "s"

        return group, version, namespace, plural
[docs]classKubernetesCreateResourceOperator(KubernetesResourceBaseOperator):"""Create a resource in a kubernetes."""