Kubernetes Executor

The Kubernetes executor runs each task instance in its own pod on a Kubernetes cluster.

KubernetesExecutor runs as a process in the Airflow Scheduler. The scheduler itself does not necessarily need to be running on Kubernetes, but does need access to a Kubernetes cluster.

KubernetesExecutor requires a non-SQLite database as the backend.

When a DAG submits a task, the KubernetesExecutor requests a worker pod from the Kubernetes API. The worker pod then runs the task, reports the result, and terminates.

../_images/arch-diag-kubernetes.png

One example of an Airflow deployment running on a distributed set of five nodes in a Kubernetes cluster is shown below.

../_images/arch-diag-kubernetes2.png

Consistent with the regular Airflow architecture, the Workers need access to the DAG files to execute the tasks within those DAGs and interact with the Metadata repository. Also, configuration information specific to the Kubernetes Executor, such as the worker namespace and image information, needs to be specified in the Airflow Configuration file.

Additionally, the Kubernetes Executor enables specification of additional features on a per-task basis using the Executor config.

../_images/k8s-happy-path.png

Configuration

pod_template_file

To customize the pod used for KubernetesExecutor worker processes, you can create a pod template file. You must provide the path to the template file in the pod_template_file option in the kubernetes section of airflow.cfg.
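As a minimal sketch of the setting described above, the snippet below parses a sample airflow.cfg fragment with Python's standard configparser and builds the matching environment variable name, following the AIRFLOW__{SECTION}__{KEY} pattern that the pod templates on this page already use (for example AIRFLOW__CORE__EXECUTOR). The template path and the env_var_name helper are hypothetical and only serve the illustration.

```python
import configparser

# Sample airflow.cfg fragment; the path is a hypothetical placeholder.
SAMPLE_CFG = """
[kubernetes]
pod_template_file = /opt/airflow/pod_templates/pod_template.yaml
"""


def env_var_name(section: str, key: str) -> str:
    """Build the environment-variable form of an Airflow config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)

# The option lives in the [kubernetes] section, as described above.
template_path = parser.get("kubernetes", "pod_template_file")
print(template_path)
print(env_var_name("kubernetes", "pod_template_file"))
```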

Airflow has two strict requirements for pod template files: base image and pod name.

Base image

A pod_template_file must have a container named base at the spec.containers[0] position, and its image must be specified.

You are free to create sidecar containers after this required container, but Airflow assumes that the airflow worker container exists at the beginning of the container array, and assumes that the container is named base.

Note

Airflow may override the base container image, e.g. through pod_override configuration; but it must be present in the template file and must not be blank.

Pod name

The pod's metadata.name must be set in the template file. This field will always be set dynamically at pod launch to guarantee uniqueness across all pods. But again, it must be included in the template, and cannot be left blank.
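The two requirements can be checked before a template is deployed. The following is a minimal sketch, assuming the template has already been parsed from YAML into a plain dictionary; check_pod_template is a hypothetical helper written for this illustration and is not part of Airflow.

```python
def check_pod_template(template: dict) -> list:
    """Return a list of problems; an empty list means both requirements hold."""
    problems = []

    # Pod name: metadata.name must be present and non-blank.
    name = (template.get("metadata") or {}).get("name")
    if not name or not str(name).strip():
        problems.append("metadata.name must be set")

    # Base image: spec.containers[0] must be named "base" and have an image.
    containers = (template.get("spec") or {}).get("containers") or []
    if not containers:
        problems.append("spec.containers must not be empty")
    else:
        first = containers[0]
        if first.get("name") != "base":
            problems.append('spec.containers[0].name must be "base"')
        image = first.get("image")
        if not image or not str(image).strip():
            problems.append("spec.containers[0].image must be set")

    return problems


# A template that satisfies both requirements.
good = {
    "metadata": {"name": "dummy-name"},
    "spec": {"containers": [{"name": "base", "image": "dummy_image"}]},
}
# A template with no pod name and with a sidecar in the first position.
bad = {
    "metadata": {},
    "spec": {"containers": [{"name": "sidecar"}, {"name": "base", "image": "x"}]},
}

print(check_pod_template(good))
print(check_pod_template(bad))
```

The check only looks at the first container on purpose, because Airflow assumes the worker container sits at the beginning of the container array.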

Example pod templates

With these requirements in mind, here are some examples of basic pod_template_file YAML files.

Note

The examples below should work when using default Airflow configuration values. However, many custom configuration values need to be explicitly passed to the pod via this template too. This includes, but is not limited to, sql configuration, required Airflow connections, dag folder path and logging settings. See Configuration Reference for details.

Storing DAGs in the image:

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

Storing DAGs in a persistentVolume:

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

Pulling DAGs from git:

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  initContainers:
    - name: git-sync
      image: "k8s.gcr.io/git-sync/git-sync:v3.4.0"
      env:
        - name: GIT_SYNC_BRANCH
          value: "v2-2-stable"
        - name: GIT_SYNC_REPO
          value: "https://github.com/apache/airflow.git"
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: "/git"
        - name: GIT_SYNC_DEST
          value: "repo"
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      volumeMounts:
        - name: airflow-dags
          mountPath: /git
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          subPath: repo/airflow/example_dags
          readOnly: false
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      emptyDir: {}
    - name: airflow-logs
      emptyDir: {}
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

pod_override

When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis. To use this functionality, create a Kubernetes V1Pod object and fill in your desired overrides. Note that the scheduler will override the metadata.name of the V1Pod before launching it.

To overwrite the base container of the pod launched by the KubernetesExecutor, create a V1Pod with a single container, and overwrite the fields as follows:

airflow/example_dags/example_kubernetes_executor.py

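        # Excerpt from inside a DAG definition. It assumes `import os`,
        # `from airflow.decorators import task`, and
        # `from kubernetes.client import models as k8s`.
        executor_config_volume_mount = {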
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
                            ],
                        )
                    ],
                    volumes=[
                        k8s.V1Volume(
                            name="example-kubernetes-test-volume",
                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        )
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_volume_mount)
        def test_volume_mount():
            """
            Tests whether the volume has been mounted.
            """

            with open('/foo/volume_mount_test.txt', 'w') as foo:
                foo.write('Hello')

            return_code = os.system("cat /foo/volume_mount_test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        volume_task = test_volume_mount()

Note that the following fields are extended rather than overwritten. From the spec: volumes and init_containers. From the container: volume mounts, environment variables, ports, and devices.
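The difference between extending and overwriting can be illustrated with plain dictionaries. This is a simplified model of the behavior described above, not Airflow's actual merge code; the merge helper and the field-name sets are assumptions made for this sketch, with "devices" written as volume_devices, the attribute name used by the Kubernetes Python client.

```python
# Fields that are concatenated rather than replaced, per the note above.
EXTENDED_SPEC_FIELDS = {"volumes", "init_containers"}
EXTENDED_CONTAINER_FIELDS = {"volume_mounts", "env", "ports", "volume_devices"}


def merge(base: dict, override: dict, extended: set) -> dict:
    """Merge two flat dicts: extend list fields in `extended`, overwrite the rest."""
    merged = dict(base)
    for key, value in override.items():
        if value is None:
            continue  # unset override fields leave the template value alone
        if key in extended:
            merged[key] = list(base.get(key) or []) + list(value)
        else:
            merged[key] = value
    return merged


# Base container as it might come from the pod template file.
template_container = {
    "name": "base",
    "image": "dummy_image",
    "volume_mounts": [{"name": "airflow-logs", "mount_path": "/opt/airflow/logs"}],
}
# Container from a pod_override that only adds a volume mount.
override_container = {
    "name": "base",
    "image": None,
    "volume_mounts": [{"name": "example-kubernetes-test-volume", "mount_path": "/foo/"}],
}

merged = merge(template_container, override_container, EXTENDED_CONTAINER_FIELDS)
print(merged["image"])
print([m["name"] for m in merged["volume_mounts"]])
```

In this model the template's image survives because the override leaves it unset, while both volume mounts end up on the merged container.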

To add a sidecar container to the launched pod, create a V1Pod with an empty first container named base and a second container containing your desired sidecar.

airflow/example_dags/example_kubernetes_executor.py

        executor_config_sidecar = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                        k8s.V1Container(
                            name="sidecar",
                            image="ubuntu",
                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
                            command=["bash", "-cx"],
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                    ],
                    volumes=[
                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_sidecar)
        def test_sharedvolume_mount():
            """
            Tests whether the volume has been mounted.
            """
            for i in range(5):
                try:
                    return_code = os.system("cat /shared/test.txt")
                    if return_code != 0:
                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                    break  # the file is readable, so stop retrying
                except ValueError:
                    if i == 4:  # last attempt: surface the failure
                        raise

        sidecar_task = test_sharedvolume_mount()

You can also specify a custom pod_template_file on a per-task basis so that you can reuse the same base values across multiple tasks. This replaces the default pod_template_file named in airflow.cfg; the pod_override is then applied on top of that template.

Here is an example of a task with both features:

import os

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME

from kubernetes.client import models as k8s

with DAG(
    dag_id="example_pod_template_file",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    executor_config_template = {
        "pod_template_file": os.path.join(
            AIRFLOW_HOME, "pod_templates/basic_template.yaml"
        ),
        "pod_override": k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
        ),
    }

    @task(executor_config=executor_config_template)
    def task_with_template():
        print_stuff()

    # Calling the decorated function registers the task in the DAG.
    task_with_template()

Managing dags and logs

Use of persistent volumes is optional and depends on your configuration.

  • Dags:

    To get the DAGs into the workers, you can:

      • Include the DAGs in the image.

      • Use git-sync, which runs a git pull of the DAGs repository before the worker container starts.

      • Store the DAGs on a persistent volume that can be mounted on all workers.

  • Logs:

    To get task logs out of the workers, you can:

      • Use a persistent volume mounted on both the webserver and workers.

      • Enable remote logging.

Note

If you enable neither log persistence nor remote logging, task logs are lost when the worker pods shut down.

Comparison with CeleryExecutor

In contrast to CeleryExecutor, KubernetesExecutor does not require additional components such as Redis and Flower, but it does require access to a Kubernetes cluster.

With KubernetesExecutor, each task runs in its own pod. The pod is created when the task is queued, and terminates when the task completes. Historically, in scenarios such as burstable workloads, this presented a resource utilization advantage over CeleryExecutor, where you needed a fixed number of long-running Celery worker pods, whether or not there were tasks to run.

However, the official Apache Airflow Helm chart can automatically scale Celery workers down to zero based on the number of tasks in the queue, so when using the official chart, this is no longer an advantage.

With Celery workers you will tend to have less task latency because the worker pod is already up and running when the task is queued. On the other hand, because multiple tasks are running in the same pod, with Celery you may have to be more mindful about resource utilization in your task design, particularly memory consumption.

One scenario where KubernetesExecutor can be helpful is if you have long-running tasks, because if you deploy while a task is running, the task will keep running until it completes (or times out, etc). But with CeleryExecutor, provided you have set a grace period, the task will only keep running up until the grace period has elapsed, at which time the task will be terminated. Another scenario where KubernetesExecutor can work well is when your tasks are not very uniform with respect to resource requirements or images.

Finally, note that it does not have to be either-or; with CeleryKubernetesExecutor, it is possible to use both CeleryExecutor and KubernetesExecutor simultaneously on the same cluster. CeleryKubernetesExecutor will look at a task's queue to determine whether to run on Celery or Kubernetes. By default, tasks are sent to Celery workers, but if you want a task to run using KubernetesExecutor, you send it to the kubernetes queue and it will run in its own pod. And KubernetesPodOperator can be used to similar effect, no matter what executor you are using.
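The queue-based routing described above can be pictured with a small sketch. It is a simplified model of the decision, not the CeleryKubernetesExecutor implementation; choose_executor is a hypothetical function, and the queue name kubernetes is taken from the paragraph above.

```python
KUBERNETES_QUEUE = "kubernetes"  # queue name used in the text above


def choose_executor(task_queue: str, kubernetes_queue: str = KUBERNETES_QUEUE) -> str:
    """Return which executor a task would be routed to, based on its queue."""
    if task_queue == kubernetes_queue:
        return "KubernetesExecutor"
    # Any other queue goes to the Celery workers, which is the default path.
    return "CeleryExecutor"


print(choose_executor("default"))
print(choose_executor("kubernetes"))
```

In a DAG, this corresponds to setting the queue on the individual task that should run in its own pod and leaving the queue of every other task unchanged.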

Fault Tolerance

Tip

To troubleshoot issues with KubernetesExecutor, you can use the airflow kubernetes generate-dag-yaml command. This command generates the pods as they will be launched in Kubernetes and dumps them into YAML files for you to inspect.

Handling Worker Pod Crashes

When dealing with distributed systems, we need a system that assumes that any component can crash at any moment for reasons ranging from OOM errors to node upgrades.

In the case where a worker dies before it can report its status to the backend DB, the executor can use a Kubernetes watcher thread to discover the failed pod.

../_images/k8s-failed-pod.png

A Kubernetes watcher is a thread that can subscribe to every change that occurs in Kubernetes' database. It is alerted when pods start, run, end, and fail. By monitoring this stream, the KubernetesExecutor can discover that the worker crashed and correctly report the task as failed.
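To make the idea concrete, here is a minimal sketch that folds a stream of pod events into a task outcome per pod. The event dictionaries are a hypothetical simplification of what a Kubernetes watch delivers (an event type plus the pod's phase), and task_states_from_events is not an Airflow function.

```python
def task_states_from_events(events):
    """Fold a stream of pod events into the latest known task state per pod."""
    states = {}
    for event in events:
        pod, phase = event["pod"], event["phase"]
        if phase == "Succeeded":
            states[pod] = "success"
        elif phase == "Failed":
            states[pod] = "failed"
        elif event["type"] == "DELETED" and states.get(pod) not in ("success", "failed"):
            # The pod vanished without reaching a terminal phase,
            # for example during a node upgrade: treat the task as failed.
            states[pod] = "failed"
        else:
            states[pod] = "running" if phase == "Running" else "queued"
    return states


events = [
    {"type": "ADDED", "pod": "task-a", "phase": "Pending"},
    {"type": "MODIFIED", "pod": "task-a", "phase": "Running"},
    {"type": "MODIFIED", "pod": "task-a", "phase": "Failed"},   # worker crashed
    {"type": "ADDED", "pod": "task-b", "phase": "Running"},
    {"type": "DELETED", "pod": "task-b", "phase": "Running"},   # pod removed mid-run
    {"type": "MODIFIED", "pod": "task-c", "phase": "Succeeded"},
]

states = task_states_from_events(events)
print(states)
```

The point of the sketch is that the failure is detected from the event stream alone, so a worker that dies before writing to the database is still reported.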

But What About Cases Where the Scheduler Pod Crashes?

In cases of scheduler crashes, the scheduler will recover its state using the watcher's resourceVersion.

Each event in the Kubernetes watch stream carries a monotonically increasing number called a resourceVersion. Every time the executor reads a resourceVersion, it stores the latest value in the backend database. Because the resourceVersion is stored, the scheduler can restart and continue reading the watcher stream from where it left off. Since the tasks run independently of the executor and report results directly to the database, scheduler failures do not lead to task failures or re-runs.
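The recovery mechanism can be sketched as a checkpointing loop. This is an illustrative simulation under stated assumptions: a dictionary stands in for the backend database, resourceVersion values are treated as integers as the text describes, and WatcherSketch is a hypothetical class rather than Airflow's watcher.

```python
class WatcherSketch:
    """Consumes events and checkpoints the last resourceVersion it handled."""

    def __init__(self, store: dict):
        self.store = store  # stands in for the backend database

    def consume(self, stream):
        last_seen = int(self.store.get("resource_version", 0))
        handled = []
        for event in stream:
            version = int(event["resource_version"])
            if version <= last_seen:
                continue  # already processed before the restart
            handled.append(event["pod"])
            last_seen = version
            # Persist the checkpoint so a restarted scheduler can resume here.
            self.store["resource_version"] = version
        return handled


all_events = [{"resource_version": n, "pod": f"pod-{n}"} for n in range(1, 6)]
database = {}

# The first scheduler process handles three events, then crashes.
first_run = WatcherSketch(database).consume(all_events[:3])

# A new scheduler process starts with the same database and skips old events.
second_run = WatcherSketch(database).consume(all_events)

print(first_run)
print(second_run)
```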
