Kubernetes Executor
The Kubernetes executor runs each task instance in its own pod on a Kubernetes cluster.
KubernetesExecutor runs as a process in the Airflow Scheduler. The scheduler itself does not necessarily need to be running on Kubernetes, but does need access to a Kubernetes cluster.
KubernetesExecutor requires a non-SQLite database as the metadata backend.
When the scheduler hands a task to the KubernetesExecutor, the executor requests a worker pod from the Kubernetes API. The worker pod then runs the task, reports the result, and terminates.
One example of an Airflow deployment running on a distributed set of five nodes in a Kubernetes cluster is shown below.
Consistent with the regular Airflow architecture, the workers need access to the DAG files to execute the tasks within those DAGs and to interact with the metadata database. Configuration specific to the Kubernetes Executor, such as the worker namespace and image information, also needs to be specified in the Airflow configuration file.
Additionally, the Kubernetes Executor lets you customize worker pods on a per-task basis through a task's executor_config.
Configuration
pod_template_file
To customize the pod used for k8s executor worker processes, you may create a pod template file. You must provide the path to the template file in the pod_template_file option in the kubernetes section of airflow.cfg.
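As a minimal sketch of that setting, the snippet below parses an airflow.cfg excerpt with Python's standard configparser. The template path is a hypothetical example, not a default, and plain configparser is used only to show the file layout; Airflow reads airflow.cfg with its own parser. The same option can also be supplied as the environment variable AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE, following Airflow's AIRFLOW__{SECTION}__{KEY} convention.

```python
import configparser

# Hypothetical airflow.cfg excerpt: the template path is an example, not a default.
AIRFLOW_CFG = """
[kubernetes]
pod_template_file = /opt/airflow/pod_templates/pod_template.yaml
"""

# Plain configparser is used only to illustrate the section/option layout.
config = configparser.ConfigParser()
config.read_string(AIRFLOW_CFG)

# The option lives in the [kubernetes] section.
template_path = config.get("kubernetes", "pod_template_file")
print(template_path)  # /opt/airflow/pod_templates/pod_template.yaml
```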
Airflow has two strict requirements for pod template files: base image and pod name.
Base image
A pod_template_file must have a container named base at the spec.containers[0] position, and its image must be specified.
You are free to create sidecar containers after this required container, but Airflow assumes that the Airflow worker container exists at the beginning of the container array, and assumes that the container is named base.
Note
Airflow may override the base container image, e.g. through pod_override configuration, but it must be present in the template file and must not be blank.
Pod name
The pod's metadata.name must be set in the template file. This field will always be set dynamically at pod launch to guarantee uniqueness across all pods. But again, it must be included in the template, and cannot be left blank.
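The two requirements can be checked before deployment. The helper below is a hypothetical sketch, not part of Airflow: it expects the dict that a YAML loader such as PyYAML's yaml.safe_load would return for a template file, and reports each violated requirement.

```python
from typing import Any, Dict, List


def template_problems(template: Dict[str, Any]) -> List[str]:
    """Return the requirement violations found in a parsed pod template.

    An empty list means both requirements (base image, pod name) are met.
    """
    problems: List[str] = []
    spec = template.get("spec") or {}
    containers = spec.get("containers") or []
    if not containers:
        problems.append("spec.containers is missing or empty")
    else:
        first = containers[0]
        # Requirement 1: the worker container comes first and is named "base" ...
        if first.get("name") != "base":
            problems.append("spec.containers[0] must be named 'base'")
        # ... and its image is present and not blank.
        if not str(first.get("image") or "").strip():
            problems.append("spec.containers[0].image must be set")
    # Requirement 2: metadata.name is set, even though Airflow replaces it at launch.
    metadata = template.get("metadata") or {}
    if not str(metadata.get("name") or "").strip():
        problems.append("metadata.name must be set")
    return problems


good = {
    "metadata": {"name": "dummy-name"},
    "spec": {"containers": [{"name": "base", "image": "dummy_image"}]},
}
bad = {
    "metadata": {},
    "spec": {"containers": [{"name": "sidecar", "image": ""}]},
}
print(template_problems(good))  # []
print(template_problems(bad))   # three problems
```

Returning a list instead of raising on the first error lets a CI check report every problem in a template at once.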
Example pod templates
With these requirements in mind, here are some examples of basic pod_template_file YAML files.
Note
The examples below should work when using default Airflow configuration values. However, many custom configuration values need to be explicitly passed to the pod via this template too. This includes, but is not limited to, SQL configuration, required Airflow connections, the DAG folder path, and logging settings. See Configuration Reference for details.
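One way to pass such values is as environment variables on the base container, using Airflow's AIRFLOW__{SECTION}__{KEY} naming. The sketch below builds name/value entries with the same shape as the env items in the templates. The helper names and the option values are hypothetical; secrets such as the SQL connection are better referenced through secretKeyRef, as the templates do.

```python
from typing import Dict, List, Tuple


def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable Airflow checks for an airflow.cfg option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def to_env_entries(overrides: Dict[Tuple[str, str], str]) -> List[Dict[str, str]]:
    """Turn {(section, key): value} into name/value pairs for a container's env list."""
    return [
        {"name": airflow_env_name(section, key), "value": value}
        for (section, key), value in sorted(overrides.items())
    ]


# Hypothetical values; use whatever your deployment actually configures.
entries = to_env_entries({
    ("logging", "remote_logging"): "True",
    ("core", "dags_folder"): "/opt/airflow/dags",
})
for entry in entries:
    print(entry)
```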
Storing DAGs in the image:
airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
Storing DAGs in a persistentVolume:
airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    # The volume name must match the airflow-dags volumeMount above.
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
Pulling DAGs from git:
airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  initContainers:
    - name: git-sync
      image: "k8s.gcr.io/git-sync/git-sync:v3.3.0"
      env:
        - name: GIT_SYNC_BRANCH
          value: "v2-1-stable"
        - name: GIT_SYNC_REPO
          value: "https://github.com/apache/airflow.git"
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: "/git"
        - name: GIT_SYNC_DEST
          value: "repo"
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      volumeMounts:
        - name: airflow-dags
          mountPath: /git
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          subPath: repo/airflow/example_dags
          readOnly: false
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      emptyDir: {}
    - name: airflow-logs
      emptyDir: {}
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
pod_override
When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
To use this functionality, create a Kubernetes V1Pod object and fill in your desired overrides.
Note that the scheduler will override the metadata.name of the V1Pod before launching it.
To overwrite the base container of the pod launched by the KubernetesExecutor, create a V1Pod with a single container named base, and overwrite the fields as follows:
import os
from airflow.decorators import task
from kubernetes.client import models as k8s
executor_config_volume_mount = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    # Must be named "base" so it is merged into the worker container.
                    name="base",
                    volume_mounts=[
                        k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
                    ],
                )
            ],
            volumes=[
                k8s.V1Volume(
                    name="example-kubernetes-test-volume",
                    host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                )
            ],
        )
    ),
}
@task(executor_config=executor_config_volume_mount)
def test_volume_mount():
    """
    Tests whether the volume has been mounted.
    """
    with open("/foo/volume_mount_test.txt", "w") as foo:
        foo.write("Hello")
    return_code = os.system("cat /foo/volume_mount_test.txt")
    if return_code != 0:
        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
# Call the task inside a `with DAG(...)` block so it belongs to a DAG.
volume_task = test_volume_mount()
Note that the following fields are extended instead of overwritten. From spec: volumes and init_containers. From the container: volume mounts, environment variables, ports, and devices.
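To make the extend-versus-overwrite rule concrete, here is a simplified model using plain dicts in place of V1PodSpec fields (snake_case names as in the kubernetes Python client). It is an illustration under those assumptions, not Airflow's implementation; Airflow's own merge logic also handles containers and metadata and may differ in details.

```python
import copy
from typing import Any, Dict, Set

# Spec-level list fields that the text above says are extended, not replaced.
EXTENDED_SPEC_FIELDS: Set[str] = {"volumes", "init_containers"}


def merge_override(base: Dict[str, Any], override: Dict[str, Any], extended: Set[str]) -> Dict[str, Any]:
    """Apply `override` to a copy of `base`.

    Fields named in `extended` are concatenated (base items first); any other
    field set in `override` replaces the base value; None means "not set".
    """
    result = copy.deepcopy(base)
    for field, value in override.items():
        if value is None:
            continue  # unset override fields keep the base value
        if field in extended:
            result[field] = list(result.get(field) or []) + copy.deepcopy(list(value))
        else:
            result[field] = copy.deepcopy(value)
    return result


base_spec = {"volumes": [{"name": "airflow-logs"}], "restart_policy": "Never"}
override_spec = {
    "volumes": [{"name": "example-kubernetes-test-volume"}],
    "restart_policy": None,
    "service_account_name": "custom-sa",  # hypothetical value
}
merged = merge_override(base_spec, override_spec, EXTENDED_SPEC_FIELDS)
print([v["name"] for v in merged["volumes"]])  # ['airflow-logs', 'example-kubernetes-test-volume']
print(merged["restart_policy"], merged["service_account_name"])  # Never custom-sa
```

This is why the volume-mount example only needs to declare its own volume: the template's volumes survive alongside it.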
To add a sidecar container to the launched pod, create a V1Pod with an empty first container named base and a second container containing your desired sidecar.
import os
import time
from airflow.decorators import task
from kubernetes.client import models as k8s
executor_config_sidecar = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                # Empty "base" container: only adds a mount to the worker container.
                k8s.V1Container(
                    name="base",
                    volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                ),
                # Sidecar that writes a file into the shared emptyDir volume.
                k8s.V1Container(
                    name="sidecar",
                    image="ubuntu",
                    args=["echo \"retrieved from mount\" > /shared/test.txt"],
                    command=["bash", "-cx"],
                    volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                ),
            ],
            volumes=[
                k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
            ],
        )
    ),
}
@task(executor_config=executor_config_sidecar)
def test_sharedvolume_mount():
    """
    Tests whether the volume has been mounted.
    """
    for i in range(5):
        try:
            return_code = os.system("cat /shared/test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
            break  # file found, stop retrying
        except ValueError:
            if i == 4:  # last attempt failed: give up
                raise
            time.sleep(1)  # give the sidecar time to write the file
sidecar_task = test_sharedvolume_mount()
You can also create a custom pod_template_file on a per-task basis so that you can reuse the same base values across multiple tasks.
This replaces the default pod_template_file named in airflow.cfg; the pod_override is then applied on top of that template.
Here is an example of a task with both features:
import os
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME
from kubernetes.client import models as k8s
with DAG(
    dag_id="example_pod_template_file",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example3"],
) as dag:
    executor_config_template = {
        # Per-task template: replaces the pod_template_file set in airflow.cfg.
        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
        # Applied on top of that template.
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
    }
    @task(executor_config=executor_config_template)
    def task_with_template():
        print_stuff()
    # Calling the decorated function adds the task to the DAG.
    task_with_template()
Managing DAGs and logs
Use of persistent volumes is optional and depends on your configuration.
DAGs:
To get the DAGs into the workers, you can:
Include DAGs in the image.
Use git-sync, which, before starting the worker container, will run a git pull of the DAGs repository.
Store DAGs on a persistent volume, which can be mounted on all workers.
Logs:
To get task logs out of the workers, you can:
Use a persistent volume mounted on both the webserver and workers.
Enable remote logging.
Note
If you don’t enable logging persistence, and if you have not enabled remote logging, logs will be lost after the worker pods shut down.
Comparison with CeleryExecutor
In contrast to CeleryExecutor, KubernetesExecutor does not require additional components such as Redis and Flower, but does require access to a Kubernetes cluster.
With KubernetesExecutor, each task runs in its own pod. The pod is created when the task is queued, and terminates when the task completes. Historically, in scenarios such as burstable workloads, this presented a resource utilization advantage over CeleryExecutor, where you needed a fixed number of long-running Celery worker pods, whether or not there were tasks to run.
However, the official Apache Airflow Helm chart can automatically scale Celery workers down to zero based on the number of tasks in the queue, so when using the official chart, this is no longer an advantage.
With Celery workers you will tend to have lower task latency because the worker pod is already up and running when the task is queued. On the other hand, because multiple tasks run in the same pod, with Celery you may have to be more mindful about resource utilization in your task design, particularly memory consumption.
One scenario where KubernetesExecutor can be helpful is long-running tasks: if you deploy while a task is running, the task keeps running until it completes (or times out). With CeleryExecutor, provided you have set a grace period, the task keeps running only until the grace period has elapsed, at which point it is terminated. Another scenario where KubernetesExecutor can work well is when your tasks are not very uniform with respect to resource requirements or images.
Finally, note that it does not have to be either-or; with CeleryKubernetesExecutor, it is possible to use both CeleryExecutor and KubernetesExecutor simultaneously on the same cluster. CeleryKubernetesExecutor looks at a task's queue to determine whether to run it on Celery or Kubernetes. By default, tasks are sent to Celery workers, but if you want a task to run using KubernetesExecutor, send it to the kubernetes queue and it will run in its own pod. KubernetesPodOperator can be used to similar effect, no matter which executor you are using.
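The routing rule described above can be modeled in a few lines. This is a simplified sketch, not CeleryKubernetesExecutor's code: the queue name "kubernetes" comes from the text, and Airflow lets you change it through the kubernetes_queue option. In a DAG, the input to this decision is the task's queue argument, for example @task(queue="kubernetes").

```python
# Queue name routed to Kubernetes, per the text above; treat it as configurable.
KUBERNETES_QUEUE = "kubernetes"


def pick_executor(task_queue: str, kubernetes_queue: str = KUBERNETES_QUEUE) -> str:
    """Simplified model of CeleryKubernetesExecutor's per-task routing decision."""
    if task_queue == kubernetes_queue:
        return "KubernetesExecutor"  # the task gets its own worker pod
    return "CeleryExecutor"  # every other queue goes to the Celery workers


for queue in ("default", "kubernetes"):
    print(queue, "->", pick_executor(queue))
```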
Fault Tolerance
Tip
To troubleshoot issues with KubernetesExecutor, you can use the airflow kubernetes generate-dag-yaml command.
This command generates the pods as they will be launched in Kubernetes and dumps them into YAML files for you to inspect.
Handling Worker Pod Crashes
When dealing with distributed systems, we need a system that assumes that any component can crash at any moment for reasons ranging from OOM errors to node upgrades.
In the case where a worker dies before it can report its status to the backend DB, the executor can use a Kubernetes watcher thread to discover the failed pod.
A Kubernetes watcher is a thread that subscribes to the Kubernetes API's stream of changes to the pods it watches. It is alerted when pods start, run, end, and fail. By monitoring this stream, the KubernetesExecutor can discover that the worker crashed and correctly report the task as failed.
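The sketch below simulates that idea with plain dicts. With the real kubernetes Python client, events would come from watch.Watch().stream(...) over CoreV1Api.list_namespaced_pod, and each object would be a V1Pod rather than a dict. The pod names, the phase-to-state mapping, and the state strings are simplifications for illustration, not Airflow's code.

```python
from typing import Dict, Iterable

# Pod phase -> simplified task state the executor would report.
PHASE_TO_STATE = {
    "Pending": "queued",
    "Running": "running",
    "Succeeded": "success",
    "Failed": "failed",
}


def track_pods(events: Iterable[dict]) -> Dict[str, str]:
    """Fold a stream of pod watch events into the latest state per pod."""
    states: Dict[str, str] = {}
    for event in events:
        pod = event["object"]
        state = PHASE_TO_STATE.get(pod["status"]["phase"])
        if state is not None:  # ignore phases we do not map, e.g. "Unknown"
            states[pod["metadata"]["name"]] = state
    return states


def pod_event(event_type: str, name: str, phase: str) -> dict:
    """Build a minimal event shaped like a watch event ({'type', 'object'})."""
    return {"type": event_type, "object": {"metadata": {"name": name}, "status": {"phase": phase}}}


events = [
    pod_event("ADDED", "worker-a", "Pending"),
    pod_event("MODIFIED", "worker-a", "Running"),
    pod_event("ADDED", "worker-b", "Pending"),
    pod_event("MODIFIED", "worker-b", "Running"),
    # worker-a is OOM-killed before it can write its own status to the database.
    pod_event("MODIFIED", "worker-a", "Failed"),
    pod_event("MODIFIED", "worker-b", "Succeeded"),
]
print(track_pods(events))  # {'worker-a': 'failed', 'worker-b': 'success'}
```

The point of the design is that the failure is learned from Kubernetes, not from the worker, so a worker that dies silently is still reported.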
But What About Cases Where the Scheduler Pod Crashes?
In cases of scheduler crashes, the scheduler will recover its state using the watcher's resourceVersion.
When monitoring the Kubernetes cluster's watcher thread, each event has a monotonically rising number called a resourceVersion.
Every time the executor reads a resourceVersion, the executor stores the latest value in the backend database.
Because the resourceVersion is stored, the scheduler can restart and continue reading the watcher stream from where it left off.
Since the tasks are run independently of the executor and report results directly to the database, scheduler failures will not lead to task failures or re-runs.
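The checkpoint-and-resume pattern can be sketched with a fake API server. In this simulation a dict stands in for the metadata database, and integers stand in for resourceVersions. Real resourceVersions are opaque strings that only the API server interprets, so the comparison lives in the fake server, not in the client. All names here are hypothetical. A production watcher also has to cope with the API server rejecting a stale resourceVersion (HTTP 410 Gone), typically by re-listing pods.

```python
from typing import Dict, Iterator, List, Tuple

# (resourceVersion, pod name, phase); ints stand in for the server's opaque versions.
Event = Tuple[int, str, str]


def fake_watch(event_log: List[Event], resource_version: int) -> Iterator[Event]:
    """Stand-in for the API server: stream only events newer than resource_version."""
    for event in event_log:
        if event[0] > resource_version:
            yield event


def run_watcher(event_log: List[Event], db: Dict[str, int], crash_after: int = -1) -> List[Tuple[str, str]]:
    """Consume the stream, checkpointing the latest resourceVersion in `db`.

    `db` is a hypothetical stand-in for the metadata database; `crash_after`
    simulates the scheduler dying after that many events (-1 = never).
    """
    handled: List[Tuple[str, str]] = []
    start = db.get("resource_version", 0)
    for count, (version, pod_name, phase) in enumerate(fake_watch(event_log, start)):
        if count == crash_after:
            break  # simulated crash: nothing after this point is processed
        handled.append((pod_name, phase))
        db["resource_version"] = version  # checkpoint after each handled event
    return handled


event_log = [
    (1, "worker-a", "Running"),
    (2, "worker-b", "Running"),
    (3, "worker-a", "Succeeded"),
    (4, "worker-b", "Failed"),
]
db: Dict[str, int] = {}
print(run_watcher(event_log, db, crash_after=2))  # [('worker-a', 'Running'), ('worker-b', 'Running')]
print(db)  # {'resource_version': 2}
print(run_watcher(event_log, db))  # [('worker-a', 'Succeeded'), ('worker-b', 'Failed')]
```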