Source code for airflow.providers.cncf.kubernetes.triggers.pod
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportdatetimeimporttracebackimportwarningsfromenumimportEnumfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Any,AsyncIteratorfromairflow.exceptionsimportAirflowProviderDeprecationWarningfromairflow.providers.cncf.kubernetes.hooks.kubernetesimportAsyncKubernetesHookfromairflow.providers.cncf.kubernetes.utils.pod_managerimport(OnFinishAction,PodLaunchTimeoutException,PodPhase,)fromairflow.triggers.baseimportBaseTrigger,TriggerEventifTYPE_CHECKING:fromkubernetes_asyncio.client.modelsimportV1PodfrompendulumimportDateTime
class ContainerState(str, Enum):
    """
    Possible container states.

    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.

    Because ``str`` is mixed in, each member compares equal to its plain string value.
    """

    # NOTE(review): no members are visible in this excerpt, yet KubernetesPodTrigger compares
    # against ``ContainerState.TERMINATED`` and ``ContainerState.FAILED``. Confirm the member
    # definitions exist in the full file.
class KubernetesPodTrigger(BaseTrigger):
    """
    Runs on the trigger worker and checks the state of a Kubernetes pod.

    :param pod_name: Name of the pod to watch.
    :param pod_namespace: Namespace the pod lives in.
    :param trigger_start_time: Moment the trigger was started. It is the reference point for
        ``startup_timeout``. It is subtracted from the current UTC time, so it must be
        timezone-aware.
    :param base_container_name: Name of the pod's base container. It is stored on the trigger
        and included in its serialized form. (Presumably it identifies the container whose
        state is inspected; confirm against ``define_container_state``.)
    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
        for the Kubernetes cluster.
    :param poll_interval: Seconds to sleep between status checks while the container is running.
    :param cluster_context: Context that points to the Kubernetes cluster.
    :param config_file: Path to a kubeconfig file.
    :param in_cluster: Run the Kubernetes client with in-cluster configuration.
    :param get_logs: Get the stdout of the container as logs of the task.
    :param startup_timeout: Seconds, counted from ``trigger_start_time``, that the pod may stay
        in the ``Pending`` phase before a ``timeout`` event is emitted.
    :param startup_check_interval: Seconds to sleep between checks while the pod is still
        ``Pending``.
    :param on_finish_action: What to do when the pod reaches its final state, or the execution
        is interrupted:

        - "delete_pod" deletes the pod regardless of its state.
        - "delete_succeeded_pod" deletes only a succeeded pod.
        - "keep_pod" keeps the pod.
    :param should_delete_pod: Deprecated, use ``on_finish_action`` instead. It defaults to
        ``None``. When it is given, it takes precedence over ``on_finish_action``: ``True``
        maps to "delete_pod" and ``False`` maps to "keep_pod".
    :param last_log_time: Point in time from which log fetching should resume.
    :param logging_interval: Seconds to wait before handing control back to the operator so it
        can print the latest logs. If ``None``, wait until the container is done.
    """

    def __init__(
        self,
        pod_name: str,
        pod_namespace: str,
        trigger_start_time: datetime.datetime,
        base_container_name: str,
        kubernetes_conn_id: str | None = None,
        poll_interval: float = 2,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        get_logs: bool = True,
        startup_timeout: int = 120,
        startup_check_interval: int = 5,
        on_finish_action: str = "delete_pod",
        should_delete_pod: bool | None = None,
        last_log_time: DateTime | None = None,
        logging_interval: int | None = None,
    ):
        super().__init__()
        # Which pod/container is being watched.
        self.pod_name = pod_name
        self.pod_namespace = pod_namespace
        self.base_container_name = base_container_name
        # How to reach the cluster.
        self.kubernetes_conn_id = kubernetes_conn_id
        self.cluster_context = cluster_context
        self.config_file = config_file
        self.in_cluster = in_cluster
        # Timing of the startup wait and the completion polling.
        self.trigger_start_time = trigger_start_time
        self.poll_interval = poll_interval
        self.startup_timeout = startup_timeout
        self.startup_check_interval = startup_check_interval
        # Log handling.
        self.get_logs = get_logs
        self.last_log_time = last_log_time
        self.logging_interval = logging_interval

        if should_delete_pod is None:
            # Normal path: the string action is authoritative and the legacy flag is derived.
            self.on_finish_action = OnFinishAction(on_finish_action)
            self.should_delete_pod = self.on_finish_action == OnFinishAction.DELETE_POD
        else:
            # Legacy path: the explicit boolean overrides ``on_finish_action``.
            warnings.warn(
                "`should_delete_pod` parameter is deprecated, please use `on_finish_action`",
                category=AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            self.on_finish_action = (
                OnFinishAction.DELETE_POD if should_delete_pod else OnFinishAction.KEEP_POD
            )
            self.should_delete_pod = should_delete_pod
        self._since_time = None
[docs]defserialize(self)->tuple[str,dict[str,Any]]:"""Serialize KubernetesCreatePodTrigger arguments and classpath."""return("airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger",{"pod_name":self.pod_name,"pod_namespace":self.pod_namespace,"base_container_name":self.base_container_name,"kubernetes_conn_id":self.kubernetes_conn_id,"poll_interval":self.poll_interval,"cluster_context":self.cluster_context,"config_file":self.config_file,"in_cluster":self.in_cluster,"get_logs":self.get_logs,"startup_timeout":self.startup_timeout,"startup_check_interval":self.startup_check_interval,"trigger_start_time":self.trigger_start_time,"should_delete_pod":self.should_delete_pod,"on_finish_action":self.on_finish_action.value,"last_log_time":self.last_log_time,"logging_interval":self.logging_interval,},)
[docs]asyncdefrun(self)->AsyncIterator[TriggerEvent]:# type: ignore[override]"""Get current pod status and yield a TriggerEvent."""self.log.info("Checking pod %r in namespace %r.",self.pod_name,self.pod_namespace)try:state=awaitself._wait_for_pod_start()ifstate==ContainerState.TERMINATED:event=TriggerEvent({"status":"success","namespace":self.pod_namespace,"name":self.pod_name,"message":"All containers inside pod have started successfully.",})elifstate==ContainerState.FAILED:event=TriggerEvent({"status":"failed","namespace":self.pod_namespace,"name":self.pod_name,"message":"pod failed",})else:event=awaitself._wait_for_container_completion()yieldeventreturnexceptPodLaunchTimeoutExceptionase:message=self._format_exception_description(e)yieldTriggerEvent({"name":self.pod_name,"namespace":self.pod_namespace,"status":"timeout","message":message,})returnexceptExceptionase:yieldTriggerEvent({"name":self.pod_name,"namespace":self.pod_namespace,"status":"error","message":str(e),"stack_trace":traceback.format_exc(),})return
def_format_exception_description(self,exc:Exception)->Any:ifisinstance(exc,PodLaunchTimeoutException):returnexc.args[0]description=f"Trigger {self.__class__.__name__} failed with exception {exc.__class__.__name__}."message=exc.argsandexc.args[0]or""ifmessage:description+=f"\ntrigger exception message: {message}"curr_traceback=traceback.format_exc()description+=f"\ntrigger traceback:\n{curr_traceback}"returndescriptionasyncdef_wait_for_pod_start(self)->ContainerState:"""Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""whileTrue:pod=awaitself.hook.get_pod(self.pod_name,self.pod_namespace)ifnotpod.status.phase=="Pending":returnself.define_container_state(pod)delta=datetime.datetime.now(tz=datetime.timezone.utc)-self.trigger_start_timeifself.startup_timeout<delta.total_seconds():raisePodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")self.log.info("Still waiting for pod to start. The pod state is %s",pod.status.phase)awaitasyncio.sleep(self.startup_check_interval)asyncdef_wait_for_container_completion(self)->TriggerEvent:""" Wait for container completion. Waits until container is no longer in running state. If trigger is configured with a logging period, then will emit an event to resume the task for the purpose of fetching more logs. 
"""time_begin=datetime.datetime.now(tz=datetime.timezone.utc)time_get_more_logs=Noneifself.logging_intervalisnotNone:time_get_more_logs=time_begin+datetime.timedelta(seconds=self.logging_interval)whileTrue:pod=awaitself.hook.get_pod(self.pod_name,self.pod_namespace)container_state=self.define_container_state(pod)ifcontainer_state==ContainerState.TERMINATED:returnTriggerEvent({"status":"success","namespace":self.pod_namespace,"name":self.pod_name,"last_log_time":self.last_log_time,})elifcontainer_state==ContainerState.FAILED:returnTriggerEvent({"status":"failed","namespace":self.pod_namespace,"name":self.pod_name,"message":"Container state failed","last_log_time":self.last_log_time,})self.log.debug("Container is not completed and still working.")iftime_get_more_logsanddatetime.datetime.now(tz=datetime.timezone.utc)>time_get_more_logs:returnTriggerEvent({"status":"running","last_log_time":self.last_log_time,"namespace":self.pod_namespace,"name":self.pod_name,})self.log.debug("Sleeping for %s seconds.",self.poll_interval)awaitasyncio.sleep(self.poll_interval)def_get_async_hook(self)->AsyncKubernetesHook:# TODO: Remove this method when the min version of kubernetes provider is 7.12.0 in Google provider.returnAsyncKubernetesHook(conn_id=self.kubernetes_conn_id,in_cluster=self.in_cluster,config_file=self.config_file,cluster_context=self.cluster_context,)@cached_property