# Source code for airflow.executors.local_kubernetes_executor

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Sequence

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.kubernetes_executor import KubernetesExecutor
from airflow.executors.local_executor import LocalExecutor
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.utils.log.logging_mixin import LoggingMixin

class LocalKubernetesExecutor(LoggingMixin):
    """
    LocalKubernetesExecutor consists of LocalExecutor and KubernetesExecutor.

    It chooses the executor to use based on the queue defined on the task.
    When the task's queue is the value of ``kubernetes_queue`` in section
    ``[local_kubernetes_executor]`` of the configuration (default value: `kubernetes`),
    KubernetesExecutor is selected to run the task, otherwise, LocalExecutor is used.

    :param local_executor: executor that receives every task whose queue is
        not ``KUBERNETES_QUEUE``
    :param kubernetes_executor: executor that receives every task whose queue
        equals ``KUBERNETES_QUEUE``
    """

    supports_ad_hoc_ti_run: bool = True

    # Not set by this class; ``send_callback`` raises ValueError while it is None.
    callback_sink: BaseCallbackSink | None = None

    # Read from the configuration once, when the class body is evaluated.
    KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue')

    def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor):
        super().__init__()
        self._job_id: str | None = None
        self.local_executor = local_executor
        self.kubernetes_executor = kubernetes_executor
        # Tell the wrapped KubernetesExecutor which queue name it is serving.
        self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE

    @property
    def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
        """Return queued tasks from local and kubernetes executor"""
        # Copy first so the local executor's own mapping is never mutated.
        queued_tasks = self.local_executor.queued_tasks.copy()
        queued_tasks.update(self.kubernetes_executor.queued_tasks)
        return queued_tasks

    @property
    def running(self) -> set[TaskInstanceKey]:
        """Return running tasks from local and kubernetes executor"""
        return self.local_executor.running.union(self.kubernetes_executor.running)

    @property
    def job_id(self) -> str | None:
        """
        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
        of executors we implement as property so we can have custom setter.
        """
        return self._job_id

    @job_id.setter
    def job_id(self, value: str | None) -> None:
        """job_id is manipulated by SchedulerJob. We must propagate the job_id to wrapped executors."""
        self._job_id = value
        self.kubernetes_executor.job_id = value
        self.local_executor.job_id = value

    def start(self) -> None:
        """Start local and kubernetes executor"""
        self.log.info("Starting local and Kubernetes Executor")
        self.local_executor.start()
        self.kubernetes_executor.start()

    @property
    def slots_available(self) -> int:
        """
        Number of new tasks this executor instance can accept

        Only the local executor's value is reported; the kubernetes
        executor's slots are not consulted here.
        """
        return self.local_executor.slots_available

    def queue_command(
        self,
        task_instance: TaskInstance,
        command: CommandType,
        priority: int = 1,
        queue: str | None = None,
    ) -> None:
        """Queues command via local or kubernetes executor"""
        executor = self._router(task_instance)
        self.log.debug("Using executor: %s for %s", executor.__class__.__name__, task_instance.key)
        executor.queue_command(task_instance, command, priority, queue)

    def queue_task_instance(
        self,
        task_instance: TaskInstance,
        mark_success: bool = False,
        pickle_id: str | None = None,
        ignore_all_deps: bool = False,
        ignore_depends_on_past: bool = False,
        ignore_task_deps: bool = False,
        ignore_ti_state: bool = False,
        pool: str | None = None,
        cfg_path: str | None = None,
    ) -> None:
        """Queues task instance via local or kubernetes executor"""
        executor = self._router(SimpleTaskInstance.from_ti(task_instance))
        self.log.debug(
            "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
        )
        executor.queue_task_instance(
            task_instance,
            mark_success,
            pickle_id,
            ignore_all_deps,
            ignore_depends_on_past,
            ignore_task_deps,
            ignore_ti_state,
            pool,
            cfg_path,
        )

    def has_task(self, task_instance: TaskInstance) -> bool:
        """
        Checks if a task is either queued or running in either local or kubernetes executor.

        :param task_instance: TaskInstance
        :return: True if the task is known to this executor
        """
        return self.local_executor.has_task(task_instance) or self.kubernetes_executor.has_task(task_instance)

    def heartbeat(self) -> None:
        """Heartbeat sent to trigger new jobs in local and kubernetes executor"""
        self.local_executor.heartbeat()
        self.kubernetes_executor.heartbeat()

    def get_event_buffer(
        self, dag_ids: list[str] | None = None
    ) -> dict[TaskInstanceKey, EventBufferValueType]:
        """
        Returns and flush the event buffer from local and kubernetes executor

        :param dag_ids: dag_ids to return events for, if None returns all
        :return: a dict of events
        """
        cleared_events_from_local = self.local_executor.get_event_buffer(dag_ids)
        cleared_events_from_kubernetes = self.kubernetes_executor.get_event_buffer(dag_ids)
        # Kubernetes events are merged last, so they win if a key appears in both buffers.
        return {**cleared_events_from_local, **cleared_events_from_kubernetes}

    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        """
        Try to adopt running task instances that have been abandoned by a SchedulerJob dying.

        Anything that is not adopted will be cleared by the scheduler (and then become eligible for
        re-scheduling)

        :param tis: task instances to adopt; each is offered to the executor that matches its queue
        :return: any TaskInstances that were unable to be adopted
        :rtype: list[airflow.models.TaskInstance]
        """
        local_tis = [ti for ti in tis if ti.queue != self.KUBERNETES_QUEUE]
        kubernetes_tis = [ti for ti in tis if ti.queue == self.KUBERNETES_QUEUE]
        return [
            *self.local_executor.try_adopt_task_instances(local_tis),
            *self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis),
        ]

    def end(self) -> None:
        """End local and kubernetes executor"""
        self.local_executor.end()
        self.kubernetes_executor.end()

    def terminate(self) -> None:
        """Terminate local and kubernetes executor"""
        self.local_executor.terminate()
        self.kubernetes_executor.terminate()

    def _router(
        self, simple_task_instance: TaskInstance | SimpleTaskInstance
    ) -> LocalExecutor | KubernetesExecutor:
        """
        Return either local_executor or kubernetes_executor

        Only the ``queue`` attribute of the argument is read, which is why
        ``queue_command`` can pass a full TaskInstance here.

        :param simple_task_instance: SimpleTaskInstance (or TaskInstance) to route
        :return: local_executor or kubernetes_executor
        :rtype: Union[LocalExecutor, KubernetesExecutor]
        """
        if simple_task_instance.queue == self.KUBERNETES_QUEUE:
            return self.kubernetes_executor
        return self.local_executor

    def debug_dump(self) -> None:
        """Called in response to SIGUSR2 by the scheduler"""
        self.log.info("Dumping LocalExecutor state")
        self.local_executor.debug_dump()
        self.log.info("Dumping KubernetesExecutor state")
        self.kubernetes_executor.debug_dump()

    def send_callback(self, request: CallbackRequest) -> None:
        """Sends callback for execution.

        :param request: Callback request to be executed.
        :raises ValueError: if ``callback_sink`` has not been set
        """
        if not self.callback_sink:
            raise ValueError("Callback sink is not ready.")
        self.callback_sink.send(request)

# Was this entry helpful?