airflow.executors.celery_executor

CeleryExecutor

See also

For more information on how the CeleryExecutor works, take a look at the guide: Celery Executor

Module Contents

Classes

ExceptionWithTraceback

Wrapper class used to propagate exceptions to parent processes from subprocesses.

CeleryExecutor

CeleryExecutor is recommended for production use of Airflow. It allows

BulkStateFetcher

Gets status for many Celery tasks using the best method available

Functions

execute_command(command_to_exec: airflow.executors.base_executor.CommandType) → None

Executes command.

send_task_to_executor(task_tuple: TaskInstanceInCelery) → Tuple[airflow.models.taskinstance.TaskInstanceKey, airflow.executors.base_executor.CommandType, Union[celery.result.AsyncResult, ExceptionWithTraceback]]

Sends task to executor.

on_celery_import_modules(*args, **kwargs)

Preload some "expensive" airflow modules so that every task process doesn't have to import it again and

fetch_celery_task_state(async_result: celery.result.AsyncResult) → Tuple[str, Union[str, ExceptionWithTraceback], Any]

Fetch and return the state of the given celery task. The scope of this function is

Attributes

log

CELERY_FETCH_ERR_MSG_HEADER

CELERY_SEND_ERR_MSG_HEADER

OPERATION_TIMEOUT

To start the celery worker, run the command:

celery_configuration

app

TaskInstanceInCelery

airflow.executors.celery_executor.log[source]
airflow.executors.celery_executor.CELERY_FETCH_ERR_MSG_HEADER = Error fetching Celery task state[source]
airflow.executors.celery_executor.CELERY_SEND_ERR_MSG_HEADER = Error sending Celery task[source]
airflow.executors.celery_executor.OPERATION_TIMEOUT[source]

To start the celery worker, run the command: airflow celery worker

airflow.executors.celery_executor.celery_configuration[source]
airflow.executors.celery_executor.app[source]
airflow.executors.celery_executor.execute_command(command_to_exec: airflow.executors.base_executor.CommandType) None[source]

Executes command.

class airflow.executors.celery_executor.ExceptionWithTraceback(exception: Exception, exception_traceback: str)[source]

Wrapper class used to propagate exceptions to parent processes from subprocesses.

Parameters
  • exception (Exception) – The exception to wrap

  • exception_traceback (str) – The stacktrace to wrap

airflow.executors.celery_executor.TaskInstanceInCelery[source]
airflow.executors.celery_executor.send_task_to_executor(task_tuple: TaskInstanceInCelery) Tuple[airflow.models.taskinstance.TaskInstanceKey, airflow.executors.base_executor.CommandType, Union[celery.result.AsyncResult, ExceptionWithTraceback]][source]

Sends task to executor.

airflow.executors.celery_executor.on_celery_import_modules(*args, **kwargs)[source]

Preload some “expensive” airflow modules so that every task process doesn’t have to import it again and again.

Loading these for each task adds 0.3-0.5s per task before the task can run. For long running tasks this doesn’t matter, but for short tasks this starts to be a noticeable impact.

class airflow.executors.celery_executor.CeleryExecutor[source]

Bases: airflow.executors.base_executor.BaseExecutor

CeleryExecutor is recommended for production use of Airflow. It allows distributing the execution of task instances to multiple worker nodes.

Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

start(self) None[source]

Executors may need to get things started.

trigger_tasks(self, open_slots: int) None[source]

Overwrite trigger_tasks function from BaseExecutor

Parameters

open_slots – Number of open slots

Returns

sync(self) None[source]

Sync will get called periodically by the heartbeat method. Executors should override this to perform gather statuses.

debug_dump(self) None[source]

Called in response to SIGUSR2 by the scheduler

update_all_task_states(self) None[source]

Updates states of the tasks.

change_state(self, key: airflow.models.taskinstance.TaskInstanceKey, state: str, info=None) None[source]

Changes state of the task.

Parameters
  • info – Executor information for the task instance

  • key – Unique key for the task instance

  • state – State to set for the task.

update_task_state(self, key: airflow.models.taskinstance.TaskInstanceKey, state: str, info: Any) None[source]

Updates state of a single task.

end(self, synchronous: bool = False) None[source]

This method is called when the caller is done submitting job and wants to wait synchronously for the job submitted previously to be all done.

execute_async(self, key: airflow.models.taskinstance.TaskInstanceKey, command: airflow.executors.base_executor.CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None)[source]

Do not allow async execution for Celery executor.

terminate(self)[source]

This method is called when the daemon receives a SIGTERM

try_adopt_task_instances(self, tis: List[airflow.models.taskinstance.TaskInstance]) List[airflow.models.taskinstance.TaskInstance][source]

Try to adopt running task instances that have been abandoned by a SchedulerJob dying.

Anything that is not adopted will be cleared by the scheduler (and then become eligible for re-scheduling)

Returns

any TaskInstances that were unable to be adopted

Return type

list[airflow.models.TaskInstance]

airflow.executors.celery_executor.fetch_celery_task_state(async_result: celery.result.AsyncResult) Tuple[str, Union[str, ExceptionWithTraceback], Any][source]

Fetch and return the state of the given celery task. The scope of this function is global so that it can be called by subprocesses in the pool.

Parameters

async_result (tuple(str, celery.result.AsyncResult)) – a tuple of the Celery task key and the async Celery object used to fetch the task’s state

Returns

a tuple of the Celery task key and the Celery state and the celery info of the task

Return type

tuple[str, str, str]

class airflow.executors.celery_executor.BulkStateFetcher(sync_parallelism=None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Gets status for many Celery tasks using the best method available

If BaseKeyValueStoreBackend is used as result backend, the mget method is used. If DatabaseBackend is used as result backend, the SELECT …WHERE task_id IN (…) query is used Otherwise, multiprocessing.Pool will be used. Each task status will be downloaded individually.

get_many(self, async_results) Mapping[str, airflow.executors.base_executor.EventBufferValueType][source]

Gets status for many Celery tasks using the best method available.

Was this entry helpful?