Source code for airflow.executors.debug_executor

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

.. seealso::
    For more information on how the DebugExecutor works, take a look at the guide:
from __future__ import annotations

import threading
from typing import Any

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.utils.state import State

[docs]class DebugExecutor(BaseExecutor): """ This executor is meant for debugging purposes. It can be used with SQLite. It executes one task instance at time. Additionally to support working with sensors, all sensors ``mode`` will be automatically set to "reschedule". """ _terminated = threading.Event() def __init__(self): super().__init__() self.tasks_to_run: list[TaskInstance] = [] # Place where we keep information for task instance raw run self.tasks_params: dict[TaskInstanceKey, dict[str, Any]] = {} self.fail_fast = conf.getboolean("debug", "fail_fast")
[docs] def execute_async(self, *args, **kwargs) -> None: """The method is replaced by custom trigger_task implementation."""
[docs] def sync(self) -> None: task_succeeded = True while self.tasks_to_run: ti = self.tasks_to_run.pop(0) if self.fail_fast and not task_succeeded:"Setting %s to %s", ti.key, State.UPSTREAM_FAILED) ti.set_state(State.UPSTREAM_FAILED) self.change_state(ti.key, State.UPSTREAM_FAILED) continue if self._terminated.is_set():"Executor is terminated! Stopping %s to %s", ti.key, State.FAILED) ti.set_state(State.FAILED) self.change_state(ti.key, State.FAILED) continue task_succeeded = self._run_task(ti)
def _run_task(self, ti: TaskInstance) -> bool: self.log.debug("Executing task: %s", ti) key = ti.key try: params = self.tasks_params.pop(ti.key, {}), **params) self.change_state(key, State.SUCCESS) return True except Exception as e: ti.set_state(State.FAILED) self.change_state(key, State.FAILED) self.log.exception("Failed to execute task: %s.", str(e)) return False
[docs] def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, pickle_id: str | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, ) -> None: """Queues task instance with empty command because we do not need it.""" self.queue_command( task_instance, [str(task_instance)], # Just for better logging, it's not used anywhere priority=task_instance.task.priority_weight_total, queue=task_instance.task.queue, ) # Save params for TaskInstance._run_raw_task self.tasks_params[task_instance.key] = { "mark_success": mark_success, "pool": pool,
[docs] def trigger_tasks(self, open_slots: int) -> None: """ Triggers tasks. Instead of calling exec_async we just add task instance to tasks_to_run queue. :param open_slots: Number of open slots """ sorted_queue = sorted( self.queued_tasks.items(), key=lambda x: x[1][1], reverse=True, ) for _ in range(min((open_slots, len(self.queued_tasks)))): key, (_, _, _, ti) = sorted_queue.pop(0) self.queued_tasks.pop(key) self.running.add(key) self.tasks_to_run.append(ti) # type: ignore
[docs] def end(self) -> None: """ When the method is called we just set states of queued tasks to UPSTREAM_FAILED marking them as not executed. """ for ti in self.tasks_to_run:"Setting %s to %s", ti.key, State.UPSTREAM_FAILED) ti.set_state(State.UPSTREAM_FAILED) self.change_state(ti.key, State.UPSTREAM_FAILED)
[docs] def terminate(self) -> None: self._terminated.set()
[docs] def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None: self.log.debug("Popping %s from executor task queue.", key) self.running.remove(key) self.event_buffer[key] = state, info

Was this entry helpful?