Warning
The Batch Executor is alpha/experimental at the moment and may be subject to change without warning.
AWS Batch Executor¶
This is an Airflow executor powered by AWS Batch. Each task scheduled by Airflow is run inside a separate container, scheduled by Batch. Some benefits of an executor like this include:
Scalability and Lower Costs: AWS Batch allows the ability to dynamically provision the resources needed to execute tasks. Depending on the resources allocated, AWS Batch can autoscale up or down based on the workload, ensuring efficient resource utilization and reducing costs.
Job Queues and Priority: AWS Batch provides the concept of job queues, allowing the ability to prioritize and manage the execution of tasks. This ensures that when multiple tasks are scheduled simultaneously, they are executed in the desired order of priority.
Flexibility: AWS Batch supports Fargate (ECS), EC2 and EKS compute environments. This range of compute environments, together with fine-grained control over the resources allocated to them, gives users a lot of flexibility in choosing the most suitable execution environment for their workloads.
Rapid Task Execution: By maintaining an active worker within AWS Batch, tasks submitted to the service can be executed swiftly. With a ready-to-go worker, there’s minimal startup delay, ensuring tasks commence immediately upon submission. This feature is particularly advantageous for time-sensitive workloads or applications requiring near-real-time processing, enhancing overall workflow efficiency and responsiveness.
For a quick start guide, please see here. It will get you up and running with a basic configuration.
The sections below provide more general details about configuration, the provided example Dockerfile and logging.
Config Options¶
There are a number of configuration options available, which can either be set directly in the airflow.cfg file under an “aws_batch_executor” section or via environment variables using the AIRFLOW__AWS_BATCH_EXECUTOR__<OPTION_NAME> format, for example AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE = "myJobQueue". For more information on how to set these options, see Setting Configuration Options.
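As a small illustration of the naming format above, the sketch below derives the environment variable name for a given option. The helper name executor_env_var is hypothetical and not part of Airflow; it only mirrors the AIRFLOW__<SECTION>__<OPTION_NAME> pattern described in this section.

```python
def executor_env_var(option_name: str, section: str = "aws_batch_executor") -> str:
    """Return the environment variable name for an option in the given config section.

    Hypothetical helper: it simply upper-cases the section and option name and
    joins them with double underscores, following the format described above.
    """
    return f"AIRFLOW__{section.upper()}__{option_name.upper()}"


# Example: the job queue option of the Batch executor section.
job_queue_var = executor_env_var("job_queue")
print(job_queue_var)
```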
Note
Configuration options must be consistent across all the hosts/environments running the Airflow components (Scheduler, Webserver, Executor managed resources, etc). See here for more details on setting configurations.
In the case of conflicts, the order of precedence from lowest to highest is:
Load default values for options which have defaults.
Load any values explicitly provided through airflow.cfg or environment variables. These are checked with Airflow’s config precedence.
Load any values provided in the SUBMIT_JOB_KWARGS option if one is provided.
Note
exec_config is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the Batch Executor it represents a submit_job_kwargs configuration which is then updated over-top of the submit_job_kwargs specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: submit_job_kwargs.update(exec_config)
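The recursive update described in the note can be sketched roughly as follows. This is an illustration only, not the executor's actual implementation: the function name recursive_update and the example values are assumptions, and the keys shown (tags, timeout) are simply ordinary submit_job arguments chosen for the example.

```python
import copy


def recursive_update(base: dict, override: dict) -> dict:
    """Return a copy of `base` with `override` merged in, nested dicts included.

    Hypothetical helper illustrating the idea; values in `override` win, and
    nested dictionaries are merged key by key instead of being replaced whole.
    """
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = recursive_update(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Assumed example values: kwargs from the Airflow config ...
submit_job_kwargs = {
    "tags": {"team": "data"},
    "timeout": {"attemptDurationSeconds": 3600},
}
# ... and a per-task exec_config passed to an operator.
exec_config = {
    "tags": {"priority": "high"},
    "timeout": {"attemptDurationSeconds": 600},
}

merged = recursive_update(submit_job_kwargs, exec_config)
print(merged)
```

Note that a plain submit_job_kwargs.update(exec_config) would replace the whole tags dictionary, whereas the nested merge keeps the existing team tag and adds the new one.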
Required config options:¶
JOB_QUEUE - The job queue where the job is submitted. Required.
JOB_DEFINITION - The job definition used by this job. Required.
JOB_NAME - The name of the AWS Batch Job. Required.
REGION_NAME - The name of the AWS Region where AWS Batch is configured. Required.
Optional config options:¶
AWS_CONN_ID - The Airflow connection (i.e. credentials) used by the Batch executor to make API calls to AWS Batch. Defaults to “aws_default”.
SUBMIT_JOB_KWARGS - A JSON string containing arguments to provide the Batch submit_job API.
MAX_SUBMIT_JOB_ATTEMPTS - The maximum number of times the Batch Executor should attempt to submit a job. This refers to instances where the job fails to start (i.e. API failures, container failures etc.)
CHECK_HEALTH_ON_STARTUP - Whether or not to check the Batch Executor health on startup.
For a more detailed description of available options, including type hints and examples, see the config_templates folder in the Amazon provider package.
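Because SUBMIT_JOB_KWARGS must be a JSON string, it can be convenient to build the value from a Python dictionary rather than writing the JSON by hand. The sketch below assumes two example arguments (tags and timeout) and hypothetical values; substitute whatever submit_job arguments your setup needs.

```python
import json

# Assumed example arguments for the Batch submit_job API call.
submit_job_kwargs = {
    "tags": {"team": "data-platform"},
    "timeout": {"attemptDurationSeconds": 3600},
}

# Serialize to the JSON string expected by the SUBMIT_JOB_KWARGS option.
submit_job_kwargs_json = json.dumps(submit_job_kwargs)

# The resulting value can be exported as an environment variable, for example:
env_var_name = "AIRFLOW__AWS_BATCH_EXECUTOR__SUBMIT_JOB_KWARGS"
print(f"{env_var_name}={submit_job_kwargs_json}")
```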
Dockerfile for AWS Batch Executor¶
An example Dockerfile can be found here. It creates an image that can be used by AWS Batch to run Airflow tasks using the AWS Batch Executor in Apache Airflow. The image supports AWS CLI/API integration, allowing you to interact with AWS services within your Airflow environment. It also includes options to load DAGs (Directed Acyclic Graphs) from either an S3 bucket or a local folder.
Prerequisites¶
Docker must be installed on your system. Instructions for installing Docker can be found here.
Building an Image¶
The AWS CLI will be installed within the image, and there are multiple ways to pass AWS authentication information to the container and thus multiple ways to build the image. This guide will cover 2 methods.
The most secure method is to use IAM roles. When creating an AWS Batch Job Definition, you are able to select a Job Role and an Execution Role. The Execution Role is the role that is used by the container agent to make AWS API requests on your behalf. Depending on what compute is being used by the Batch Executor, the appropriate policy needs to be attached to the Execution Role. Additionally, the role also needs to have at least the CloudWatchLogsFullAccess (or CloudWatchLogsFullAccessV2) policy. The Job Role is the role that is used by the containers to make AWS API requests. This role needs to have permissions based on the tasks that are described in the DAG being run. If you are loading DAGs via an S3 bucket, this role needs to have permission to read the S3 bucket.
To create a new Job Role or Execution Role, follow the steps below:
Navigate to the IAM page on the AWS console, and from the left hand tab, under Access Management, select Roles.
On the Roles page, click Create role on the top right hand corner.
Under Trusted entity type, select AWS Service.
Select applicable use case.
In the Permissions page, select the permissions the role will need, depending on whether the role is a Job Role or an Execution Role. Click Next after selecting all the required permissions.
Enter a name for the new role, and an optional description. Review the Trusted Entities, and the permissions for the role. Add any tags as necessary, and click Create role.
When creating the Job Definition for Batch (see the setup guide for more details), select the appropriate newly created Job Role and Execution role for the Job Definition.
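For the case where DAGs are loaded from an S3 bucket, the read permission on the Job Role could look roughly like the policy document generated below. This is a minimal sketch under assumptions: the function name and the bucket name my-dag-bucket are hypothetical, and your tasks will likely need additional permissions beyond reading DAG files.

```python
import json


def s3_dag_read_policy(bucket_name: str) -> dict:
    """Build an IAM policy document granting read-only access to one bucket.

    Hypothetical helper: lists the bucket and reads its objects, nothing more.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket_name}",
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
        ],
    }


# "my-dag-bucket" is a placeholder bucket name.
policy = s3_dag_read_policy("my-dag-bucket")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the policy editor when selecting permissions for the Job Role.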
Then you can build your image by changing (cd) to the directory with the Dockerfile and running:
docker build -t my-airflow-image \
--build-arg aws_default_region=YOUR_DEFAULT_REGION .
Note: It is important that images are built and run under the same architecture. For example, users on Apple Silicon may want to specify the architecture using docker buildx:
docker buildx build --platform=linux/amd64 -t my-airflow-image \
--build-arg aws_default_region=YOUR_DEFAULT_REGION .
See here for more information about using docker buildx.
The second method is to use the build-time arguments (aws_access_key_id, aws_secret_access_key, aws_default_region, and aws_session_token).
Note: This method is not recommended for use in production environments, because user credentials are stored in the container, which may be a security vulnerability.
To pass AWS authentication information using these arguments, use the --build-arg option during the Docker build process. For example:
docker build -t my-airflow-image \
--build-arg aws_access_key_id=YOUR_ACCESS_KEY \
--build-arg aws_secret_access_key=YOUR_SECRET_KEY \
--build-arg aws_default_region=YOUR_DEFAULT_REGION \
--build-arg aws_session_token=YOUR_SESSION_TOKEN .
Replace YOUR_ACCESS_KEY, YOUR_SECRET_KEY, YOUR_SESSION_TOKEN, and YOUR_DEFAULT_REGION with valid AWS credentials.
Base Image¶
The Docker image is built upon the apache/airflow:latest image. See here for more information about the image.
Important note: The Airflow and python versions in this image must align with the Airflow and python versions on the host/container which is running the Airflow scheduler process (which in turn runs the executor). The Airflow version of the image can be verified by running the container locally with the following command:
docker run <image_name> version
Similarly, the python version of the image can be verified with the following command:
docker run <image_name> python --version
Ensure that these versions match the versions on the host/container which is running the Airflow scheduler process (and thus, the Batch executor). Apache Airflow images with specific python versions can be downloaded from the Docker Hub registry by filtering tags by the python version. For example, the tag latest-python3.8 specifies that the image will have python 3.8 installed.
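The comparison of version outputs can be scripted. The sketch below is only an illustration: the helper names are hypothetical, and it assumes that matching on the major and minor components is what you want to check for python (for Airflow itself you may prefer to compare the full version string).

```python
import re
from typing import Tuple


def major_minor(version_output: str) -> Tuple[int, int]:
    """Extract (major, minor) from output such as 'Python 3.8.10'."""
    match = re.search(r"(\d+)\.(\d+)", version_output)
    if match is None:
        raise ValueError(f"No version number found in: {version_output!r}")
    return int(match.group(1)), int(match.group(2))


def versions_align(scheduler_output: str, image_output: str) -> bool:
    """Return True when both outputs report the same major.minor version."""
    return major_minor(scheduler_output) == major_minor(image_output)


# Assumed sample outputs from the scheduler host and from the image.
print(versions_align("Python 3.8.10", "Python 3.8.18"))
print(versions_align("Python 3.8.10", "Python 3.11.4"))
```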
Loading DAGs¶
There are many ways to load DAGs on a container managed by Batch. This Dockerfile is preconfigured with two possible ways: copying from a local folder, or downloading from an S3 bucket. Other methods of loading DAGs are possible as well.
From S3 Bucket¶
To load DAGs from an S3 bucket, uncomment the entrypoint line in the Dockerfile to synchronize the DAGs from the specified S3 bucket to the /opt/airflow/dags directory inside the container. You can optionally provide container_dag_path as a build argument if you want to store the DAGs in a directory other than /opt/airflow/dags.
Add --build-arg s3_uri=YOUR_S3_URI in the docker build command. Replace YOUR_S3_URI with the URI of your S3 bucket. Make sure you have the appropriate permissions to read from the bucket.
Note that the following command is also passing in AWS credentials as build arguments.
docker build -t my-airflow-image \
--build-arg aws_access_key_id=YOUR_ACCESS_KEY \
--build-arg aws_secret_access_key=YOUR_SECRET_KEY \
--build-arg aws_default_region=YOUR_DEFAULT_REGION \
--build-arg aws_session_token=YOUR_SESSION_TOKEN \
--build-arg s3_uri=YOUR_S3_URI .
From Local Folder¶
To load DAGs from a local folder, place your DAG files in a folder within the docker build context on your host machine, and provide the location of the folder using the host_dag_path build argument. By default, the DAGs will be copied to /opt/airflow/dags, but this can be changed by passing the container_dag_path build-time argument during the Docker build process:
docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .
If choosing to load DAGs onto a different path than /opt/airflow/dags, then the new path will need to be updated in the Airflow config.
Installing Python Dependencies¶
This Dockerfile supports installing Python dependencies via pip from a requirements.txt file. Place your requirements.txt file in the same directory as the Dockerfile. If it is in a different location, it can be specified using the requirements_path build-argument. Keep in mind the Docker context when copying the requirements.txt file.
Uncomment the two appropriate lines in the Dockerfile that copy the requirements.txt file to the container, and run pip install to install the dependencies on the container.
Logging¶
Airflow tasks executed via this executor run in containers within the configured VPC. This means that logs are not directly accessible to the Airflow Webserver, and when containers are stopped after task completion, the logs are permanently lost.
Remote logging should be employed when using the Batch executor to persist your Airflow Task logs and make them viewable from the Airflow Webserver.
Configuring Remote Logging¶
There are many ways to configure remote logging and several supported destinations. A general overview of Airflow Task logging can be found here. Instructions for configuring S3 remote logging can be found here and Cloudwatch remote logging here. Some important things to point out for remote logging in the context of the Batch executor:
The configuration options for Airflow remote logging should be configured on all hosts and containers running Airflow. For example the Webserver requires this config so that it can fetch logs from the remote location and the containers run by Batch Executor require the config so that they can upload the logs to the remote location. See here to read more about how to set Airflow configuration via config file or environment variable exports.
Adding the Airflow remote logging config to the container can be done in many ways. Some examples include, but are not limited to:
Exported as environment variables directly in the Dockerfile (see the Dockerfile section above)
Updating the airflow.cfg file or copy/mounting/downloading a custom airflow.cfg in the Dockerfile.
Added in the Job Definition as an environment variable
You must have credentials configured within the container to be able to interact with the remote service for your logs (e.g. S3, CloudWatch Logs, etc). This can be done in many ways. Some examples include, but are not limited to:
Export credentials into the Dockerfile directly (see the Dockerfile section above)
Configure an Airflow Connection and provide this as the remote logging conn id (exported into the container by any of the means listed above or your preferred method). Airflow will then use these credentials specifically for interacting with your chosen remote logging destination.
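To make the points above concrete, the sketch below assembles the remote logging settings as environment variables and converts them to the name/value list used for container environment variables in a Batch Job Definition. The helper names, the bucket path and the connection id are assumptions for illustration; the three AIRFLOW__LOGGING__ variables correspond to the remote_logging, remote_base_log_folder and remote_log_conn_id options in Airflow's logging section.

```python
from typing import Dict, List


def remote_logging_env(base_log_folder: str, conn_id: str = "aws_default") -> Dict[str, str]:
    """Return remote logging settings as Airflow environment variables."""
    return {
        "AIRFLOW__LOGGING__REMOTE_LOGGING": "True",
        "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER": base_log_folder,
        "AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID": conn_id,
    }


def as_job_definition_environment(env: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert a plain mapping to the name/value list used in a Job Definition."""
    return [{"name": name, "value": value} for name, value in env.items()]


# "s3://my-log-bucket/airflow/logs" is a placeholder destination.
logging_env = remote_logging_env("s3://my-log-bucket/airflow/logs")
print(as_job_definition_environment(logging_env))
```

The same mapping can be exported on the Scheduler and Webserver hosts so that every component reads identical logging settings.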
Setting up a Batch Executor for Apache Airflow¶
There are 3 steps involved in getting a Batch Executor to work in Apache Airflow:
Creating a database that Airflow and the tasks executed by Batch can connect to.
Creating and configuring Batch resources that can run tasks from Airflow.
Configuring Airflow to use the Batch Executor and the database.
There are different options for selecting a database backend. See here for more information about the different options supported by Airflow. The following guide will explain how to set up a PostgreSQL RDS Instance on AWS.
Setting up an RDS DB Instance for AWS Batch Executor¶
Create the RDS DB Instance¶
Log in to your AWS Management Console and navigate to the RDS service.
Click “Create database” to start creating a new RDS instance.
Choose the “Standard create” option, and select PostgreSQL.
Select the appropriate template, availability and durability.
NOTE: At the time of this writing, the “Multi-AZ DB Cluster” option does not support setting the database name, which is a required step below.
Set the DB Instance name, the username and password.
Choose the instance configuration, and storage parameters.
In the Connectivity section, select Don’t connect to an EC2 compute resource
Select or create a VPC and subnet, and allow public access to the DB. Select or create security group and select the Availability Zone.
Open the Additional Configuration tab and set the database name to airflow_db.
Select other settings as required, and create the database by clicking Create database.
Test Connectivity¶
In order to be able to connect to the new RDS instance, you need to allow inbound traffic to the database from your IP address.
Under the “Security” heading in the “Connectivity & security” tab of the RDS instance, find the link to the VPC security group for your new RDS DB instance.
Create an inbound rule that allows traffic from your IP address(es) on TCP port 5432 (PostgreSQL).
Confirm that you can connect to the DB after modifying the security group. This will require having psql installed. Instructions for installing psql can be found here.
NOTE: Be sure that the status of your DB is Available before testing connectivity
psql -h <endpoint> -p 5432 -U <username> <db_name>
The endpoint can be found on the “Connectivity and Security” tab, the username (and password) are the credentials used when creating the database.
The db_name should be airflow_db (unless a different one was used when creating the database).
You will be prompted to enter the password if the connection is successful.
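If psql cannot connect, it can help to first check whether the database port is reachable at all, which isolates security group problems from credential problems. The following is a minimal sketch using only the Python standard library; the function name can_reach is hypothetical, and it tests TCP reachability only, not authentication.

```python
import socket


def can_reach(host: str, port: int = 5432, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    This checks network reachability only (for example, that the security
    group's inbound rule is in effect); it does not log in to the database.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False
```

For example, can_reach("<endpoint>") with the RDS endpoint from the “Connectivity and Security” tab should return True once the inbound rule is in place.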
Setting up AWS Batch¶
AWS Batch can be configured in various ways, with differing orchestration types depending on the use case. For simplicity, this guide will cover how to set up Batch with EC2.
In order to set up AWS Batch so that it will work with Apache Airflow, you will need a Docker image that is properly configured. See the Dockerfile section for instructions on how to do that.
Once the image is built, it needs to be put in a repository where it can be pulled by a container. There are multiple ways to accomplish this. This guide will go over doing this using Amazon Elastic Container Registry (ECR).
Create an ECR Repository¶
Log in to your AWS Management Console and navigate to the ECR service.
Click Create repository.
Name the repository and fill out other information as required.
Click Create Repository.
Once the repository has been created, click on the repository. Click on the “View push commands” button on the top right.
Follow the instructions to push the Docker image, replacing image names as appropriate. Ensure the image is uploaded by refreshing the page once the image is pushed.
Configuring AWS Batch¶
Log in to your AWS Management Console and navigate to the AWS Batch landing page.
On the left hand side bar, click Wizard. This Wizard will guide you to creating all the required resources to run Batch jobs.
Select the orchestration as Amazon EC2.
Click Next.
Create a Compute Environment¶
Choose a name for the compute environment, tags and any appropriate instance configuration. Here, you may select the minimum, maximum and desired number of vCPU’s, as well as the type of EC2 instances you would like to use.
For Instance Role, choose to create a new instance profile or use an existing instance profile that has the required IAM permissions attached. This instance profile allows the Amazon ECS container instances that are created for your compute environment to make calls to the required AWS API operations on your behalf.
Select a VPC which allows access to the internet, as well as a security group with the necessary permissions.
Click Next.
Create a Job Queue¶
Select a name for the job queue, as well as a priority. The compute environment will be set to the one created in the previous step.
Create a Job Definition¶
Choose a name for the Job Definition.
Select the appropriate platform configurations. Ensure Assign public IP is enabled.
Select an Execution Role, and ensure the role has the required permissions to accomplish its tasks.
Enter the image URI of the image that was pushed in the previous section. Make sure the role being used has the required permissions to pull the image.
Select an appropriate Job role, keeping in mind the requirements of the tasks being run.
Configure the environment as required. You may specify the number of vCPU’s, memory or GPUs available to the container. Additionally, add the following environment variables to the container:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN, with the value being the PostgreSQL connection string in the following format using the values set during the Database section above:
postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
Add other configuration as necessary for Airflow generally (see here), the Batch executor (see here) or for remote logging (see here). Note that any configuration changes should be made across the entire Airflow environment to keep configuration consistent.
Click Next.
On the Review and Create page, review all the selections, and once everything is correct, click Create Resources.
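The connection string used for AIRFLOW__DATABASE__SQL_ALCHEMY_CONN in the steps above can be assembled programmatically, which avoids problems when the password contains characters such as @ or / that have special meaning in a URL. This is a sketch under assumptions: the function name and sample values are hypothetical, and it follows the format shown above with the username and password percent-encoded.

```python
from urllib.parse import quote_plus


def postgres_conn_string(
    username: str,
    password: str,
    endpoint: str,
    database_name: str = "airflow_db",
) -> str:
    """Build a connection string in the postgresql+psycopg2:// format.

    The username and password are percent-encoded so that special characters
    do not break URL parsing.
    """
    return (
        f"postgresql+psycopg2://{quote_plus(username)}:{quote_plus(password)}"
        f"@{endpoint}/{database_name}"
    )


# Placeholder credentials and endpoint for illustration only.
conn = postgres_conn_string("airflow", "p@ss/word", "mydb.example.com")
print(conn)
```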
Allow Containers to Access RDS Database¶
As a final step, access to the database must be configured for the containers managed by Batch. Many different networking configurations are possible, but one possible approach is:
Log in to your AWS Management Console and navigate to the VPC Dashboard.
On the left hand, under the Security heading, click Security groups.
Select the security group associated with your RDS instance, and click Edit inbound rules.
Add a new rule that allows PostgreSQL type traffic from the CIDR of the subnet(s) associated with the Batch Compute Environment.
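If you prefer to script the inbound rule rather than use the console, the rule can be described as a permission entry like the one built below. This is a hedged sketch: the helper name and the example CIDR are assumptions, and the code only constructs and validates the data structure. The resulting dictionary has the shape accepted in the IpPermissions list of the EC2 authorize_security_group_ingress API call.

```python
import ipaddress


def postgres_ingress_permission(subnet_cidr: str, port: int = 5432) -> dict:
    """Build an inbound permission allowing TCP traffic on `port` from a subnet.

    Raises ValueError if `subnet_cidr` is not a valid IPv4 network
    (for example, if host bits are set).
    """
    network = ipaddress.IPv4Network(subnet_cidr, strict=True)
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [
            {
                "CidrIp": str(network),
                "Description": "Batch compute environment subnet",
            }
        ],
    }


# "10.0.1.0/24" is a placeholder for the compute environment's subnet CIDR.
permission = postgres_ingress_permission("10.0.1.0/24")
print(permission)
```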
Configure Airflow¶
To configure Airflow to utilize the Batch Executor and leverage the resources we’ve set up, ensure the following environment variables are defined:
AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.batch.batch_executor.AwsBatchExecutor'
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>
AIRFLOW__AWS_BATCH_EXECUTOR__REGION_NAME=<executor-region>
AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE=<batch-job-queue>
AIRFLOW__AWS_BATCH_EXECUTOR__JOB_DEFINITION=<batch-job-definition>
AIRFLOW__AWS_BATCH_EXECUTOR__JOB_NAME=<batch-job-name>
This script should be run on the host(s) running the Airflow Scheduler and Webserver, before those processes are started.
The script sets environment variables that configure Airflow to use the Batch Executor and provide necessary information for task execution. Any other configuration changes made (such as for remote logging) should be added to this example script to keep configuration consistent across the Airflow environment.
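One way to keep these settings consistent across hosts is to generate the export lines from a single mapping. The sketch below is illustrative only: the helper name export_lines and all values other than the executor class path are placeholders to be replaced with your own.

```python
import shlex
from typing import Dict


def export_lines(settings: Dict[str, str]) -> str:
    """Render settings as shell `export` statements, quoting values safely."""
    return "\n".join(
        f"export {name}={shlex.quote(value)}" for name, value in settings.items()
    )


# Placeholder values; replace with the resources created in the steps above.
settings = {
    "AIRFLOW__CORE__EXECUTOR": (
        "airflow.providers.amazon.aws.executors.batch.batch_executor.AwsBatchExecutor"
    ),
    "AIRFLOW__AWS_BATCH_EXECUTOR__REGION_NAME": "us-east-1",
    "AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE": "airflow-job-queue",
    "AIRFLOW__AWS_BATCH_EXECUTOR__JOB_DEFINITION": "airflow-job-definition",
    "AIRFLOW__AWS_BATCH_EXECUTOR__JOB_NAME": "airflow-task",
}

script = export_lines(settings)
print(script)
```

The printed lines can be saved to a file and sourced on each host before starting the Scheduler and Webserver.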
Initialize the Airflow DB¶
The Airflow DB needs to be initialized before it can be used and a user needs to be added for you to log in. The below command adds an admin user (the command will also initialize the DB if it hasn’t been already):
airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin