Source code for airflow.executors.celery_kubernetes_executor
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.fromtypingimportDict,List,Optional,Set,Unionfromairflow.configurationimportconffromairflow.executors.base_executorimportCommandType,EventBufferValueType,QueuedTaskInstanceTypefromairflow.executors.celery_executorimportCeleryExecutorfromairflow.executors.kubernetes_executorimportKubernetesExecutorfromairflow.models.taskinstanceimportSimpleTaskInstance,TaskInstance,TaskInstanceKeyfromairflow.utils.log.logging_mixinimportLoggingMixin
class CeleryKubernetesExecutor(LoggingMixin):
    """
    CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.

    It chooses an executor to use based on the queue defined on the task.
    When the queue is the value of ``kubernetes_queue`` in section
    ``[celery_kubernetes_executor]`` of the configuration (default value:
    ``kubernetes``), KubernetesExecutor is selected to run the task;
    otherwise, CeleryExecutor is used.
    """
def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
    """
    Return the queued tasks of both wrapped executors as one dict.

    Entries from the kubernetes executor take precedence on a key clash,
    matching the copy-then-update merge order.
    """
    return {
        **self.celery_executor.queued_tasks,
        **self.kubernetes_executor.queued_tasks,
    }
@property
def running(self) -> Set[TaskInstanceKey]:
    """Set of running task keys, combined from both wrapped executors."""
    return self.celery_executor.running | self.kubernetes_executor.running
@property
def job_id(self):
    """
    The scheduler job id this wrapper is serving.

    BaseExecutor keeps this as a plain class attribute; this wrapper is not
    itself an executor, so it exposes it as a property to allow a custom
    setter that propagates the value to the wrapped executors.
    """
    return self._job_id
@job_id.setter
def job_id(self, value):
    """Store the job_id (set by SchedulerJob) and mirror it onto both wrapped executors."""
    self._job_id = value
    for wrapped in (self.kubernetes_executor, self.celery_executor):
        wrapped.job_id = value
def start(self) -> None:
    """Start both wrapped executors (celery first, then kubernetes)."""
    for wrapped in (self.celery_executor, self.kubernetes_executor):
        wrapped.start()
@property
def slots_available(self):
    """
    Number of new tasks this executor instance can accept.

    Delegates to the celery executor only; the kubernetes executor's
    capacity is not consulted here.
    """
    return self.celery_executor.slots_available
def queue_command(
    self,
    task_instance: TaskInstance,
    command: CommandType,
    priority: int = 1,
    queue: Optional[str] = None,
):
    """
    Queue a command on whichever wrapped executor the task's queue selects.

    :param task_instance: task instance whose queue decides the executor
    :param command: command to queue
    :param priority: priority of the command
    :param queue: name of the queue to submit to
    """
    chosen = self._router(task_instance)
    self.log.debug("Using executor: %s for %s", chosen.__class__.__name__, task_instance.key)
    chosen.queue_command(task_instance, command, priority, queue)
def queue_task_instance(
    self,
    task_instance: TaskInstance,
    mark_success: bool = False,
    pickle_id: Optional[str] = None,
    ignore_all_deps: bool = False,
    ignore_depends_on_past: bool = False,
    ignore_task_deps: bool = False,
    ignore_ti_state: bool = False,
    pool: Optional[str] = None,
    cfg_path: Optional[str] = None,
) -> None:
    """
    Queue a task instance on whichever wrapped executor the task's queue selects.

    Routing is based on a SimpleTaskInstance built from the given task
    instance; all flags are forwarded unchanged to the chosen executor.
    """
    chosen = self._router(SimpleTaskInstance(task_instance))
    self.log.debug(
        "Using executor: %s to queue_task_instance for %s", chosen.__class__.__name__, task_instance.key
    )
    chosen.queue_task_instance(
        task_instance,
        mark_success=mark_success,
        pickle_id=pickle_id,
        ignore_all_deps=ignore_all_deps,
        ignore_depends_on_past=ignore_depends_on_past,
        ignore_task_deps=ignore_task_deps,
        ignore_ti_state=ignore_ti_state,
        pool=pool,
        cfg_path=cfg_path,
    )
def has_task(self, task_instance: TaskInstance) -> bool:
    """
    Check whether a task is queued or running in either wrapped executor.

    :param task_instance: TaskInstance
    :return: True if the task is known to this executor
    """
    if self.celery_executor.has_task(task_instance):
        return True
    return self.kubernetes_executor.has_task(task_instance)
def heartbeat(self) -> None:
    """Send a heartbeat to both wrapped executors to trigger new jobs."""
    for wrapped in (self.celery_executor, self.kubernetes_executor):
        wrapped.heartbeat()
def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKey, EventBufferValueType]:
    """
    Return and flush the event buffers of both wrapped executors.

    Kubernetes events overwrite celery events on a key clash, matching the
    unpacking order of the merged dict.

    :param dag_ids: dag_ids to return events for; if None returns all
    :return: a dict of events
    """
    events = dict(self.celery_executor.get_event_buffer(dag_ids))
    events.update(self.kubernetes_executor.get_event_buffer(dag_ids))
    return events
def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
    """
    Try to adopt running task instances abandoned by a dying SchedulerJob.

    Instances are partitioned by queue: those on the kubernetes queue go to
    the kubernetes executor, all others to the celery executor. Anything not
    adopted will be cleared by the scheduler (and then become eligible for
    re-scheduling).

    :return: any TaskInstances that were unable to be adopted
    :rtype: list[airflow.models.TaskInstance]
    """
    for_kubernetes = [ti for ti in tis if ti.queue == self.KUBERNETES_QUEUE]
    for_celery = [ti for ti in tis if ti.queue != self.KUBERNETES_QUEUE]
    return [
        *self.celery_executor.try_adopt_task_instances(for_celery),
        *self.kubernetes_executor.try_adopt_task_instances(for_kubernetes),
    ]
def end(self) -> None:
    """End both wrapped executors (celery first, then kubernetes)."""
    for wrapped in (self.celery_executor, self.kubernetes_executor):
        wrapped.end()
def terminate(self) -> None:
    """Terminate both wrapped executors (celery first, then kubernetes)."""
    for wrapped in (self.celery_executor, self.kubernetes_executor):
        wrapped.terminate()
def _router(self, simple_task_instance: SimpleTaskInstance) -> Union[CeleryExecutor, KubernetesExecutor]:
    """
    Select the wrapped executor for a task based on its queue.

    :param simple_task_instance: SimpleTaskInstance
    :return: kubernetes_executor when the task's queue equals
        KUBERNETES_QUEUE, otherwise celery_executor
    :rtype: Union[CeleryExecutor, KubernetesExecutor]
    """
    on_kubernetes_queue = simple_task_instance.queue == self.KUBERNETES_QUEUE
    return self.kubernetes_executor if on_kubernetes_queue else self.celery_executor
def debug_dump(self):
    """Dump the state of both wrapped executors; called in response to SIGUSR2 by the scheduler."""
    dumps = (
        ("Dumping CeleryExecutor state", self.celery_executor),
        ("Dumping KubernetesExecutor state", self.kubernetes_executor),
    )
    for message, wrapped in dumps:
        self.log.info(message)
        wrapped.debug_dump()