airflow.providers.celery.executors.celery_executor_utils
Utilities and classes used by the Celery Executor.
Much of this code is expensive to import/load, so be careful where this module is imported.
Attributes
- CELERY_FETCH_ERR_MSG_HEADER
Classes
- ExceptionWithTraceback: Wrapper class used to propagate exceptions to parent processes from subprocesses.
- BulkStateFetcher: Gets status for many Celery tasks using the best method available.
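Functions
- get_celery_configuration(): Get the Celery configuration dictionary.
- create_celery_app(team_conf): Create a Celery app, supporting team-specific configuration.
- on_celery_import_modules(*args, **kwargs): Preload some "expensive" Airflow modules once so that other task processes won't have to import them again.
- on_celery_worker_ready(*args, **kwargs)
- execute_command(command_to_exec): Execute command.
- send_workload_to_executor(workload_tuple): Send workload to executor (serialized and executed as a Celery task).
- fetch_celery_task_state(async_result): Fetch and return the state of the given Celery task (workload execution).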
Module Contents
- airflow.providers.celery.executors.celery_executor_utils.CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
- airflow.providers.celery.executors.celery_executor_utils.get_celery_configuration()
Get the Celery configuration dictionary.
- airflow.providers.celery.executors.celery_executor_utils.create_celery_app(team_conf)
Create a Celery app, supporting team-specific configuration.
- Parameters:
team_conf (airflow.executors.base_executor.ExecutorConf | airflow.configuration.AirflowConfigParser) – ExecutorConf instance with team-specific configuration, or the global conf
- Returns:
Celery app instance
- Return type:
celery.Celery
- airflow.providers.celery.executors.celery_executor_utils.on_celery_import_modules(*args, **kwargs)
Preload some “expensive” Airflow modules once so that other task processes won’t have to import them again.
Loading these modules for each task adds 0.3-0.5 s per task before the task can run. For long-running tasks this doesn’t matter, but for short tasks the overhead becomes noticeable.
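The reasoning behind the preload hook can be seen with plain Python: the first import of a module pays the full loading cost, and every later import is only a lookup in sys.modules. The sketch below does not reproduce the hook or the Airflow modules it loads; EXPENSIVE_MODULES, preload_modules and run_task are hypothetical names, and stdlib modules stand in for the "expensive" ones.

```python
import importlib
import sys
import time

# Hypothetical stand-ins for the "expensive" modules; the real hook
# preloads Airflow modules, which are not reproduced here.
EXPENSIVE_MODULES = ["json", "decimal", "xml.dom.minidom"]


def preload_modules(module_names: list[str]) -> dict[str, float]:
    """Import each module once and record how long the import took (seconds)."""
    timings: dict[str, float] = {}
    for name in module_names:
        start = time.perf_counter()
        importlib.import_module(name)  # cached in sys.modules after the first call
        timings[name] = time.perf_counter() - start
    return timings


def run_task() -> str:
    # After preloading, this import is only a sys.modules lookup.
    import xml.dom.minidom

    return xml.dom.minidom.parseString("<ok/>").documentElement.tagName


if __name__ == "__main__":
    print(preload_modules(EXPENSIVE_MODULES))
    print(run_task())
```

Doing this once per worker process, rather than once per task, is what removes the per-task startup delay for short tasks.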
- airflow.providers.celery.executors.celery_executor_utils.on_celery_worker_ready(*args, **kwargs)
- airflow.providers.celery.executors.celery_executor_utils.execute_command(command_to_exec)
Execute command.
- class airflow.providers.celery.executors.celery_executor_utils.ExceptionWithTraceback(exception, exception_traceback)
Wrapper class used to propagate exceptions to parent processes from subprocesses.
- Parameters:
exception (BaseException) – The exception to wrap
exception_traceback (str) – The stacktrace to wrap
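Traceback objects cannot be pickled, so a subprocess cannot send them back to its parent. A wrapper like this one pairs the exception with its stack trace already formatted as a string. The class below is a minimal sketch that mirrors only the two documented parameters; risky_work and worker are hypothetical helpers that show the catch-and-wrap pattern a pool worker could use.

```python
import pickle
import traceback


class ExceptionWithTraceback:
    """Sketch: pair an exception with its formatted stack trace."""

    def __init__(self, exception: BaseException, exception_traceback: str):
        self.exception = exception
        self.traceback = exception_traceback


def risky_work(value: int) -> int:
    # Hypothetical task body that fails on negative input.
    if value < 0:
        raise ValueError(f"negative value: {value}")
    return value * 2


def worker(value: int):
    """Return either the result or a wrapped, picklable exception."""
    try:
        return risky_work(value)
    except Exception as e:
        # Capture the trace as a string while we are still inside the except block.
        return ExceptionWithTraceback(e, traceback.format_exc())


if __name__ == "__main__":
    for v in (3, -1):
        # Round-trip through pickle, as a process pool does when returning results.
        result = pickle.loads(pickle.dumps(worker(v)))
        if isinstance(result, ExceptionWithTraceback):
            print("failed:", result.exception)
            print(result.traceback)
        else:
            print("ok:", result)
```

The parent can then check `isinstance(result, ExceptionWithTraceback)` and log the original trace instead of losing it at the process boundary.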
- airflow.providers.celery.executors.celery_executor_utils.send_workload_to_executor(workload_tuple)
Send workload to executor (serialized and executed as a Celery task).
This function is called in ProcessPoolExecutor subprocesses. To avoid pickling issues with team-specific Celery apps, only the team_name is passed to the subprocess, and the Celery app is reconstructed here.
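The pickling workaround can be illustrated with a stdlib-only sketch. FakeCeleryApp, get_app_for_team and send_workload are hypothetical stand-ins, not the provider's API, and the layout of the workload tuple here is an assumption. A threading.Lock plays the role of the unpicklable state (connections, locks) a real app holds. Only a plain (key, payload, team_name) tuple would cross the process boundary, and each subprocess builds its app at most once per team via functools.lru_cache.

```python
from __future__ import annotations

import functools
import pickle
import threading
from dataclasses import dataclass, field


@dataclass
class FakeCeleryApp:
    """Hypothetical stand-in for a team-specific Celery app."""

    team_name: str | None
    # Locks cannot be pickled, just like a real app's connection state.
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def send_task(self, payload: str) -> str:
        with self._lock:
            return f"sent {payload!r} via app for team {self.team_name!r}"


@functools.lru_cache(maxsize=None)
def get_app_for_team(team_name: str | None) -> FakeCeleryApp:
    # Built at most once per team in each process, then reused.
    return FakeCeleryApp(team_name)


def send_workload(workload_tuple: tuple[str, str, str | None]) -> tuple[str, str]:
    """Hypothetical worker: receives only plain, picklable data."""
    key, payload, team_name = workload_tuple
    app = get_app_for_team(team_name)  # reconstructed inside the subprocess
    return key, app.send_task(payload)
```

With a real pool, `ProcessPoolExecutor().map(send_workload, workloads)` would ship only the tuples; the app object itself is never pickled.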
- airflow.providers.celery.executors.celery_executor_utils.fetch_celery_task_state(async_result)
Fetch and return the state of the given Celery task (workload execution).
The scope of this function is global so that it can be called by subprocesses in the pool.
- Parameters:
async_result (celery.result.AsyncResult) – a tuple of the Celery task key and the async Celery object used to fetch the task’s state
- Returns:
a tuple of the Celery task key, the Celery state, and the Celery info of the task
- Return type:
tuple[str, str | ExceptionWithTraceback, Any]
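A minimal sketch of the documented return contract. It assumes a hypothetical FakeAsyncResult in place of celery.result.AsyncResult that exposes the task_id, state and info attributes, where reading state may fail the way a network call to the result backend can. On success the function returns (task key, state, info). On failure it puts an ExceptionWithTraceback in the state slot and None in the info slot, matching the documented tuple[str, str | ExceptionWithTraceback, Any]. The "Celery Task ID:" message prefix is an illustrative choice.

```python
from __future__ import annotations

import traceback
from dataclasses import dataclass
from typing import Any


class ExceptionWithTraceback:
    """Sketch: pair an exception with its formatted stack trace."""

    def __init__(self, exception: BaseException, exception_traceback: str):
        self.exception = exception
        self.traceback = exception_traceback


@dataclass
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult."""

    task_id: str
    _state: str = "PENDING"
    _info: Any = None
    broken: bool = False  # simulate an unreachable result backend

    @property
    def state(self) -> str:
        if self.broken:
            raise ConnectionError("result backend unreachable")
        return self._state

    @property
    def info(self) -> Any:
        return self._info


def fetch_task_state(async_result) -> tuple[str, str | ExceptionWithTraceback, Any]:
    try:
        # On a real AsyncResult, reading .state queries the result backend.
        return async_result.task_id, async_result.state, async_result.info
    except Exception as e:
        tb = f"Celery Task ID: {async_result.task_id}\n{traceback.format_exc()}"
        return async_result.task_id, ExceptionWithTraceback(e, tb), None
```

Because the function is defined at module level, a pool can call it by reference, which is why the docstring notes that its scope is global.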
- class airflow.providers.celery.executors.celery_executor_utils.BulkStateFetcher(sync_parallelism, celery_app=None)
Bases: airflow.utils.log.logging_mixin.LoggingMixin
Gets status for many Celery tasks using the best method available.
If BaseKeyValueStoreBackend is used as the result backend, the mget method is used. If DatabaseBackend is used as the result backend, the SELECT … WHERE task_id IN (…) query is used. Otherwise, multiprocessing.Pool is used and each task status is downloaded individually.
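The three-way strategy above can be sketched without Celery. KeyValueBackend, DatabaseBackend, SlowBackend, the task_meta table and bulk_fetch are all hypothetical names, not the Celery or provider classes. An in-memory SQLite table stands in for the database backend so the single IN (…) query is concrete. For a self-contained example, the fallback uses multiprocessing.pool.ThreadPool, which has the same map API as multiprocessing.Pool but runs threads instead of processes.

```python
from __future__ import annotations

import sqlite3
from multiprocessing.pool import ThreadPool


class KeyValueBackend:
    """Hypothetical key-value result backend with a bulk mget()."""

    def __init__(self, data: dict[str, str]):
        self._data = data

    def mget(self, keys: list[str]) -> list[str | None]:
        return [self._data.get(k) for k in keys]


class DatabaseBackend:
    """Hypothetical SQL result backend backed by an in-memory SQLite table."""

    def __init__(self, rows: dict[str, str]):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE task_meta (task_id TEXT PRIMARY KEY, status TEXT)")
        self.conn.executemany("INSERT INTO task_meta VALUES (?, ?)", rows.items())


class SlowBackend:
    """Hypothetical backend with no bulk API: one lookup per task."""

    def __init__(self, data: dict[str, str]):
        self._data = data

    def get_status(self, task_id: str) -> str | None:
        return self._data.get(task_id)


def bulk_fetch(backend, task_ids: list[str], parallelism: int = 4) -> dict[str, str | None]:
    if not task_ids:
        return {}
    if isinstance(backend, KeyValueBackend):
        # One round trip for all keys.
        return dict(zip(task_ids, backend.mget(task_ids)))
    if isinstance(backend, DatabaseBackend):
        # One query: SELECT ... WHERE task_id IN (?, ?, ...)
        placeholders = ", ".join("?" for _ in task_ids)
        rows = backend.conn.execute(
            f"SELECT task_id, status FROM task_meta WHERE task_id IN ({placeholders})",
            task_ids,
        ).fetchall()
        found = dict(rows)
        return {t: found.get(t) for t in task_ids}
    # Fallback: fetch each task's status individually, in parallel.
    with ThreadPool(parallelism) as pool:
        statuses = pool.map(backend.get_status, task_ids)
    return dict(zip(task_ids, statuses))
```

All three paths return the same mapping. The bulk paths make a single round trip regardless of how many tasks are queried, which is why they are preferred when the result backend supports them.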