Source code for airflow.providers.cncf.kubernetes.hooks.kubernetes
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportcontextlibimportjsonimporttempfilefromcollections.abcimportGeneratorfromfunctoolsimportcached_propertyfromtimeimportsleepfromtypingimportTYPE_CHECKING,Anyimportaiofilesimportrequestsimporttenacityfromasgiref.syncimportsync_to_asyncfromkubernetesimportclient,config,utils,watchfromkubernetes.client.modelsimportV1Deploymentfromkubernetes.configimportConfigExceptionfromkubernetes_asyncioimportclientasasync_client,configasasync_configfromurllib3.exceptionsimportHTTPErrorfromairflow.exceptionsimportAirflowException,AirflowNotFoundExceptionfromairflow.hooks.baseimportBaseHookfromairflow.modelsimportConnectionfromairflow.providers.cncf.kubernetes.kube_clientimport_disable_verify_ssl,_enable_tcp_keepalivefromairflow.providers.cncf.kubernetes.kubernetes_helper_functionsimportshould_retry_creationfromairflow.providers.cncf.kubernetes.utils.pod_managerimport(PodOperatorHookProtocol,container_is_completed,container_is_running,)fromairflow.utilsimportyamlifTYPE_CHECKING:fromkubernetes.clientimportV1JobListfromkubernetes.client.modelsimportV1Job,V1Pod
# Template for debug log messages naming the kubeconfig source being loaded;
# ``{}`` is filled with a short description of that source.
LOADING_KUBE_CONFIG_FILE_RESOURCE = (
    "Loading Kubernetes configuration file kube_config from {}..."
)
def _load_body_to_dict(body: str) -> dict:
    """
    Parse a YAML resource definition string into a Python object.

    :param body: Resource definition as a YAML string.
    :return: The parsed document, as returned by ``yaml.safe_load``.
    :raises AirflowException: If ``body`` cannot be parsed as YAML.
    """
    try:
        body_dict = yaml.safe_load(body)
    except yaml.YAMLError as e:
        # Chain the parser error so its location/context is kept in the traceback.
        raise AirflowException(f"Exception when loading resource definition: {e}\n") from e
    return body_dict
class KubernetesHook(BaseHook, PodOperatorHookProtocol):
    """
    Creates Kubernetes API connection.

    - use in cluster configuration by using extra field ``in_cluster`` in connection
    - use custom config by providing path to the file using extra field ``kube_config_path`` in connection
    - use custom configuration by providing content of kubeconfig file via
        extra field ``kube_config`` in connection
    - use default config by providing no extras

    This hook checks for these configuration options in the order above. Once an option is present,
    it uses that configuration.

    .. seealso::
        For more information about Kubernetes connection:
        :doc:`/connections/kubernetes`

    :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
        to Kubernetes cluster.
    :param client_configuration: Optional client configuration, passed on to the kubernetes client
        as ``client_configuration``.
    :param cluster_context: Optionally specify a context to use (e.g. if you have multiple
        contexts in your kubeconfig).
    :param config_file: Path to kubeconfig file.
    :param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
    :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
    :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
    """

    # NOTE(review): this extract does not show ``__init__``, ``_coalesce_param``, ``conn_extras``,
    # ``default_conn_name``, ``DEFAULT_NAMESPACE``, the ``*_client`` properties or the module-level
    # ``JOB_FINAL_STATUS_CONDITION_TYPES``, although the methods below reference them — confirm
    # they are defined in the full file.

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour."""
        return {
            "hidden_fields": ["host", "schema", "login", "password", "port", "extra"],
            "relabeling": {},
        }

    @classmethod
    def get_connection(cls, conn_id: str) -> Connection:
        """
        Return requested connection.

        If missing and conn_id is "kubernetes_default", will return empty connection so that hook will
        default to cluster-derived credentials.
        """
        try:
            return super().get_connection(conn_id)
        except AirflowNotFoundException:
            if conn_id == cls.default_conn_name:
                return Connection(conn_id=cls.default_conn_name)
            raise

    def _get_field(self, field_name):
        """
        Handle backcompat for extra fields.

        Prior to Airflow 2.3, in order to make use of UI customizations for extra fields,
        we needed to store them with the prefix ``extra__kubernetes__``. This method
        handles the backcompat, i.e. if the extra dict contains prefixed fields.

        :param field_name: Unprefixed extra field name.
        :return: The field value, or ``None`` if missing or falsy.
        :raises ValueError: If ``field_name`` is passed with the ``extra__`` prefix.
        """
        if field_name.startswith("extra__"):
            raise ValueError(
                f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
                f"when using this method."
            )
        if field_name in self.conn_extras:
            return self.conn_extras[field_name] or None
        prefixed_name = f"extra__kubernetes__{field_name}"
        return self.conn_extras.get(prefixed_name) or None

    def get_conn(self) -> client.ApiClient:
        """
        Return kubernetes api session for use with requests.

        :raises AirflowException: If more than one of ``in_cluster``, ``kube_config`` and
            ``kube_config_path`` is configured.
        """
        in_cluster = self._coalesce_param(self.in_cluster, self._get_field("in_cluster"))
        cluster_context = self._coalesce_param(self.cluster_context, self._get_field("cluster_context"))
        kubeconfig_path = self._coalesce_param(self.config_file, self._get_field("kube_config_path"))
        kubeconfig = self._get_field("kube_config")
        num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, kubeconfig_path] if o)

        if num_selected_configuration > 1:
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        disable_verify_ssl = self._coalesce_param(
            self.disable_verify_ssl, _get_bool(self._get_field("disable_verify_ssl"))
        )
        disable_tcp_keepalive = self._coalesce_param(
            self.disable_tcp_keepalive, _get_bool(self._get_field("disable_tcp_keepalive"))
        )

        if disable_verify_ssl is True:
            _disable_verify_ssl()
        if disable_tcp_keepalive is not True:
            _enable_tcp_keepalive()

        if in_cluster:
            self.log.debug("loading kube_config from: in_cluster configuration")
            self._is_in_cluster = True
            config.load_incluster_config()
            return client.ApiClient()

        if kubeconfig_path is not None:
            self.log.debug("loading kube_config from: %s", kubeconfig_path)
            self._is_in_cluster = False
            config.load_kube_config(
                config_file=kubeconfig_path,
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
            return client.ApiClient()

        if kubeconfig is not None:
            # The kubeconfig content lives in the connection; write it to a temporary file
            # so ``load_kube_config`` can read it from a path.
            with tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug("loading kube_config from: connection kube_config")
                if isinstance(kubeconfig, dict):
                    kubeconfig = json.dumps(kubeconfig)
                temp_config.write(kubeconfig.encode())
                temp_config.flush()
                self._is_in_cluster = False
                config.load_kube_config(
                    config_file=temp_config.name,
                    client_configuration=self.client_configuration,
                    context=cluster_context,
                )
            return client.ApiClient()

        return self._get_default_client(cluster_context=cluster_context)

    def _get_default_client(self, *, cluster_context: str | None = None) -> client.ApiClient:
        """
        Build an API client when no explicit configuration was supplied.

        Tries the in-cluster configuration first; if that raises ``ConfigException``, loads the
        kubeconfig file from the kubernetes client's default location.
        """
        # if we get here, then no configuration has been supplied
        # we should try in_cluster since that's most likely
        # but failing that just load assuming a kubeconfig file
        # in the default location
        try:
            config.load_incluster_config(client_configuration=self.client_configuration)
            self._is_in_cluster = True
        except ConfigException:
            self.log.debug("loading kube_config from: default file")
            self._is_in_cluster = False
            config.load_kube_config(
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
        return client.ApiClient()

    @property
    def is_in_cluster(self) -> bool:
        """Expose whether the hook is configured with ``load_incluster_config`` or not."""
        if self._is_in_cluster is not None:
            return self._is_in_cluster
        self.api_client  # so we can determine if we are in_cluster or not
        if TYPE_CHECKING:
            assert self._is_in_cluster is not None
        return self._is_in_cluster

    @cached_property
    def api_client(self) -> client.ApiClient:
        """Cached Kubernetes API client."""
        return self.get_conn()

    def create_custom_object(
        self, group: str, version: str, plural: str, body: str | dict, namespace: str | None = None
    ):
        """
        Create custom resource definition object in Kubernetes.

        :param group: api group
        :param version: api version
        :param plural: api plural
        :param body: crd object definition, either a dict or a YAML string
        :param namespace: kubernetes namespace
        """
        api: client.CustomObjectsApi = self.custom_object_client

        if isinstance(body, str):
            body_dict = _load_body_to_dict(body)
        else:
            body_dict = body

        response = api.create_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
            plural=plural,
            body=body_dict,
        )

        self.log.debug("Response: %s", response)
        return response

    def get_custom_object(
        self, group: str, version: str, plural: str, name: str, namespace: str | None = None
    ):
        """
        Get custom resource definition object from Kubernetes.

        :param group: api group
        :param version: api version
        :param plural: api plural
        :param name: crd object name
        :param namespace: kubernetes namespace
        """
        api = client.CustomObjectsApi(self.api_client)
        response = api.get_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
            plural=plural,
            name=name,
        )
        return response

    def delete_custom_object(
        self, group: str, version: str, plural: str, name: str, namespace: str | None = None, **kwargs
    ):
        """
        Delete custom resource definition object from Kubernetes.

        :param group: api group
        :param version: api version
        :param plural: api plural
        :param name: crd object name
        :param namespace: kubernetes namespace
        :param kwargs: extra keyword arguments forwarded to ``delete_namespaced_custom_object``
        """
        api = client.CustomObjectsApi(self.api_client)
        return api.delete_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
            plural=plural,
            name=name,
            **kwargs,
        )

    def get_namespace(self) -> str | None:
        """Return the namespace defined in the connection, or ``None`` when there is no conn_id."""
        if self.conn_id:
            return self._get_field("namespace")
        return None

    def get_xcom_sidecar_container_image(self):
        """Return the xcom sidecar image defined in the connection."""
        return self._get_field("xcom_sidecar_container_image")

    def get_xcom_sidecar_container_resources(self):
        """
        Return the xcom sidecar resources defined in the connection.

        :return: The resources as a dict, or ``None`` if the field is not set.
        """
        field = self._get_field("xcom_sidecar_container_resources")
        if not field:
            return None
        # Connection extras may already hold a deserialized object (get_conn handles the same
        # case for ``kube_config``); only string values need JSON decoding.
        if isinstance(field, dict):
            return field
        return json.loads(field)

    def get_pod_log_stream(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> tuple[watch.Watch, Generator[str, None, None]]:
        """
        Retrieve a log stream for a container in a kubernetes pod.

        :param pod_name: pod name
        :param container: container name
        :param namespace: kubernetes namespace
        :return: The watcher and the generator of log lines it streams.
        """
        watcher = watch.Watch()
        return (
            watcher,
            watcher.stream(
                self.core_v1_client.read_namespaced_pod_log,
                name=pod_name,
                container=container,
                namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
            ),
        )

    def get_pod_logs(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ):
        """
        Retrieve a container's log from the specified pod.

        :param pod_name: pod name
        :param container: container name
        :param namespace: kubernetes namespace
        """
        return self.core_v1_client.read_namespaced_pod_log(
            name=pod_name,
            container=container,
            _preload_content=False,
            namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
        )

    def get_pod(self, name: str, namespace: str) -> V1Pod:
        """Read pod object from kubernetes API."""
        return self.core_v1_client.read_namespaced_pod(
            name=name,
            namespace=namespace,
        )

    def get_namespaced_pod_list(
        self,
        label_selector: str | None = "",
        namespace: str | None = None,
        watch: bool = False,
        **kwargs,
    ):
        """
        Retrieve the list of pods in a namespace (the connection's or the default one if not given).

        :param label_selector: A selector to restrict the list of returned objects by their labels
        :param namespace: kubernetes namespace
        :param watch: Watch for changes to the described resources and return them as a stream
        """
        return self.core_v1_client.list_namespaced_pod(
            namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
            watch=watch,
            label_selector=label_selector,
            _preload_content=False,
            **kwargs,
        )

    def get_deployment_status(
        self,
        name: str,
        namespace: str = "default",
        **kwargs,
    ) -> V1Deployment:
        """
        Get status of existing Deployment.

        :param name: Name of Deployment to retrieve
        :param namespace: Deployment namespace
        """
        return self.apps_v1_client.read_namespaced_deployment_status(
            name=name, namespace=namespace, pretty=True, **kwargs
        )

    def create_job(
        self,
        job: V1Job,
        **kwargs,
    ) -> V1Job:
        """
        Run Job.

        :param job: A kubernetes Job object
        :return: The created Job as returned by the API.
        """
        sanitized_job = self.batch_v1_client.api_client.sanitize_for_serialization(job)
        json_job = json.dumps(sanitized_job, indent=2)

        self.log.debug("Job Creation Request: \n%s", json_job)
        try:
            resp = self.batch_v1_client.create_namespaced_job(
                body=sanitized_job, namespace=job.metadata.namespace, **kwargs
            )
            self.log.debug("Job Creation Response: %s", resp)
        except Exception:
            self.log.exception(
                "Exception when attempting to create Namespaced Job: %s", str(json_job).replace("\n", " ")
            )
            # Bare raise re-raises the active exception without adding this frame to it.
            raise
        return resp

    def get_job(self, job_name: str, namespace: str) -> V1Job:
        """
        Get Job of specified name and namespace.

        :param job_name: Name of Job to fetch.
        :param namespace: Namespace of the Job.
        :return: Job object
        """
        return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)

    def get_job_status(self, job_name: str, namespace: str) -> V1Job:
        """
        Get job with status of specified name and namespace.

        :param job_name: Name of Job to fetch.
        :param namespace: Namespace of the Job.
        :return: Job object
        """
        return self.batch_v1_client.read_namespaced_job_status(
            name=job_name, namespace=namespace, pretty=True
        )

    def wait_until_job_complete(self, job_name: str, namespace: str, job_poll_interval: float = 10) -> V1Job:
        """
        Block job of specified name and namespace until it is complete or failed.

        :param job_name: Name of Job to fetch.
        :param namespace: Namespace of the Job.
        :param job_poll_interval: Interval in seconds between polling the job status
        :return: Job object
        """
        while True:
            self.log.info("Requesting status for the job '%s' ", job_name)
            job: V1Job = self.get_job_status(job_name=job_name, namespace=namespace)
            if self.is_job_complete(job=job):
                return job
            # %s rather than %i: the interval is a float and %i would truncate it (0.5 -> "0").
            self.log.info("The job '%s' is incomplete. Sleeping for %s sec.", job_name, job_poll_interval)
            sleep(job_poll_interval)

    def list_jobs_all_namespaces(self) -> V1JobList:
        """
        Get list of Jobs from all namespaces.

        :return: V1JobList object
        """
        return self.batch_v1_client.list_job_for_all_namespaces(pretty=True)

    def list_jobs_from_namespace(self, namespace: str) -> V1JobList:
        """
        Get list of Jobs from dedicated namespace.

        :param namespace: Namespace of the Job.
        :return: V1JobList object
        """
        return self.batch_v1_client.list_namespaced_job(namespace=namespace, pretty=True)

    def is_job_complete(self, job: V1Job) -> bool:
        """
        Check whether the given job is complete (with success or fail).

        :return: Boolean indicating that the given job is complete.
        """
        if status := job.status:
            if conditions := status.conditions:
                # Kubernetes condition status is the string "True"/"False"/"Unknown", so plain
                # truthiness would treat "False" as set; only an explicit true value counts.
                final_conditions = [
                    c
                    for c in conditions
                    if c.type in JOB_FINAL_STATUS_CONDITION_TYPES and _get_bool(c.status) is True
                ]
                if final_conditions:
                    s = "s" if len(final_conditions) > 1 else ""
                    self.log.info(
                        "The job '%s' state%s: %s",
                        job.metadata.name,
                        s,
                        ", ".join(f"{c.type} at {c.last_transition_time}" for c in final_conditions),
                    )
                    return True
        return False

    @staticmethod
    def is_job_failed(job: V1Job) -> str | bool:
        """
        Check whether the given job is failed.

        :return: The reason of the ``Failed`` condition if the job is failed, and False otherwise.
        """
        if status := job.status:
            conditions = status.conditions or []
            # Condition status is a "True"/"False"/"Unknown" string; require an explicit true value.
            if fail_condition := next(
                (c for c in conditions if c.type == "Failed" and _get_bool(c.status) is True), None
            ):
                return fail_condition.reason
        return False

    @staticmethod
    def is_job_successful(job: V1Job) -> str | bool:
        """
        Check whether the given job is completed successfully.

        :return: True if the job has a true ``Complete`` condition, False otherwise.
        """
        if status := job.status:
            conditions = status.conditions or []
            # Condition status is a "True"/"False"/"Unknown" string; require an explicit true value.
            return any(c.type == "Complete" and _get_bool(c.status) is True for c in conditions)
        return False

    def patch_namespaced_job(self, job_name: str, namespace: str, body: object) -> V1Job:
        """
        Update the specified Job.

        :param job_name: name of the Job
        :param namespace: the namespace to run within kubernetes
        :param body: json object with parameters for update
        """
        return self.batch_v1_client.patch_namespaced_job(
            name=job_name,
            namespace=namespace,
            body=body,
        )

    def apply_from_yaml_file(
        self,
        api_client: Any = None,
        yaml_file: str | None = None,
        yaml_objects: list[dict] | None = None,
        verbose: bool = False,
        namespace: str = "default",
    ):
        """
        Perform an action from a yaml file.

        :param api_client: A Kubernetes client application.
        :param yaml_file: Contains the path to yaml file.
        :param yaml_objects: List of YAML objects; used instead of reading the yaml_file.
        :param verbose: If True, print confirmation from create action. Default is False.
        :param namespace: Contains the namespace to create all resources inside. The namespace must
            preexist otherwise the resource creation will fail.
        """
        utils.create_from_yaml(
            k8s_client=api_client or self.api_client,
            yaml_objects=yaml_objects,
            yaml_file=yaml_file,
            verbose=verbose,
            namespace=namespace or self.get_namespace(),
        )

    def check_kueue_deployment_running(
        self, name: str, namespace: str, timeout: float = 300.0, polling_period_seconds: float = 2.0
    ) -> None:
        """
        Poll a Deployment until all its replicas are ready.

        :param name: Name of the Deployment.
        :param namespace: Namespace of the Deployment.
        :param timeout: Total time budget in seconds (decremented by the polling period per attempt).
        :param polling_period_seconds: Seconds to sleep between status checks.
        :raises AirflowException: If the Deployment is not ready before the budget runs out.
        """
        _timeout = timeout
        while _timeout > 0:
            try:
                deployment = self.get_deployment_status(name=name, namespace=namespace)
            except Exception:
                self.log.exception("Exception occurred while checking for Deployment status.")
                raise

            deployment_status = V1Deployment.to_dict(deployment)["status"]
            replicas = deployment_status["replicas"]
            ready_replicas = deployment_status["ready_replicas"]
            unavailable_replicas = deployment_status["unavailable_replicas"]
            if (
                replicas is not None
                and ready_replicas is not None
                and unavailable_replicas is None
                and replicas == ready_replicas
            ):
                return
            self.log.info("Waiting until Deployment will be ready...")
            sleep(polling_period_seconds)

            _timeout -= polling_period_seconds

        raise AirflowException("Deployment timed out")

    @staticmethod
    def get_yaml_content_from_file(kueue_yaml_url) -> list[dict]:
        """
        Download content of YAML file and separate it into several dictionaries.

        :param kueue_yaml_url: URL of the (possibly multi-document) YAML file.
        :raises AirflowException: If the HTTP response status is not 200.
        """
        # NOTE(review): no request timeout is set, so an unresponsive server can block this call.
        response = requests.get(kueue_yaml_url, allow_redirects=True)
        if response.status_code != 200:
            raise AirflowException("Was not able to read the yaml file from given URL")

        return list(yaml.safe_load_all(response.text))
def_get_bool(val)->bool|None:"""Convert val to bool if can be done with certainty; if we cannot infer intention we return None."""ifisinstance(val,bool):returnvalelifisinstance(val,str):ifval.strip().lower()=="true":returnTrueelifval.strip().lower()=="false":returnFalsereturnNone
class AsyncKubernetesHook(KubernetesHook):
    """Hook to use Kubernetes SDK asynchronously."""

    # NOTE(review): ``get_conn_extras`` and the ``get_conn`` async context manager (which this
    # extract shows only as a dangling ``@contextlib.asynccontextmanager`` decorator) are used
    # below but are not visible here — confirm they are defined in the full file.

    def __init__(self, config_dict: dict | None = None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Stored because ``_load_config`` reads ``self.config_dict``.
        self.config_dict = config_dict
        self._extras: dict | None = None

    async def _load_config(self):
        """
        Load the Kubernetes configuration and return an async API client.

        Sources are tried in this order: in-cluster, ``config_dict``, kubeconfig path,
        kubeconfig content from the connection, then the default kubeconfig file.

        :raises AirflowException: If more than one configuration source is selected.
        """
        in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
        cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
        # Honour an explicitly passed ``config_file`` the same way the sync ``get_conn`` does.
        kubeconfig_path = self._coalesce_param(self.config_file, await self._get_field("kube_config_path"))
        kubeconfig = await self._get_field("kube_config")

        num_selected_configuration = sum(
            1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
        )
        if num_selected_configuration > 1:
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        async def api_client_from_kubeconfig_file(_kubeconfig_path: str | None):
            await async_config.load_kube_config(
                config_file=_kubeconfig_path,
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
            return async_client.ApiClient()

        if in_cluster:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("within a pod"))
            self._is_in_cluster = True
            async_config.load_incluster_config()
            return async_client.ApiClient()

        if self.config_dict:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
            self._is_in_cluster = False
            await async_config.load_kube_config_from_dict(self.config_dict)
            return async_client.ApiClient()

        if kubeconfig_path is not None:
            self.log.debug("loading kube_config from: %s", kubeconfig_path)
            self._is_in_cluster = False
            return await api_client_from_kubeconfig_file(kubeconfig_path)

        if kubeconfig is not None:
            async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug(
                    "Reading kubernetes configuration file from connection "
                    "object and writing temporary config file with its content",
                )
                # Extras may hold an already-deserialized kubeconfig; serialize it as the sync
                # ``get_conn`` does instead of failing on ``dict.encode``.
                if isinstance(kubeconfig, dict):
                    kubeconfig = json.dumps(kubeconfig)
                await temp_config.write(kubeconfig.encode())
                await temp_config.flush()
                self._is_in_cluster = False
                return await api_client_from_kubeconfig_file(temp_config.name)

        self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("default configuration file"))
        await async_config.load_kube_config(
            client_configuration=self.client_configuration,
            context=cluster_context,
        )
        # Match the other branches: record the source and return a client instead of None.
        self._is_in_cluster = False
        return async_client.ApiClient()

    async def _get_field(self, field_name):
        """
        Return an extra field from the connection, supporting the legacy prefixed name.

        :param field_name: Unprefixed extra field name.
        :raises ValueError: If ``field_name`` is passed with the ``extra__`` prefix.
        """
        if field_name.startswith("extra__"):
            raise ValueError(
                f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
                "when using this method."
            )
        extras = await self.get_conn_extras()
        if field_name in extras:
            return extras.get(field_name)
        prefixed_name = f"extra__kubernetes__{field_name}"
        return extras.get(prefixed_name)

    async def get_pod(self, name: str, namespace: str) -> V1Pod:
        """
        Get pod's object.

        :param name: Name of the pod.
        :param namespace: Name of the pod's namespace.
        """
        async with self.get_conn() as connection:
            v1_api = async_client.CoreV1Api(connection)
            pod: V1Pod = await v1_api.read_namespaced_pod(
                name=name,
                namespace=namespace,
            )
        return pod

    async def delete_pod(self, name: str, namespace: str):
        """
        Delete pod's object.

        A 404 response (pod already gone) is ignored; other API errors are re-raised.

        :param name: Name of the pod.
        :param namespace: Name of the pod's namespace.
        """
        async with self.get_conn() as connection:
            try:
                v1_api = async_client.CoreV1Api(connection)
                await v1_api.delete_namespaced_pod(
                    name=name, namespace=namespace, body=client.V1DeleteOptions()
                )
            except async_client.ApiException as e:
                # If the pod is already deleted
                if str(e.status) != "404":
                    raise

    async def read_logs(self, name: str, namespace: str):
        """
        Read the pod's logs, with timestamps, and write each line to the task log.

        The logs are read without following, so this returns what has been produced so far.
        It is used for async output of the logs only when the pod failed its execution or the
        task was cancelled by the user.

        :param name: Name of the pod.
        :param namespace: Name of the pod's namespace.
        :return: The log lines.
        """
        async with self.get_conn() as connection:
            try:
                v1_api = async_client.CoreV1Api(connection)
                logs = await v1_api.read_namespaced_pod_log(
                    name=name,
                    namespace=namespace,
                    follow=False,
                    timestamps=True,
                )
                logs = logs.splitlines()
                for line in logs:
                    self.log.info("Container logs from %s", line)
                return logs
            except (HTTPError, async_client.ApiException):
                # The async client reports API failures as ApiException, not urllib3 errors.
                self.log.exception("There was an error reading the kubernetes API.")
                raise

    async def get_job_status(self, name: str, namespace: str) -> V1Job:
        """
        Get job's status object.

        :param name: Name of the job.
        :param namespace: Name of the job's namespace.
        """
        async with self.get_conn() as connection:
            v1_api = async_client.BatchV1Api(connection)
            job: V1Job = await v1_api.read_namespaced_job_status(
                name=name,
                namespace=namespace,
            )
        return job

    async def wait_until_job_complete(self, name: str, namespace: str, poll_interval: float = 10) -> V1Job:
        """
        Block job of specified name and namespace until it is complete or failed.

        :param name: Name of Job to fetch.
        :param namespace: Namespace of the Job.
        :param poll_interval: Interval in seconds between polling the job status
        :return: Job object
        """
        while True:
            self.log.info("Requesting status for the job '%s' ", name)
            job: V1Job = await self.get_job_status(name=name, namespace=namespace)
            if self.is_job_complete(job=job):
                return job
            # %s rather than %i: the interval is a float and %i would truncate it (0.5 -> "0").
            self.log.info("The job '%s' is incomplete. Sleeping for %s sec.", name, poll_interval)
            await asyncio.sleep(poll_interval)

    async def wait_until_container_complete(
        self, name: str, namespace: str, container_name: str, poll_interval: float = 10
    ) -> None:
        """
        Wait for the given container in the given pod to be completed.

        :param name: Name of Pod to fetch.
        :param namespace: Namespace of the Pod.
        :param container_name: name of the container within the pod to monitor
        :param poll_interval: Interval in seconds between polling the container status
        """
        while True:
            pod = await self.get_pod(name=name, namespace=namespace)
            if container_is_completed(pod=pod, container_name=container_name):
                break
            self.log.info("Waiting for container '%s' state to be completed", container_name)
            await asyncio.sleep(poll_interval)

    async def wait_until_container_started(
        self, name: str, namespace: str, container_name: str, poll_interval: float = 10
    ) -> None:
        """
        Wait for the given container in the given pod to be started.

        :param name: Name of Pod to fetch.
        :param namespace: Namespace of the Pod.
        :param container_name: name of the container within the pod to monitor
        :param poll_interval: Interval in seconds between polling the container status
        """
        while True:
            pod = await self.get_pod(name=name, namespace=namespace)
            if container_is_running(pod=pod, container_name=container_name):
                break
            self.log.info("Waiting for container '%s' state to be running", container_name)
            await asyncio.sleep(poll_interval)