airflow.executors.celery_executor
¶
CeleryExecutor
See also
For more information on how the CeleryExecutor works, take a look at the guide: Celery Executor
Module Contents¶
-
airflow.executors.celery_executor.
CELERY_FETCH_ERR_MSG_HEADER
= Error fetching Celery task state[source]¶
-
airflow.executors.celery_executor.
OPERATION_TIMEOUT
[source]¶ To start the celery worker, run the command: airflow celery worker
-
airflow.executors.celery_executor.
execute_command
(command_to_exec: CommandType) → None[source]¶ -
Executes command.
-
airflow.executors.celery_executor.
_execute_in_subprocess
(command_to_exec: CommandType) → None[source]¶
-
class
airflow.executors.celery_executor.
ExceptionWithTraceback
(exception: Exception, exception_traceback: str)[source]¶ Wrapper class used to propagate exceptions to parent processes from subprocesses.
-
airflow.executors.celery_executor.
send_task_to_executor
(task_tuple: TaskInstanceInCelery) → Tuple[TaskInstanceKey, CommandType, Union[AsyncResult, ExceptionWithTraceback]][source]¶ -
Sends task to executor.
-
airflow.executors.celery_executor.
on_celery_import_modules
(*args, **kwargs)[source]¶ -
Preload some "expensive" airflow modules so that every task process doesn't have to import it again and
-
again.
Loading these for each task adds 0.3-0.5s per task before the task can run. For long running tasks this doesn’t matter, but for short tasks this starts to be a noticeable impact.
-
class
airflow.executors.celery_executor.
CeleryExecutor
[source]¶ Bases:
airflow.executors.base_executor.BaseExecutor
CeleryExecutor is recommended for production use of Airflow. It allows distributing the execution of task instances to multiple worker nodes.
Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
-
_num_tasks_per_send_process
(self, to_send_count: int)[source]¶ How many Celery tasks should each worker process send.
- Returns
Number of tasks that should be sent per process
- Return type
-
trigger_tasks
(self, open_slots: int)[source]¶ Overwrite trigger_tasks function from BaseExecutor
- Parameters
open_slots – Number of open slots
- Returns
-
_process_tasks
(self, task_tuples_to_send: List[TaskInstanceInCelery])[source]¶
-
_send_tasks_to_celery
(self, task_tuples_to_send: List[TaskInstanceInCelery])[source]¶
-
_check_for_stalled_adopted_tasks
(self)[source]¶ See if any of the tasks we adopted from another Executor run have not progressed after the configured timeout.
If they haven’t, they likely never made it to Celery, and we should just resend them. We do that by clearing the state and letting the normal scheduler loop deal with that
-
update_task_state
(self, key: TaskInstanceKey, state: str, info: Any)[source]¶ Updates state of a single task.
-
-
airflow.executors.celery_executor.
fetch_celery_task_state
(async_result: AsyncResult) → Tuple[str, Union[str, ExceptionWithTraceback], Any][source]¶ -
Fetch and return the state of the given celery task. The scope of this function is
-
global so that it can be called by subprocesses in the pool.
-
class
airflow.executors.celery_executor.
BulkStateFetcher
(sync_parallelism=None)[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
Gets status for many Celery tasks using the best method available
If BaseKeyValueStoreBackend is used as result backend, the mget method is used. If DatabaseBackend is used as result backend, the SELECT …WHERE task_id IN (…) query is used Otherwise, multiprocessing.Pool will be used. Each task status will be downloaded individually.