CeleryExecutor is one of the ways you can scale out the number of workers. For this
to work, you need to set up a Celery backend (RabbitMQ, Redis, ...) and change your
airflow.cfg to point the executor parameter to
CeleryExecutor and provide the related Celery settings.
For more information about setting up a Celery broker, refer to the exhaustive Celery documentation on the topic.
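As a rough illustration of the configuration change described above, the following sketch generates the relevant airflow.cfg sections with Python's standard configparser module. The connection strings are placeholders, the helper names are hypothetical, and the section and option names ([core] executor, [celery] broker_url and result_backend) should be checked against the configuration reference for your Airflow version.

```python
import configparser
import io

# Placeholder connection strings (assumptions): replace with your own services.
BROKER_URL = "redis://redis.example.internal:6379/0"
RESULT_BACKEND = "db+postgresql://airflow:CHANGE_ME@db.example.internal/airflow"


def build_celery_config(broker_url: str, result_backend: str) -> configparser.ConfigParser:
    """Return the config sections that point Airflow at CeleryExecutor."""
    config = configparser.ConfigParser()
    config["core"] = {"executor": "CeleryExecutor"}
    config["celery"] = {
        "broker_url": broker_url,
        "result_backend": result_backend,
    }
    return config


def render(config: configparser.ConfigParser) -> str:
    """Serialize the config in the INI format used by airflow.cfg."""
    buffer = io.StringIO()
    config.write(buffer)
    return buffer.getvalue()


if __name__ == "__main__":
    print(render(build_celery_config(BROKER_URL, RESULT_BACKEND)))
```

The printed fragment is meant to be merged into an existing airflow.cfg by hand, not to replace the file.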
Here are a few imperative requirements for your workers:
airflow needs to be installed, and the CLI needs to be in the path
Airflow configuration settings should be homogeneous across the cluster
Operators that are executed on the worker need to have their dependencies met in that context. For example, if you use the
HiveOperator, the hive CLI needs to be installed on that box, or if you use the
MySqlOperator, the required Python library needs to be available in the
The worker needs to have access to its
DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your
DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment. If all your boxes have a common mount point, having your pipeline files shared there should work as well
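The requirements above can be checked on a worker machine before starting it. The following is a minimal sketch using only the standard library. The function name and the idea of passing extra CLI names (for example hive) are illustrative assumptions and not part of Airflow.

```python
import os
import shutil
from typing import List, Optional


def check_worker_prerequisites(
    dags_folder: str, required_clis: Optional[List[str]] = None
) -> List[str]:
    """Return a list of human-readable problems; an empty list means all checks passed."""
    problems = []

    # The airflow CLI, plus any operator-specific CLIs, must be on the PATH.
    for cli in ["airflow", *(required_clis or [])]:
        if shutil.which(cli) is None:
            problems.append(f"'{cli}' CLI not found on PATH")

    # The worker needs read access to its DAGS_FOLDER.
    if not os.path.isdir(dags_folder):
        problems.append(f"DAGS_FOLDER '{dags_folder}' does not exist")
    elif not os.access(dags_folder, os.R_OK):
        problems.append(f"DAGS_FOLDER '{dags_folder}' is not readable")

    return problems


if __name__ == "__main__":
    # Hypothetical path and extra CLI, for illustration only.
    for problem in check_worker_prerequisites("/opt/airflow/dags", ["hive"]):
        print("PROBLEM:", problem)
```

This only covers the checks that can be made locally. Whether configuration is homogeneous across the cluster still has to be verified by your configuration management tooling.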
To kick off a worker, you need to set up Airflow and kick off the worker subcommand:
airflow celery worker
Your worker should start picking up tasks as soon as they get fired in its direction. To stop a worker running on a machine you can use:
airflow celery stop
It will try to stop the worker gracefully by sending a
SIGTERM signal to the main Celery
process, as recommended by the Celery documentation.
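To make the idea of a graceful stop concrete, the sketch below sends SIGTERM to a stand-in child process on a POSIX system. It is not how the stop subcommand is implemented internally. It only illustrates the signal being sent, and the sleeping child process is a placeholder for a real worker.

```python
import os
import signal
import subprocess
import sys


def stop_gracefully(pid: int) -> None:
    """Ask a process to shut down by sending it SIGTERM (POSIX only)."""
    os.kill(pid, signal.SIGTERM)


if __name__ == "__main__":
    # Stand-in for a worker: a child process that just sleeps.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    stop_gracefully(child.pid)
    child.wait(timeout=10)
    print("child exit code:", child.returncode)
```

Unlike SIGKILL, SIGTERM can be handled by the receiving process, which is what gives a worker the chance to finish cleanly.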
Note that you can also run Celery Flower, a web UI built on top of Celery, to monitor your workers. You can use the shortcut command to start a Flower web server:
airflow celery flower
Please note that you must have the
flower Python library already installed on your system. The recommended way is to install the Airflow celery bundle:
pip install 'apache-airflow[celery]'
Make sure to use a database-backed result backend
Make sure to set a visibility timeout in
[celery_broker_transport_options] that exceeds the ETA of your longest running task
Make sure to set umask in
[worker_umask] to set permissions for files newly created by workers.
Tasks can consume resources. Make sure your worker has enough resources to run
Queue names are limited to 256 characters, but each broker backend might have its own restrictions
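Two of the caveats above lend themselves to a quick calculation or check. The sketch below derives a visibility timeout from the duration of the longest task plus a safety margin, and validates queue names against the 256-character limit. The 30-minute margin, the helper names, and the visibility_timeout option name are assumptions for illustration. Your broker may impose stricter limits on queue names.

```python
import configparser
import io
from datetime import timedelta

MAX_QUEUE_NAME_LENGTH = 256  # limit stated above; brokers may be stricter


def visibility_timeout_seconds(
    longest_task: timedelta, safety_margin: timedelta = timedelta(minutes=30)
) -> int:
    """Return a timeout (in seconds) that exceeds the longest expected task."""
    return int((longest_task + safety_margin).total_seconds())


def is_valid_queue_name(name: str) -> bool:
    """Check only the generic length limit, not broker-specific rules."""
    return 0 < len(name) <= MAX_QUEUE_NAME_LENGTH


def render_transport_options(timeout_seconds: int) -> str:
    """Render the config section as it might appear in airflow.cfg."""
    config = configparser.ConfigParser()
    config["celery_broker_transport_options"] = {
        "visibility_timeout": str(timeout_seconds)
    }
    buffer = io.StringIO()
    config.write(buffer)
    return buffer.getvalue()


if __name__ == "__main__":
    # Hypothetical workload: the longest task takes about six hours.
    timeout = visibility_timeout_seconds(timedelta(hours=6))
    print(render_transport_options(timeout))
    print(is_valid_queue_name("spark"))
```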
See Modules Management for details on how Python and Airflow manage modules.
Airflow consists of several components:
Workers - Execute the assigned tasks
Scheduler - Responsible for adding the necessary tasks to the queue
Web server - HTTP Server provides access to DAG/task status information
Database - Contains information about the status of tasks, DAGs, Variables, connections, etc.
Celery - Queue mechanism
Please note that the Celery queue consists of two components:
Broker - Stores commands for execution
Result backend - Stores status of completed commands
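The split between broker and result backend can be pictured with a toy in-memory model. This is only a sketch to show the two roles. The class and method names are invented for illustration, and a real deployment uses an external broker and a database rather than Python objects.

```python
import queue
from typing import Dict, Optional


class ToyCeleryQueue:
    """In-memory stand-in for the two parts of the Celery queue."""

    def __init__(self) -> None:
        # Broker: commands waiting to be executed.
        self.broker: "queue.Queue[str]" = queue.Queue()
        # Result backend: status of completed commands.
        self.result_backend: Dict[str, str] = {}

    def send(self, command: str) -> None:
        """Scheduler side: store a command for execution."""
        self.broker.put(command)

    def work_once(self) -> Optional[str]:
        """Worker side: take one command, 'run' it, and record its status."""
        try:
            command = self.broker.get_nowait()
        except queue.Empty:
            return None
        # A real worker would execute the task here.
        self.result_backend[command] = "success"
        return command

    def status(self, command: str) -> Optional[str]:
        """Scheduler side: look up the status of a completed command."""
        return self.result_backend.get(command)


if __name__ == "__main__":
    celery = ToyCeleryQueue()
    celery.send("run task_a")
    celery.work_once()
    print(celery.status("run task_a"))
```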
The components communicate with each other in many places:
 Web server --> Workers - Fetches task execution logs
 Web server --> DAG files - Reveal the DAG structure
 Web server --> Database - Fetch the status of the tasks
 Workers --> DAG files - Reveal the DAG structure and execute the tasks
 Workers --> Database - Gets and stores information about connection configuration, variables and XCOM.
 Workers --> Celery's result backend - Saves the status of tasks
 Workers --> Celery's broker - Stores commands for execution
 Scheduler --> DAG files - Reveal the DAG structure and execute the tasks
 Scheduler --> Database - Store a DAG run and related tasks
 Scheduler --> Celery's result backend - Gets information about the status of completed tasks
 Scheduler --> Celery's broker - Put the commands to be executed
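The connections listed above can be written down as a small lookup table, which is handy when working out which network paths each component needs (for example, when writing firewall rules). This is a sketch that simply transcribes the list. The function names are illustrative.

```python
from typing import Dict, List, Tuple

# (source, target) -> purpose, transcribed from the list above.
CONNECTIONS: Dict[Tuple[str, str], str] = {
    ("Web server", "Workers"): "Fetches task execution logs",
    ("Web server", "DAG files"): "Reveal the DAG structure",
    ("Web server", "Database"): "Fetch the status of the tasks",
    ("Workers", "DAG files"): "Reveal the DAG structure and execute the tasks",
    ("Workers", "Database"): "Connection configuration, variables and XCOM",
    ("Workers", "Celery's result backend"): "Saves the status of tasks",
    ("Workers", "Celery's broker"): "Stores commands for execution",
    ("Scheduler", "DAG files"): "Reveal the DAG structure and execute the tasks",
    ("Scheduler", "Database"): "Store a DAG run and related tasks",
    ("Scheduler", "Celery's result backend"): "Status of completed tasks",
    ("Scheduler", "Celery's broker"): "Put the commands to be executed",
}


def targets_of(component: str) -> List[str]:
    """Everything the given component needs to reach."""
    return sorted(dst for (src, dst) in CONNECTIONS if src == component)


def clients_of(target: str) -> List[str]:
    """Every component that connects to the given target."""
    return sorted(src for (src, dst) in CONNECTIONS if dst == target)


if __name__ == "__main__":
    print(clients_of("Database"))
    print(targets_of("Scheduler"))
```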
Task execution process
Initially, two processes are running:
SchedulerProcess - processes the tasks and runs them using CeleryExecutor
WorkerProcess - observes the queue waiting for new tasks to appear
WorkerChildProcess - waits for new tasks
Two databases are also available:
During this process, two processes are created:
LocalTaskJobProcess - Its logic is described by LocalTaskJob. It monitors RawTaskProcess. New processes are started using TaskRunner.
RawTaskProcess - It is the process with the user code e.g.
execute_command(). It creates a new process - LocalTaskJobProcess.
LocalTaskJob class. It starts a new process using TaskRunner.
When using the CeleryExecutor, the Celery queues that tasks are sent to
can be specified.
queue is an attribute of BaseOperator, so any
task can be assigned to any queue. The default queue for the environment
is defined in the
operators -> default_queue setting. This defines
the queue that tasks get assigned to when not specified, as well as which
queue Airflow workers listen to when started.
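The fallback behaviour described above can be sketched in a few lines. The Task class below is a simplified stand-in for an operator, not Airflow's BaseOperator, and the value "default" for the default queue is an assumption. In a DAG file, the equivalent step is passing queue= when constructing an operator.

```python
from dataclasses import dataclass
from typing import Optional

# Stand-in for the operators -> default_queue setting (assumed value).
DEFAULT_QUEUE = "default"


@dataclass
class Task:
    """Simplified stand-in for an operator with an optional queue attribute."""

    task_id: str
    queue: Optional[str] = None


def effective_queue(task: Task, default_queue: str = DEFAULT_QUEUE) -> str:
    """Return the queue the task is sent to: its own, or the default."""
    return task.queue or default_queue


if __name__ == "__main__":
    print(effective_queue(Task("load_table")))
    print(effective_queue(Task("run_spark_job", queue="spark")))
```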
Workers can listen to one or multiple queues of tasks. When a worker is
started (using the command
airflow celery worker), a set of comma-delimited queue
names (with no whitespace) can be given (e.g.
airflow celery worker -q spark,quark).
This worker will then only pick up tasks wired to the specified queue(s).
This can be useful if you need specialized workers, either from a resource perspective (for, say, very lightweight tasks where one worker could take thousands of tasks without a problem), or from an environment perspective (you want a worker running from within the Spark cluster itself because it needs a very specific environment and security rights).
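The queue subscription rule can be modelled as follows. This sketch parses a comma-delimited value such as the one passed with -q, rejects whitespace as described above, and decides whether a worker would pick up a task from a given queue. The function names are hypothetical, and the actual matching is done by Celery and the broker, not by code like this.

```python
from typing import FrozenSet


def parse_queue_option(value: str) -> FrozenSet[str]:
    """Parse a comma-delimited list of queue names with no whitespace."""
    names = value.split(",")
    for name in names:
        if not name or any(ch.isspace() for ch in name):
            raise ValueError(f"invalid queue list {value!r}: empty name or whitespace")
    return frozenset(names)


def worker_accepts(worker_queues: FrozenSet[str], task_queue: str) -> bool:
    """A worker only picks up tasks wired to one of its queues."""
    return task_queue in worker_queues


if __name__ == "__main__":
    queues = parse_queue_option("spark,quark")
    print(worker_accepts(queues, "spark"))
    print(worker_accepts(queues, "default"))
```

Note that a worker started with an explicit queue list, as modelled here, does not pick up tasks from any other queue, so at least one worker still has to listen to the default queue if some tasks rely on it.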