Source code for airflow.executors.kubernetes_executor
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
KubernetesExecutor

.. seealso::
    For more information on how the KubernetesExecutor works, take a look at the guide:
    :ref:`executor:KubernetesExecutor`
"""
import functools
import json
import multiprocessing
import time
from datetime import timedelta
from queue import Empty, Queue
from typing import Any, Dict, List, Optional, Tuple

from kubernetes import client, watch
from kubernetes.client import Configuration, models as k8s
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ReadTimeoutError

from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kube_config import KubeConfig
from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key, create_pod_id
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.settings import pod_mutation_hook
from airflow.utils import timezone
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.state import State

# TaskInstance key, command, configuration, pod_template_file
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]

# key, state, pod_id, namespace, resource_version
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]

# pod_id, namespace, state, annotations, resource_version
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]


class ResourceVersion:
    """Singleton for tracking resourceVersion from Kubernetes"""

    _instance = None
    resource_version = "0"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
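
An illustrative sketch, not part of the module: it shows the process-wide singleton behaviour of ResourceVersion and the shape of the watcher-queue tuples defined by the aliases above. All values below are hypothetical.

# Illustrative sketch (not part of the module): ResourceVersion is a singleton,
# so every call site reads and writes the same resource_version value.
rv_a = ResourceVersion()
rv_b = ResourceVersion()
rv_a.resource_version = "12345"
assert rv_b.resource_version == "12345"  # rv_a and rv_b are the same object

# KubernetesWatchType: (pod_id, namespace, state, annotations, resource_version);
# a state of None means the pod phase was "Succeeded".
example_annotations = {
    'dag_id': 'example_dag',
    'task_id': 'example_task',
    'run_id': 'manual__2021-01-01T00:00:00+00:00',
    'try_number': '1',
}
example_watch_event = ('example-worker-pod', 'default', None, example_annotations, '12345')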
[docs]class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
    """Watches for Kubernetes jobs"""

    def __init__(
        self,
        namespace: Optional[str],
        multi_namespace_mode: bool,
        watcher_queue: 'Queue[KubernetesWatchType]',
        resource_version: Optional[str],
        scheduler_job_id: str,
        kube_config: Configuration,
    ):
        super().__init__()
        self.namespace = namespace
        self.multi_namespace_mode = multi_namespace_mode
        self.scheduler_job_id = scheduler_job_id
        self.watcher_queue = watcher_queue
        self.resource_version = resource_version
        self.kube_config = kube_config
[docs]    def run(self) -> None:
        """Performs watching"""
        kube_client: client.CoreV1Api = get_kube_client()
        if not self.scheduler_job_id:
            raise AirflowException(NOT_STARTED_MESSAGE)
        while True:
            try:
                self.resource_version = self._run(
                    kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
                )
            except ReadTimeoutError:
                self.log.warning(
                    "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
                )
                time.sleep(1)
            except Exception:
                self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
                self.resource_version = "0"
                ResourceVersion().resource_version = "0"
                raise
            else:
                self.log.warning(
                    'Watch died gracefully, starting back up with: last resource_version: %s',
                    self.resource_version,
                )

    def _run(
        self,
        kube_client: client.CoreV1Api,
        resource_version: Optional[str],
        scheduler_job_id: str,
        kube_config: Any,
    ) -> Optional[str]:
        self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
        watcher = watch.Watch()

        kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
        if resource_version:
            kwargs['resource_version'] = resource_version
        if kube_config.kube_client_request_args:
            for key, value in kube_config.kube_client_request_args.items():
                kwargs[key] = value

        last_resource_version: Optional[str] = None
        if self.multi_namespace_mode:
            list_worker_pods = functools.partial(
                watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
            )
        else:
            list_worker_pods = functools.partial(
                watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
            )
        for event in list_worker_pods():
            task = event['object']
            self.log.info('Event: %s had an event of type %s', task.metadata.name, event['type'])
            if event['type'] == 'ERROR':
                return self.process_error(event)
            annotations = task.metadata.annotations
            task_instance_related_annotations = {
                'dag_id': annotations['dag_id'],
                'task_id': annotations['task_id'],
                'execution_date': annotations.get('execution_date'),
                'run_id': annotations.get('run_id'),
                'try_number': annotations['try_number'],
            }
            map_index = annotations.get('map_index')
            if map_index is not None:
                task_instance_related_annotations['map_index'] = map_index

            self.process_status(
                pod_id=task.metadata.name,
                namespace=task.metadata.namespace,
                status=task.status.phase,
                annotations=task_instance_related_annotations,
                resource_version=task.metadata.resource_version,
                event=event,
            )
            last_resource_version = task.metadata.resource_version

        return last_resource_version
[docs]    def process_error(self, event: Any) -> str:
        """Process error response"""
        self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
        raw_object = event['raw_object']
        if raw_object['code'] == 410:
            self.log.info(
                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
            )
            # Return resource version 0
            return '0'
        raise AirflowException(
            f"Kubernetes failure for {raw_object['reason']} with code {raw_object['code']} and message: "
            f"{raw_object['message']}"
        )
[docs]    def process_status(
        self,
        pod_id: str,
        namespace: str,
        status: str,
        annotations: Dict[str, str],
        resource_version: str,
        event: Any,
    ) -> None:
        """Process status response"""
        if status == 'Pending':
            if event['type'] == 'DELETED':
                self.log.info('Event: Failed to start pod %s', pod_id)
                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
            else:
                self.log.info('Event: %s Pending', pod_id)
        elif status == 'Failed':
            self.log.error('Event: %s Failed', pod_id)
            self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
        elif status == 'Succeeded':
            self.log.info('Event: %s Succeeded', pod_id)
            self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
        elif status == 'Running':
            if event['type'] == 'DELETED':
                self.log.info('Event: Pod %s deleted before it could complete', pod_id)
                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
            else:
                self.log.info('Event: %s is Running', pod_id)
        else:
            self.log.warning(
                'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
                'resource_version: %s',
                status,
                pod_id,
                namespace,
                annotations,
                resource_version,
            )
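
An illustrative sketch, not part of the module: the shape of the ERROR event that process_error receives from the Kubernetes watch stream; the field values are hypothetical.

# Illustrative sketch (not part of the module). A 410 ("Gone") code makes
# process_error reset the stored resource_version to "0"; any other code
# raises AirflowException.
example_error_event = {
    'type': 'ERROR',
    'raw_object': {
        'code': 410,
        'reason': 'Expired',
        'message': 'too old resource version: 12345 (67890)',
    },
}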
[docs]class AirflowKubernetesScheduler(LoggingMixin):
    """Airflow Scheduler for Kubernetes"""

    def __init__(
        self,
        kube_config: Any,
        task_queue: 'Queue[KubernetesJobType]',
        result_queue: 'Queue[KubernetesResultsType]',
        kube_client: client.CoreV1Api,
        scheduler_job_id: str,
    ):
        super().__init__()
        self.log.debug("Creating Kubernetes executor")
        self.kube_config = kube_config
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.namespace = self.kube_config.kube_namespace
        self.log.debug("Kubernetes using namespace %s", self.namespace)
        self.kube_client = kube_client
        self._manager = multiprocessing.Manager()
        self.watcher_queue = self._manager.Queue()
        self.scheduler_job_id = scheduler_job_id
        self.kube_watcher = self._make_kube_watcher()
[docs]    def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
        """Runs POD asynchronously"""
        pod_mutation_hook(pod)

        sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
        json_pod = json.dumps(sanitized_pod, indent=2)

        self.log.debug('Pod Creation Request: \n%s', json_pod)
        try:
            resp = self.kube_client.create_namespaced_pod(
                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
            )
            self.log.debug('Pod Creation Response: %s', resp)
        except Exception as e:
            self.log.exception('Exception when attempting to create Namespaced Pod: %s', json_pod)
            raise e
        return resp
    def _make_kube_watcher(self) -> KubernetesJobWatcher:
        resource_version = ResourceVersion().resource_version
        watcher = KubernetesJobWatcher(
            watcher_queue=self.watcher_queue,
            namespace=self.kube_config.kube_namespace,
            multi_namespace_mode=self.kube_config.multi_namespace_mode,
            resource_version=resource_version,
            scheduler_job_id=self.scheduler_job_id,
            kube_config=self.kube_config,
        )
        watcher.start()
        return watcher

    def _health_check_kube_watcher(self):
        if self.kube_watcher.is_alive():
            self.log.debug("KubeJobWatcher alive, continuing")
        else:
            self.log.error(
                'Error while health checking kube watcher process. Process died for unknown reasons'
            )
            ResourceVersion().resource_version = "0"
            self.kube_watcher = self._make_kube_watcher()
[docs]    def run_next(self, next_job: KubernetesJobType) -> None:
        """
        The run_next command will check the task_queue for any un-run jobs.
        It will then create a unique job-id, launch that job in the cluster,
        and store relevant info in the current_jobs map so we can track the job's
        status
        """
        self.log.info('Kubernetes job is %s', str(next_job).replace("\n", " "))

        key, command, kube_executor_config, pod_template_file = next_job
        dag_id, task_id, run_id, try_number, map_index = key

        if command[0:3] != ["airflow", "tasks", "run"]:
            raise ValueError('The command must start with ["airflow", "tasks", "run"].')

        base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)

        if not base_worker_pod:
            raise AirflowException(
                f"could not find a valid worker template yaml at {self.kube_config.pod_template_file}"
            )

        pod = PodGenerator.construct_pod(
            namespace=self.namespace,
            scheduler_job_id=self.scheduler_job_id,
            pod_id=create_pod_id(dag_id, task_id),
            dag_id=dag_id,
            task_id=task_id,
            kube_image=self.kube_config.kube_image,
            try_number=try_number,
            map_index=map_index,
            date=None,
            run_id=run_id,
            args=command,
            pod_override_object=kube_executor_config,
            base_worker_pod=base_worker_pod,
        )
        # Reconcile the pod generated by the Operator and the Pod
        # generated by the .cfg file
        self.log.debug("Kubernetes running for command %s", command)
        self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)

        # the watcher will monitor pods, so we do not block.
        self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
        self.log.debug("Kubernetes Job created!")
[docs]    def delete_pod(self, pod_id: str, namespace: str) -> None:
        """Deletes POD"""
        try:
            self.log.debug("Deleting pod %s in namespace %s", pod_id, namespace)
            self.kube_client.delete_namespaced_pod(
                pod_id,
                namespace,
                body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
                **self.kube_config.kube_client_request_args,
            )
        except ApiException as e:
            # If the pod is already deleted
            if e.status != 404:
                raise
[docs]    def sync(self) -> None:
        """
        The sync function checks the status of all currently running kubernetes jobs.
        If a job is completed, its status is placed in the result queue to
        be sent back to the scheduler.

        :return:
        """
        self.log.debug("Syncing KubernetesExecutor")
        self._health_check_kube_watcher()
        while True:
            try:
                task = self.watcher_queue.get_nowait()
                try:
                    self.log.debug("Processing task %s", task)
                    self.process_watcher_task(task)
                finally:
                    self.watcher_queue.task_done()
            except Empty:
                break
[docs]    def process_watcher_task(self, task: KubernetesWatchType) -> None:
        """Process the task by watcher."""
        pod_id, namespace, state, annotations, resource_version = task
        self.log.info(
            'Attempting to finish pod; pod_id: %s; state: %s; annotations: %s', pod_id, state, annotations
        )
        key = annotations_to_key(annotations=annotations)
        if key:
            self.log.debug('finishing job %s - %s (%s)', key, state, pod_id)
            self.result_queue.put((key, state, pod_id, namespace, resource_version))
    def _flush_watcher_queue(self) -> None:
        self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
        while True:
            try:
                task = self.watcher_queue.get_nowait()
                # Ignoring it since it can only have either FAILED or SUCCEEDED pods
                self.log.warning('Executor shutting down, IGNORING watcher task=%s', task)
                self.watcher_queue.task_done()
            except Empty:
                break
[docs]    def terminate(self) -> None:
        """Terminates the watcher."""
        self.log.debug("Terminating kube_watcher...")
        self.kube_watcher.terminate()
        self.kube_watcher.join()
        self.log.debug("kube_watcher=%s", self.kube_watcher)
        self.log.debug("Flushing watcher_queue...")
        self._flush_watcher_queue()
        # Queue should be empty...
        self.watcher_queue.join()
        self.log.debug("Shutting down manager...")
        self._manager.shutdown()
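
An illustrative, hand-written sketch, not part of the module: roughly the kind of worker pod that run_next assembles via PodGenerator.construct_pod and hands to run_pod_async. Every name, namespace, label, image, and argument below is hypothetical.

# Illustrative sketch (not part of the module); all values are hypothetical.
example_worker_pod = k8s.V1Pod(
    metadata=k8s.V1ObjectMeta(
        name='example-dag-example-task-abcdef',
        namespace='default',
        labels={'airflow-worker': '1', 'dag_id': 'example_dag', 'task_id': 'example_task'},
        annotations={
            'dag_id': 'example_dag',
            'task_id': 'example_task',
            'run_id': 'manual__2021-01-01T00:00:00+00:00',
            'try_number': '1',
        },
    ),
    spec=k8s.V1PodSpec(
        restart_policy='Never',
        containers=[
            k8s.V1Container(
                name='base',
                image='apache/airflow:2.3.0',
                # Worker pods run the task via the "airflow tasks run" CLI command
                args=['airflow', 'tasks', 'run', 'example_dag', 'example_task',
                      'manual__2021-01-01T00:00:00+00:00'],
            )
        ],
    ),
)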
[docs]def get_base_pod_from_template(pod_template_file: Optional[str], kube_config: Any) -> k8s.V1Pod:
    """
    Reads either the pod_template_file set in the executor_config or the base pod_template_file
    set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor

    :param pod_template_file: absolute path to a pod_template_file.yaml or None
    :param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
    :return: a V1Pod that can be used as the base pod for k8s tasks
    """
    if pod_template_file:
        return PodGenerator.deserialize_model_file(pod_template_file)
    else:
        return PodGenerator.deserialize_model_file(kube_config.pod_template_file)
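
A minimal usage sketch, not part of the module, assuming an initialised Airflow environment where KubeConfig() can be constructed; the template path is hypothetical.

# Minimal usage sketch (not part of the module); '/path/to/pod_template.yaml' is hypothetical.
example_kube_config = KubeConfig()

# A task-level template from executor_config takes precedence...
base_pod = get_base_pod_from_template('/path/to/pod_template.yaml', example_kube_config)

# ...otherwise the executor falls back to the pod_template_file configured in airflow.cfg.
default_base_pod = get_base_pod_from_template(None, example_kube_config)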
[docs]class KubernetesExecutor(BaseExecutor):
    """Executor for Kubernetes"""

    def __init__(self):
        self.kube_config = KubeConfig()
        self._manager = multiprocessing.Manager()
        self.task_queue: 'Queue[KubernetesJobType]' = self._manager.Queue()
        self.result_queue: 'Queue[KubernetesResultsType]' = self._manager.Queue()
        self.kube_scheduler: Optional[AirflowKubernetesScheduler] = None
        self.kube_client: Optional[client.CoreV1Api] = None
        self.scheduler_job_id: Optional[str] = None
        self.event_scheduler: Optional[EventScheduler] = None
        self.last_handled: Dict[TaskInstanceKey, float] = {}
        self.kubernetes_queue: Optional[str] = None
        super().__init__(parallelism=self.kube_config.parallelism)
[docs]    @provide_session
    def clear_not_launched_queued_tasks(self, session=None) -> None:
        """
        Tasks can end up in a "Queued" state through either the executor being
        abruptly shut down (leaving a non-empty task_queue on this executor)
        or when a rescheduled/deferred operator comes back up for execution
        (with the same try_number) before the pod of its previous incarnation
        has been fully removed (we think).

        This method checks each of those tasks to see if the corresponding pod
        is around, and if not, and there's no matching entry in our own
        task_queue, marks it for re-execution.
        """
        self.log.debug("Clearing tasks that have not been launched")
        if not self.kube_client:
            raise AirflowException(NOT_STARTED_MESSAGE)

        query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED)
        if self.kubernetes_queue:
            query = query.filter(TaskInstance.queue == self.kubernetes_queue)
        queued_tis: List[TaskInstance] = query.all()
        self.log.info('Found %s queued task instances', len(queued_tis))

        # Go through the "last seen" dictionary and clean out old entries
        allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
        for key, timestamp in list(self.last_handled.items()):
            if time.time() - timestamp > allowed_age:
                del self.last_handled[key]

        for ti in queued_tis:
            self.log.debug("Checking task instance %s", ti)

            # Check to see if we've handled it ourselves recently
            if ti.key in self.last_handled:
                continue

            # Build the pod selector
            base_label_selector = (
                f"dag_id={pod_generator.make_safe_label_value(ti.dag_id)},"
                f"task_id={pod_generator.make_safe_label_value(ti.task_id)},"
                f"airflow-worker={pod_generator.make_safe_label_value(str(ti.queued_by_job_id))}"
            )
            if ti.map_index >= 0:
                # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
                base_label_selector += f',map_index={ti.map_index}'
            kwargs = dict(label_selector=base_label_selector)
            if self.kube_config.kube_client_request_args:
                kwargs.update(**self.kube_config.kube_client_request_args)

            # Try run_id first
            kwargs['label_selector'] += ',run_id=' + pod_generator.make_safe_label_value(ti.run_id)
            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
            if pod_list.items:
                continue
            # Fallback to old style of using execution_date
            kwargs['label_selector'] = (
                f'{base_label_selector},'
                f'execution_date={pod_generator.datetime_to_label_safe_datestring(ti.execution_date)}'
            )
            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
            if pod_list.items:
                continue
            self.log.info('TaskInstance: %s found in queued state but was not launched, rescheduling', ti)
            session.query(TaskInstance).filter(
                TaskInstance.dag_id == ti.dag_id,
                TaskInstance.task_id == ti.task_id,
                TaskInstance.run_id == ti.run_id,
                TaskInstance.map_index == ti.map_index,
            ).update({TaskInstance.state: State.SCHEDULED})
[docs]    def start(self) -> None:
        """Starts the executor"""
        self.log.info('Start Kubernetes executor')
        if not self.job_id:
            raise AirflowException("Could not get scheduler_job_id")
        self.scheduler_job_id = str(self.job_id)
        self.log.debug('Start with scheduler_job_id: %s', self.scheduler_job_id)
        self.kube_client = get_kube_client()
        self.kube_scheduler = AirflowKubernetesScheduler(
            self.kube_config, self.task_queue, self.result_queue, self.kube_client, self.scheduler_job_id
        )
        self.event_scheduler = EventScheduler()
        self.event_scheduler.call_regular_interval(
            self.kube_config.worker_pods_pending_timeout_check_interval,
            self._check_worker_pods_pending_timeout,
        )
        self.event_scheduler.call_regular_interval(
            self.kube_config.worker_pods_queued_check_interval,
            self.clear_not_launched_queued_tasks,
        )
        # We also call this at startup as that's the most likely time to see
        # stuck queued tasks
        self.clear_not_launched_queued_tasks()
[docs]    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Any] = None,
    ) -> None:
        """Executes task asynchronously"""
        self.log.info('Add task %s with command %s with executor_config %s', key, command, executor_config)
        try:
            kube_executor_config = PodGenerator.from_obj(executor_config)
        except Exception:
            self.log.error("Invalid executor_config for %s", key)
            self.fail(key=key, info="Invalid executor_config passed")
            return

        if executor_config:
            pod_template_file = executor_config.get("pod_template_file", None)
        else:
            pod_template_file = None
        if not self.task_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id)
        self.task_queue.put((key, command, kube_executor_config, pod_template_file))
        # We keep a temporary local record that we've handled this so we don't
        # try and remove it from the QUEUED state while we process it
        self.last_handled[key] = time.time()
[docs]    def sync(self) -> None:
        """Synchronize task state."""
        if self.running:
            self.log.debug('self.running: %s', self.running)
        if self.queued_tasks:
            self.log.debug('self.queued: %s', self.queued_tasks)
        if not self.scheduler_job_id:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.kube_scheduler:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.kube_config:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.result_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.task_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.event_scheduler:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.kube_scheduler.sync()

        last_resource_version = None
        while True:
            try:
                results = self.result_queue.get_nowait()
                try:
                    key, state, pod_id, namespace, resource_version = results
                    last_resource_version = resource_version
                    self.log.info('Changing state of %s to %s', results, state)
                    try:
                        self._change_state(key, state, pod_id, namespace)
                    except Exception as e:
                        self.log.exception(
                            "Exception: %s when attempting to change state of %s to %s, re-queueing.",
                            e,
                            results,
                            state,
                        )
                        self.result_queue.put(results)
                finally:
                    self.result_queue.task_done()
            except Empty:
                break

        resource_instance = ResourceVersion()
        resource_instance.resource_version = last_resource_version or resource_instance.resource_version

        for _ in range(self.kube_config.worker_pods_creation_batch_size):
            try:
                task = self.task_queue.get_nowait()
                try:
                    self.kube_scheduler.run_next(task)
                except ApiException as e:
                    # These codes indicate something is wrong with pod definition; otherwise we assume pod
                    # definition is ok, and that retrying may work
                    if e.status in (400, 422):
                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                        key, _, _, _ = task
                        self.change_state(key, State.FAILED, e)
                    else:
                        self.log.warning(
                            'ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s',
                            e.reason,
                            json.loads(e.body)['message'],
                        )
                        self.task_queue.put(task)
                finally:
                    self.task_queue.task_done()
            except Empty:
                break

        # Run any pending timed events
        next_event = self.event_scheduler.run(blocking=False)
        self.log.debug("Next timed event is in %f", next_event)
    def _check_worker_pods_pending_timeout(self):
        """Check if any pending worker pods have timed out"""
        if not self.scheduler_job_id:
            raise AirflowException(NOT_STARTED_MESSAGE)
        timeout = self.kube_config.worker_pods_pending_timeout
        self.log.debug('Looking for pending worker pods older than %d seconds', timeout)

        kwargs = {
            'limit': self.kube_config.worker_pods_pending_timeout_batch_size,
            'field_selector': 'status.phase=Pending',
            'label_selector': f'airflow-worker={self.scheduler_job_id}',
            **self.kube_config.kube_client_request_args,
        }
        if self.kube_config.multi_namespace_mode:
            pending_pods = functools.partial(self.kube_client.list_pod_for_all_namespaces, **kwargs)
        else:
            pending_pods = functools.partial(
                self.kube_client.list_namespaced_pod, self.kube_config.kube_namespace, **kwargs
            )

        cutoff = timezone.utcnow() - timedelta(seconds=timeout)
        for pod in pending_pods().items:
            self.log.debug(
                'Found a pending pod "%s", created "%s"', pod.metadata.name, pod.metadata.creation_timestamp
            )
            if pod.metadata.creation_timestamp < cutoff:
                self.log.error(
                    (
                        'Pod "%s" has been pending for longer than %d seconds. '
                        'It will be deleted and set to failed.'
                    ),
                    pod.metadata.name,
                    timeout,
                )
                self.kube_scheduler.delete_pod(pod.metadata.name, pod.metadata.namespace)

    def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str, namespace: str) -> None:
        if state != State.RUNNING:
            if self.kube_config.delete_worker_pods:
                if not self.kube_scheduler:
                    raise AirflowException(NOT_STARTED_MESSAGE)
                if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
                    self.kube_scheduler.delete_pod(pod_id, namespace)
                    self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace))
            try:
                self.running.remove(key)
            except KeyError:
                self.log.debug('Could not find key: %s', str(key))
        self.event_buffer[key] = state, None
[docs]    def adopt_launched_task(
        self, kube_client: client.CoreV1Api, pod: k8s.V1Pod, pod_ids: Dict[TaskInstanceKey, k8s.V1Pod]
    ) -> None:
        """
        Patch existing pod so that the current KubernetesJobWatcher can monitor it via label selectors

        :param kube_client: kubernetes client for speaking to kube API
        :param pod: V1Pod spec that we will patch with new label
        :param pod_ids: pod_ids we expect to patch.
        """
        if not self.scheduler_job_id:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.log.info("attempting to adopt pod %s", pod.metadata.name)
        pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id)
        pod_id = annotations_to_key(pod.metadata.annotations)
        if pod_id not in pod_ids:
            self.log.error("attempting to adopt taskinstance which was not specified by database: %s", pod_id)
            return

        try:
            kube_client.patch_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=PodGenerator.serialize_pod(pod),
            )
            pod_ids.pop(pod_id)
            self.running.add(pod_id)
        except ApiException as e:
            self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
        """
        Patch completed pod so that the KubernetesJobWatcher can delete it.

        :param kube_client: kubernetes client for speaking to kube API
        """
        if not self.scheduler_job_id:
            raise AirflowException(NOT_STARTED_MESSAGE)
        kwargs = {
            'field_selector': "status.phase=Succeeded",
            'label_selector': 'kubernetes_executor=True',
        }
        pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
        for pod in pod_list.items:
            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
            pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id)
            try:
                kube_client.patch_namespaced_pod(
                    name=pod.metadata.name,
                    namespace=pod.metadata.namespace,
                    body=PodGenerator.serialize_pod(pod),
                )
            except ApiException as e:
                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)

    def _flush_task_queue(self) -> None:
        if not self.task_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.log.debug('Executor shutting down, task_queue approximate size=%d', self.task_queue.qsize())
        while True:
            try:
                task = self.task_queue.get_nowait()
                # This is a new task to run thus ok to ignore.
                self.log.warning('Executor shutting down, will NOT run task=%s', task)
                self.task_queue.task_done()
            except Empty:
                break

    def _flush_result_queue(self) -> None:
        if not self.result_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.log.debug('Executor shutting down, result_queue approximate size=%d', self.result_queue.qsize())
        while True:
            try:
                results = self.result_queue.get_nowait()
                self.log.warning('Executor shutting down, flushing results=%s', results)
                try:
                    key, state, pod_id, namespace, resource_version = results
                    self.log.info(
                        'Changing state of %s to %s : resource_version=%s', results, state, resource_version
                    )
                    try:
                        self._change_state(key, state, pod_id, namespace)
                    except Exception as e:
                        self.log.exception(
                            'Ignoring exception: %s when attempting to change state of %s to %s.',
                            e,
                            results,
                            state,
                        )
                finally:
                    self.result_queue.task_done()
            except Empty:
                break
[docs]    def end(self) -> None:
        """Called when the executor shuts down"""
        if not self.task_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.result_queue:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.kube_scheduler:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.log.info('Shutting down Kubernetes executor')
        self.log.debug('Flushing task_queue...')
        self._flush_task_queue()
        self.log.debug('Flushing result_queue...')
        self._flush_result_queue()
        # Both queues should be empty...
        self.task_queue.join()
        self.result_queue.join()
        if self.kube_scheduler:
            self.kube_scheduler.terminate()
        self._manager.shutdown()
[docs]    def terminate(self):
        """Terminate the executor; this is a no-op."""
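
For context, a hedged sketch, not part of the module, of how a DAG author typically feeds executor_config into execute_async above: the "pod_override" key is consumed by PodGenerator.from_obj, while a "pod_template_file" key selects an alternative base template. The task name and resource values are hypothetical.

# Hedged sketch (not part of the module): a task requesting a customised worker pod.
from airflow.decorators import task
from kubernetes.client import models as k8s

@task(
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(requests={"memory": "1Gi", "cpu": "1"}),
                    )
                ]
            )
        ),
    }
)
def memory_hungry_task():
    # hypothetical task body
    ...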