airflow.executors.base_executor

Base executor - this is the base class for all the implemented executors.

Module Contents

airflow.executors.base_executor.PARALLELISM :int[source]
airflow.executors.base_executor.NOT_STARTED_MESSAGE = The executor should be started first![source]
airflow.executors.base_executor.CommandType[source]
airflow.executors.base_executor.QueuedTaskInstanceType[source]
airflow.executors.base_executor.EventBufferValueType[source]
class airflow.executors.base_executor.BaseExecutor(parallelism: int = PARALLELISM)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Class to derive from in order to interface with executor-type systems like Celery, Kubernetes, Local, Sequential, and the like.

Parameters

parallelism – how many jobs should run at one time. Set to 0 for infinity.

job_id :Optional[str][source]
slots_available[source]

Number of new tasks this executor instance can accept
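The relationship between parallelism and available slots can be sketched as follows. This is a simplified stand-alone model, not the actual Airflow property; `queued_tasks` and `running` stand in for the executor's internal collections:

```python
import sys


def slots_available(parallelism, queued_tasks, running):
    """Number of new tasks this executor instance can accept.

    A parallelism of 0 means unlimited slots, modelled here with
    sys.maxsize to mirror the "infinity" behaviour described above.
    """
    if parallelism:
        return parallelism - len(queued_tasks) - len(running)
    return sys.maxsize


# With parallelism=32, 5 queued and 10 running tasks leave 17 open slots.
print(slots_available(32, range(5), range(10)))  # 17
```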

__repr__(self)[source]
start(self)[source]

Executors may need to get things started.

queue_command(self, task_instance: TaskInstance, command: CommandType, priority: int = 1, queue: Optional[str] = None)[source]

Queues a command to be run for the given task instance.

queue_task_instance(self, task_instance: TaskInstance, mark_success: bool = False, pickle_id: Optional[str] = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, pool: Optional[str] = None, cfg_path: Optional[str] = None)[source]

Queues task instance.

has_task(self, task_instance: TaskInstance)[source]

Checks if a task is either queued or running in this executor.

Parameters

task_instance – TaskInstance

Returns

True if the task is known to this executor
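The check amounts to membership in either internal collection; a minimal sketch, with plain sets of hypothetical task-instance keys standing in for the executor's queued and running structures:

```python
# Hypothetical task-instance keys: (dag_id, task_id, execution_date, try_number)
queued_tasks = {("dag_a", "extract", "2021-01-01", 1)}
running = {("dag_b", "load", "2021-01-01", 1)}


def has_task(key):
    # Known to this executor if it is either queued or already running.
    return key in queued_tasks or key in running


print(has_task(("dag_a", "extract", "2021-01-01", 1)))    # True
print(has_task(("dag_c", "transform", "2021-01-01", 1)))  # False
```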

sync(self)[source]

Sync will get called periodically by the heartbeat method. Executors should override this to gather the statuses of running tasks.

heartbeat(self)[source]

Heartbeat sent to trigger new jobs.

order_queued_tasks_by_priority(self)[source]

Orders the queued tasks by priority.

Returns

List of tuples from the queued_tasks according to the priority.

trigger_tasks(self, open_slots: int)[source]

Triggers queued tasks to execute, up to the number of open slots.

Parameters

open_slots – Number of open slots
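The interplay of priority ordering and open slots can be sketched like this. It is a simplified stand-in, assuming `queued_tasks` maps task keys to `(command, priority, queue)` tuples; the real executor hands each popped command to `execute_async`:

```python
def order_queued_tasks_by_priority(queued_tasks):
    # Highest priority first, mirroring order_queued_tasks_by_priority above.
    return sorted(queued_tasks.items(), key=lambda kv: kv[1][1], reverse=True)


def trigger_tasks(queued_tasks, running, open_slots, execute_async):
    # Launch at most `open_slots` of the highest-priority queued tasks.
    sorted_queue = order_queued_tasks_by_priority(queued_tasks)
    for key, (command, priority, queue) in sorted_queue[:open_slots]:
        del queued_tasks[key]
        running.add(key)
        execute_async(key, command, queue)


queued = {
    "low": (["echo", "low"], 1, None),
    "high": (["echo", "high"], 10, None),
}
running, launched = set(), []
trigger_tasks(queued, running, 1, lambda key, cmd, q: launched.append(key))
print(launched)  # ['high'] -- only the highest-priority task fits in one slot
```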

change_state(self, key: TaskInstanceKey, state: str, info=None)[source]

Changes state of the task.

Parameters
  • info – Executor information for the task instance

  • key – Unique key for the task instance

  • state – State to set for the task.

fail(self, key: TaskInstanceKey, info=None)[source]

Set fail state for the event.

Parameters
  • info – Executor information for the task instance

  • key – Unique key for the task instance

success(self, key: TaskInstanceKey, info=None)[source]

Set success state for the event.

Parameters
  • info – Executor information for the task instance

  • key – Unique key for the task instance

get_event_buffer(self, dag_ids=None)[source]

Returns and flushes the event buffer. If dag_ids is specified, it will only return and flush events for the given dag_ids. Otherwise, it returns and flushes all events.

Parameters

dag_ids – the dag_ids to return events for; if None, returns all

Returns

a dict of events
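The return-and-flush behaviour can be illustrated with a plain dict; here the buffer key is modelled as a `(dag_id, task_id, execution_date, try_number)` tuple standing in for a TaskInstanceKey, and the value as a `(state, info)` pair:

```python
# Stand-in event buffer: task-instance key -> (state, info).
event_buffer = {
    ("dag_a", "t1", "2021-01-01", 1): ("success", None),
    ("dag_b", "t2", "2021-01-01", 1): ("failed", None),
}


def get_event_buffer(dag_ids=None):
    """Return and flush events, optionally restricted to the given dag_ids."""
    if dag_ids is None:
        events = dict(event_buffer)
        event_buffer.clear()
        return events
    events = {key: value for key, value in event_buffer.items() if key[0] in dag_ids}
    for key in events:
        del event_buffer[key]
    return events


flushed = get_event_buffer(["dag_a"])
print(sorted(key[0] for key in flushed))       # ['dag_a']
print(sorted(key[0] for key in event_buffer))  # ['dag_b'] -- dag_a was flushed
```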

execute_async(self, key: TaskInstanceKey, command: CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None)[source]

This method will execute the command asynchronously.

Parameters
  • key – Unique key for the task instance

  • command – Command to run

  • queue – name of the queue

  • executor_config – Configuration passed to the executor.

end(self)[source]

This method is called when the caller is done submitting jobs and wants to wait synchronously for all previously submitted jobs to finish.

terminate(self)[source]

This method is called when the daemon receives a SIGTERM.

try_adopt_task_instances(self, tis: List[TaskInstance])[source]

Try to adopt running task instances that have been abandoned by a SchedulerJob dying.

Anything that is not adopted will be cleared by the scheduler (and then become eligible for re-scheduling)

Returns

any TaskInstances that were unable to be adopted

Return type

list[airflow.models.TaskInstance]

static validate_command(command: List[str])[source]

Check if the command to execute is an Airflow command.
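In Airflow 2.x this check verifies that the command begins with the `airflow tasks run` CLI invocation. A sketch of that validation (the exact error message is an assumption):

```python
from typing import List


def validate_command(command: List[str]) -> None:
    # Reject anything that is not an `airflow tasks run ...` invocation.
    if command[0:3] != ["airflow", "tasks", "run"]:
        raise ValueError('The command must start with ["airflow", "tasks", "run"]')


validate_command(["airflow", "tasks", "run", "my_dag", "my_task", "2021-01-01"])  # passes

try:
    validate_command(["rm", "-rf", "/tmp/scratch"])
except ValueError:
    print("rejected")  # non-Airflow commands raise ValueError
```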

debug_dump(self)[source]

Called in response to SIGUSR2 by the scheduler.
