Source code for airflow.providers.cncf.kubernetes.triggers.job
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasynciofromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Any,AsyncIteratorfromairflow.providers.cncf.kubernetes.hooks.kubernetesimportAsyncKubernetesHook,KubernetesHookfromairflow.providers.cncf.kubernetes.utils.pod_managerimportPodManagerfromairflow.providers.cncf.kubernetes.utils.xcom_sidecarimportPodDefaultsfromairflow.triggers.baseimportBaseTrigger,TriggerEventifTYPE_CHECKING:fromkubernetes.clientimportV1Job
[docs]classKubernetesJobTrigger(BaseTrigger):""" KubernetesJobTrigger run on the trigger worker to check the state of Job. :param job_name: The name of the job. :param job_namespace: The namespace of the job. :param pod_name: The name of the Pod. :param pod_namespace: The namespace of the Pod. :param base_container_name: The name of the base container in the pod. :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>` for the Kubernetes cluster. :param cluster_context: Context that points to kubernetes cluster. :param config_file: Path to kubeconfig file. :param poll_interval: Polling period in seconds to check for the status. :param in_cluster: run kubernetes client with in_cluster configuration. :param get_logs: get the stdout of the base container as logs of the tasks. :param do_xcom_push: If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. """def__init__(self,job_name:str,job_namespace:str,pod_name:str,pod_namespace:str,base_container_name:str,kubernetes_conn_id:str|None=None,poll_interval:float=10.0,cluster_context:str|None=None,config_file:str|None=None,in_cluster:bool|None=None,get_logs:bool=True,do_xcom_push:bool=False,):super().__init__()self.job_name=job_nameself.job_namespace=job_namespaceself.pod_name=pod_nameself.pod_namespace=pod_namespaceself.base_container_name=base_container_nameself.kubernetes_conn_id=kubernetes_conn_idself.poll_interval=poll_intervalself.cluster_context=cluster_contextself.config_file=config_fileself.in_cluster=in_clusterself.get_logs=get_logsself.do_xcom_push=do_xcom_push
[docs]defserialize(self)->tuple[str,dict[str,Any]]:"""Serialize KubernetesCreateJobTrigger arguments and classpath."""return("airflow.providers.cncf.kubernetes.triggers.job.KubernetesJobTrigger",{"job_name":self.job_name,"job_namespace":self.job_namespace,"pod_name":self.pod_name,"pod_namespace":self.pod_namespace,"base_container_name":self.base_container_name,"kubernetes_conn_id":self.kubernetes_conn_id,"poll_interval":self.poll_interval,"cluster_context":self.cluster_context,"config_file":self.config_file,"in_cluster":self.in_cluster,"get_logs":self.get_logs,"do_xcom_push":self.do_xcom_push,},)
[docs]asyncdefrun(self)->AsyncIterator[TriggerEvent]:# type: ignore[override]"""Get current job status and yield a TriggerEvent."""ifself.get_logsorself.do_xcom_push:pod=awaitself.hook.get_pod(name=self.pod_name,namespace=self.pod_namespace)ifself.do_xcom_push:awaitself.hook.wait_until_container_complete(name=self.pod_name,namespace=self.pod_namespace,container_name=self.base_container_name)self.log.info("Checking if xcom sidecar container is started.")awaitself.hook.wait_until_container_started(name=self.pod_name,namespace=self.pod_namespace,container_name=PodDefaults.SIDECAR_CONTAINER_NAME,)self.log.info("Extracting result from xcom sidecar container.")loop=asyncio.get_running_loop()xcom_result=awaitloop.run_in_executor(None,self.pod_manager.extract_xcom,pod)job:V1Job=awaitself.hook.wait_until_job_complete(name=self.job_name,namespace=self.job_namespace)job_dict=job.to_dict()error_message=self.hook.is_job_failed(job=job)yieldTriggerEvent({"name":job.metadata.name,"namespace":job.metadata.namespace,"pod_name":pod.metadata.nameifself.get_logselseNone,"pod_namespace":pod.metadata.namespaceifself.get_logselseNone,"status":"error"iferror_messageelse"success","message":f"Job failed with error: {error_message}"iferror_messageelse"Job completed successfully","job":job_dict,"xcom_result":xcom_resultifself.do_xcom_pushelseNone,})