Source code for airflow.providers.cncf.kubernetes.operators.resource
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Manage a Kubernetes Resource."""from__future__importannotationsimportosfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Sequenceimporttenacityimportyamlfromkubernetes.utilsimportcreate_from_yamlfromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.cncf.kubernetes.hooks.kubernetesimportKubernetesHookfromairflow.providers.cncf.kubernetes.kubernetes_helper_functionsimportshould_retry_creationfromairflow.providers.cncf.kubernetes.utils.delete_fromimportdelete_from_yamlfromairflow.providers.cncf.kubernetes.utils.k8s_resource_iteratorimportk8s_resource_iteratorifTYPE_CHECKING:fromkubernetes.clientimportApiClient,CustomObjectsApi__all__=["KubernetesCreateResourceOperator","KubernetesDeleteResourceOperator"]classKubernetesResourceBaseOperator(BaseOperator):""" Abstract base class for all Kubernetes Resource operators. :param yaml_conf: string. Contains the kubernetes resources to Create or Delete :param yaml_conf_file: path to the kubernetes resources file (templated) :param namespace: string. Contains the namespace to create all resources inside. The namespace must preexist otherwise the resource creation will fail. If the API object in the yaml file already contains a namespace definition then this parameter has no effect. :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>` for the Kubernetes cluster. :param namespaced: specified that Kubernetes resource is or isn't in a namespace. This parameter works only when custom_resource_definition parameter is True. """template_fields:Sequence[str]=("yaml_conf","yaml_conf_file")template_fields_renderers={"yaml_conf":"yaml"}def__init__(self,*,yaml_conf:str|None=None,yaml_conf_file:str|None=None,namespace:str|None=None,kubernetes_conn_id:str|None=KubernetesHook.default_conn_name,custom_resource_definition:bool=False,namespaced:bool=True,config_file:str|None=None,**kwargs,)->None:super().__init__(**kwargs)self._namespace=namespaceself.kubernetes_conn_id=kubernetes_conn_idself.yaml_conf=yaml_confself.yaml_conf_file=yaml_conf_fileself.custom_resource_definition=custom_resource_definitionself.namespaced=namespacedself.config_file=config_fileifnotany([self.yaml_conf,self.yaml_conf_file]):raiseAirflowException("One of `yaml_conf` or `yaml_conf_file` arguments must be provided")@cached_propertydefclient(self)->ApiClient:returnself.hook.api_client@cached_propertydefcustom_object_client(self)->CustomObjectsApi:returnself.hook.custom_object_client@cached_propertydefhook(self)->KubernetesHook:hook=KubernetesHook(conn_id=self.kubernetes_conn_id,config_file=self.config_file)returnhookdefget_namespace(self)->str:ifself._namespace:returnself._namespaceelse:returnself.hook.get_namespace()or"default"defget_crd_fields(self,body:dict)->tuple[str,str,str,str]:api_version=body["apiVersion"]group=api_version[0:api_version.find("/")]version=api_version[api_version.find("/")+1:]namespace=Noneifbody.get("metadata"):metadata:dict=body.get("metadata",None)namespace=metadata.get("namespace",None)ifnamespaceisNone:namespace=self.get_namespace()plural=body["kind"].lower()+"s"returngroup,version,namespace,plural
[docs]classKubernetesCreateResourceOperator(KubernetesResourceBaseOperator):"""Create a resource in a kubernetes."""
[docs]defexecute(self,context)->None:ifself.yaml_conf:self._create_objects(yaml.safe_load_all(self.yaml_conf))elifself.yaml_conf_fileandos.path.exists(self.yaml_conf_file):withopen(self.yaml_conf_file)asstream:self._create_objects(yaml.safe_load_all(stream))else:raiseAirflowException("File %s not found",self.yaml_conf_file)self.log.info("Resource was created")
[docs]classKubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):"""Delete a resource in a kubernetes."""
[docs]defexecute(self,context)->None:ifself.yaml_conf:self._delete_objects(yaml.safe_load_all(self.yaml_conf))elifself.yaml_conf_fileandos.path.exists(self.yaml_conf_file):withopen(self.yaml_conf_file)asstream:self._delete_objects(yaml.safe_load_all(stream))else:raiseAirflowException("File %s not found",self.yaml_conf_file)