KubernetesPodOperator¶
The KubernetesPodOperator allows you to create and run Pods on a Kubernetes cluster.
Note
If you use a managed Kubernetes service, consider using a specialized KPO operator, as it simplifies the Kubernetes authorization process:
GKEStartPodOperator operator for Google Kubernetes Engine,
EksPodOperator operator for Amazon Elastic Kubernetes Service.
Note
The Kubernetes executor is not required to use this operator.
How does this operator work?¶
The KubernetesPodOperator uses the Kubernetes API to launch a pod in a Kubernetes cluster. By supplying an
image URL and a command with optional arguments, the operator uses the Kubernetes Python client to generate a
Kubernetes API request that dynamically launches those individual pods.
Users can specify a kubeconfig file using the config_file parameter; otherwise the operator defaults
to ~/.kube/config.
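As a rough illustration of what such a request carries, the sketch below builds a minimal Pod manifest as a plain dictionary from an image, a command, and arguments. The helper name `build_pod_manifest` is hypothetical and is not part of the provider; the operator itself works with `V1Pod` objects from the Kubernetes Python client, and the fields shown here are only the ones mentioned above.

```python
import json


def build_pod_manifest(name, image, cmds=None, arguments=None, namespace="default"):
    """Hypothetical helper: build a minimal Pod manifest as a plain dict."""
    container = {"name": "base", "image": image}
    if cmds:
        container["command"] = list(cmds)  # replaces the image ENTRYPOINT
    if arguments:
        container["args"] = list(arguments)  # replaces the image CMD
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {"containers": [container], "restartPolicy": "Never"},
    }


manifest = build_pod_manifest("hello", "debian", cmds=["bash", "-cx"], arguments=["echo 10"])
print(json.dumps(manifest, indent=2))
```

The `cmds` and `arguments` operator parameters map onto the container's `command` and `args` fields, which is why they are kept as separate lists here.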
The KubernetesPodOperator enables task-level resource configuration and is optimal for custom Python
dependencies that are not available through the public PyPI repository. It also allows users to supply a template
YAML file using the pod_template_file parameter.
Ultimately, it allows Airflow to act as a job orchestrator, no matter the language those jobs are written in.
Debugging KubernetesPodOperator¶
You can print out the Kubernetes manifest for the pod that would be created at runtime by calling
dry_run() on an instance of the operator.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    # bash -c takes the script as a single string
    arguments=["echo 10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

# Print the pod manifest without creating the pod
k.dry_run()
Argument precedence¶
When KPO defines the pod object, there may be overlap between the KubernetesPodOperator arguments.
In general, the order of precedence is: KPO field-specific arguments (e.g., secrets, cmds, affinity) first,
then the more general templates full_pod_spec, pod_template_file, and pod_template_dict, and finally the
V1Pod that is created by default.
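The precedence order can be pictured as a lookup through layers, from most specific to most general. The sketch below is a simplified model under stated assumptions: `resolve_field` and the sample dictionaries are hypothetical, and the real operator reconciles nested pod objects rather than flat dictionaries.

```python
def resolve_field(field, *sources):
    """Return the first non-None value for `field`, scanning from highest to lowest precedence."""
    for source in sources:
        value = source.get(field)
        if value is not None:
            return value
    return None


# Hypothetical inputs, ordered from highest to lowest precedence.
operator_args = {"image": "debian:12", "cmds": None}
full_pod_spec = {"image": "ubuntu:22.04", "cmds": ["bash", "-c"]}
pod_template_file = {"image": "alpine", "cmds": ["sh"], "service_account_name": "runner"}
defaults = {"service_account_name": "default", "restart_policy": "Never"}

layers = (operator_args, full_pod_spec, pod_template_file, defaults)
fields = ("image", "cmds", "service_account_name", "restart_policy")
resolved = {field: resolve_field(field, *layers) for field in fields}
print(resolved)
```

Here the image comes from the operator argument, the command from `full_pod_spec` because the operator argument was left unset, and the remaining fields from the lower layers.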
For namespace, if namespace is not provided via any of these methods, then we’ll first try to
get the current namespace (if the task is already running in Kubernetes) and failing that we’ll use
the default namespace.
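The fallback order for the namespace can be sketched as follows. This is an illustration only: `resolve_namespace` is a hypothetical helper, and it assumes the current namespace is read from the service account file that Kubernetes mounts into Pods, which may differ from how the operator detects it.

```python
from pathlib import Path

# Standard location of the service account namespace file inside a Pod.
SA_NAMESPACE_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")


def resolve_namespace(explicit=None, namespace_file=SA_NAMESPACE_FILE):
    """Hypothetical helper mirroring the fallback order described above."""
    if explicit:
        return explicit
    try:
        detected = namespace_file.read_text().strip()
    except OSError:
        detected = ""  # not running inside a Pod, or file unreadable
    return detected or "default"


print(resolve_namespace("data-team"))
print(resolve_namespace(namespace_file=Path("/nonexistent/namespace")))
```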
For pod name, if not provided explicitly, we’ll use the task_id. A random suffix is added by default so the pod name is not generally of great consequence.
How to use cluster ConfigMaps, Secrets, and Volumes with Pod?¶
To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API like this:
from kubernetes.client import models as k8s
With this API object, you have access to all Kubernetes API objects in the form of Python classes.
Using this method will ensure correctness and type safety. While we have removed almost all Kubernetes
convenience classes, we have kept the Secret class to simplify the process of generating secret
volumes/env variables.
from airflow.providers.cncf.kubernetes.secret import Secret

# Expose the secret through a volume mounted at /etc/sql_conn
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
# Expose one key of the secret as the SQL_CONN environment variable
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
# Expose every key of airflow-secrets-2 as environment variables
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)

init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]

init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)

affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        # Weighted terms belong to the "preferred" rule; "required" takes plain V1PodAffinityTerm items
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            # values must be a list of strings
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values=["S1"])
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)

tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
Difference between KubernetesPodOperator and Kubernetes object spec¶
The KubernetesPodOperator can be considered a substitute for a Kubernetes object spec definition that is able
to be run in the Airflow scheduler in the DAG context. If using the operator, there is no need to create the
equivalent YAML/JSON object spec for the Pod you would like to run.
The YAML file can still be provided with the pod_template_file parameter, or the Pod spec can be constructed
in Python and passed via the full_pod_spec parameter, which requires a Kubernetes V1Pod.
How to use private images (container registry)?¶
By default, the KubernetesPodOperator will look for images hosted publicly on Docker Hub.
To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
Kubernetes Secret that holds the credentials for the private registry and pass it in the
image_pull_secrets parameter.
Create the Secret using kubectl:
kubectl create secret docker-registry testquay \
--docker-server=quay.io \
--docker-username=<Profile name> \
--docker-password=<password>
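For reference, a Secret created this way has type `kubernetes.io/dockerconfigjson` and stores a base64-encoded Docker config under the `.dockerconfigjson` key. The sketch below builds an object of the same shape in plain Python; `docker_registry_secret` is a hypothetical helper, and the user name and password are placeholder values.

```python
import base64
import json


def docker_registry_secret(name, server, username, password):
    """Hypothetical helper: build a manifest shaped like a docker-registry Secret."""
    auth = base64.b64encode(f"{username}:{password}".encode()).decode()
    docker_config = {"auths": {server: {"username": username, "password": password, "auth": auth}}}
    encoded = base64.b64encode(json.dumps(docker_config).encode()).decode()
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "kubernetes.io/dockerconfigjson",
        "data": {".dockerconfigjson": encoded},
    }


secret = docker_registry_secret("testquay", "quay.io", "example-user", "example-password")
print(secret["type"])
```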
Then use it in your pod like so:
quay_k8s = KubernetesPodOperator(
    namespace="default",
    image="quay.io/apache/bash",
    image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
    cmds=["bash", "-cx"],
    arguments=["echo", "10", "echo pwd"],
    labels={"foo": "bar"},
    name="airflow-private-image-pod",
    on_finish_action="delete_pod",
    in_cluster=True,
    task_id="task-two",
    get_logs=True,
)
You can also run this operator in deferrable mode:
quay_k8s_async = KubernetesPodOperator(
    task_id="kubernetes_private_img_task_async",
    namespace="default",
    image="quay.io/apache/bash",
    image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
    cmds=["bash", "-cx"],
    arguments=["echo", "10", "echo pwd"],
    labels={"foo": "bar"},
    name="airflow-private-image-pod",
    on_finish_action="delete_pod",
    in_cluster=True,
    get_logs=True,
    deferrable=True,
)
The following example fetches and displays the container log periodically:
kubernetes_task_async_log = KubernetesPodOperator(
    task_id="kubernetes_task_async_log",
    namespace="kubernetes_task_async_log",
    in_cluster=False,
    name="astro_k8s_test_pod",
    image="ubuntu",
    cmds=[
        "bash",
        "-cx",
        (
            "i=0; "
            "while [ $i -ne 100 ]; "
            "do i=$(($i+1)); "
            "echo $i; "
            "sleep 1; "
            "done; "
            "mkdir -p /airflow/xcom/; "
            'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'
        ),
    ],
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
    logging_interval=5,
)
How does XCom work?¶
The KubernetesPodOperator handles XCom values differently than other operators. In order to pass an XCom value
from your Pod you must set do_xcom_push to True. This will create a sidecar container that runs
alongside the main container in the Pod. The Pod must write the XCom value to the /airflow/xcom/return.json
path.
Note
Invalid JSON content will fail. For example, echo 'hello' > /airflow/xcom/return.json
fails, while echo '"hello"' > /airflow/xcom/return.json
works.
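The difference between the two commands is whether the file content parses as JSON. A quick way to check a payload before writing it is sketched below; `is_valid_xcom_payload` is a hypothetical helper that uses only the standard `json` module and says nothing about any further checks Airflow may apply.

```python
import json


def is_valid_xcom_payload(text):
    """Return True if `text` parses as JSON, which the XCom file content must do."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


print(is_valid_xcom_payload("hello"))      # bare word, not JSON
print(is_valid_xcom_payload('"hello"'))    # JSON string
print(is_valid_xcom_payload("[1,2,3,4]"))  # JSON array
```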
See the following example on how this occurs:
write_xcom = KubernetesPodOperator(
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="write-xcom",
    do_xcom_push=True,
    on_finish_action="delete_pod",
    in_cluster=True,
    task_id="write-xcom",
    get_logs=True,
)

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)

write_xcom >> pod_task_xcom_result
Note
XComs will be pushed only for tasks marked as State.SUCCESS.
You can also run this operator in deferrable mode:
write_xcom_async = KubernetesPodOperator(
    task_id="kubernetes_write_xcom_task_async",
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="write-xcom",
    do_xcom_push=True,
    on_finish_action="delete_pod",
    in_cluster=True,
    get_logs=True,
    deferrable=True,
)

pod_task_xcom_result_async = BashOperator(
    task_id="pod_task_xcom_result_async",
    # Pull from the deferrable task above by its task_id
    bash_command="echo \"{{ task_instance.xcom_pull('kubernetes_write_xcom_task_async')[0] }}\"",
)

write_xcom_async >> pod_task_xcom_result_async
Include error message in email alert¶
Any content written to /dev/termination-log will be retrieved by Kubernetes and
included in the exception message if the task fails.
k = KubernetesPodOperator(
    task_id="test_error_message",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-error-message",
    email="airflow@example.com",
    email_on_failure=True,
)
Read more about the termination log in the Kubernetes documentation.
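When the container runs a Python program rather than a shell one-liner, the same idea can be applied from inside the program. The sketch below is one possible approach, not something the provider ships: `run_with_termination_log` is a hypothetical wrapper that records the exception text in the termination log and returns a non-zero exit code.

```python
TERMINATION_LOG = "/dev/termination-log"  # default termination message path in Kubernetes


def run_with_termination_log(task, path=TERMINATION_LOG):
    """Run `task`; on failure, write the error to `path` and return exit code 1."""
    try:
        task()
    except Exception as exc:
        with open(path, "w") as handle:
            handle.write(f"{type(exc).__name__}: {exc}")
        return 1
    return 0
```

A container entrypoint could then end with `sys.exit(run_with_termination_log(main))`, where `main` is the program's own entry function.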
KubernetesPodOperator callbacks¶
The KubernetesPodOperator supports different callbacks that can be used to trigger actions during the
lifecycle of the pod. To use them, create a subclass of KubernetesPodOperatorCallback and override
the callback methods you want to use. Then pass your callback class to the operator using the callbacks
parameter.
The following callbacks are supported:
on_sync_client_creation: called after creating the sync client
on_pod_creation: called after creating the pod
on_pod_starting: called after the pod starts
on_pod_completion: called when the pod completes
on_pod_cleanup: called after cleaning/deleting the pod
on_operator_resuming: called when resuming the task from deferred state
progress_callback: called on each line of the container logs
Currently, the callback methods are not called in async mode; this support will be added in the future.
Example:¶
import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class MyCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
        # Create a Service owned by the pod, so it is removed together with the pod
        client.create_namespaced_service(
            namespace=pod.metadata.namespace,
            body=k8s.V1Service(
                metadata=k8s.V1ObjectMeta(
                    name=pod.metadata.name,
                    labels=pod.metadata.labels,
                    owner_references=[
                        k8s.V1OwnerReference(
                            api_version=pod.api_version,
                            kind=pod.kind,
                            name=pod.metadata.name,
                            uid=pod.metadata.uid,
                            controller=True,
                            block_owner_deletion=True,
                        )
                    ],
                ),
                spec=k8s.V1ServiceSpec(
                    selector=pod.metadata.labels,
                    ports=[
                        k8s.V1ServicePort(
                            name="http",
                            port=80,
                            target_port=80,
                        )
                    ],
                ),
            ),
        )


k = KubernetesPodOperator(
    task_id="test_callback",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-callback",
    callbacks=MyCallback,
)
Passing secrets¶
Never use environment variables to pass secrets (for example, connection authentication information) to
the KubernetesPodOperator. Such environment variables will be visible to anyone who has access
to view and describe Pods in Kubernetes. Instead, pass your secrets via native Kubernetes Secrets
or use Connections and Variables from Airflow. For the latter, you need to have the apache-airflow
package installed in your image, in the same version as the Airflow you run your KubernetesPodOperator from.
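When a Kubernetes Secret is mounted as a volume, each key of the Secret appears as a file under the mount path, so the code running in the Pod can read it from disk. The sketch below assumes the mount path `/etc/sql_conn` and the key `sql_alchemy_conn` used in the Secret example earlier in this document; `read_mounted_secret` is a hypothetical helper.

```python
from pathlib import Path


def read_mounted_secret(key, mount_path="/etc/sql_conn"):
    """Read one key of a Secret mounted as a volume; each key is exposed as a file."""
    return (Path(mount_path) / key).read_text().strip()
```

Inside the container, `read_mounted_secret("sql_alchemy_conn")` would then return the connection string without it ever appearing in the Pod's environment.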
SparkKubernetesOperator¶
The SparkKubernetesOperator allows you to create and run Spark jobs on a Kubernetes cluster. It is based on the spark-on-k8s-operator project.
This operator simplifies the interface and accepts different parameters to configure and run Spark applications on Kubernetes. Similar to the KubernetesPodOperator, it includes logic to wait for a job after submission, handle errors, retrieve logs from the driver pod, and delete a Spark job. It also supports out-of-the-box Kubernetes functionality such as handling of volumes, config maps, secrets, etc.
How does this operator work?¶
The operator initiates a Spark task by creating a SparkApplication custom resource, as defined by the SparkApplication Custom Resource Definition (CRD), within Kubernetes. This SparkApplication subsequently generates the driver and the required executor pods, using the parameters specified by the user. The operator continuously monitors the task’s progress until it either succeeds or fails. It retrieves logs from the driver pod and displays them in the Airflow UI.
Usage examples¶
In order to create a SparkKubernetesOperator task, you must provide a basic template that includes Spark configuration and Kubernetes-related resource configuration. This template, which can be in either YAML or JSON format, serves as a starting point for the operator. Below is a sample template that you can utilize:
spark_job_template.yaml
spark:
  apiVersion: sparkoperator.k8s.io/v1beta2
  version: v1beta2
  kind: SparkApplication
  apiGroup: sparkoperator.k8s.io
  metadata:
    namespace: ds
  spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    sparkVersion: 3.0.0
    successfulRunHistoryLimit: 1
    restartPolicy:
      type: Never
    imagePullPolicy: Always
    hadoopConf: {}
    imagePullSecrets: []
    dynamicAllocation:
      enabled: false
      initialExecutors: 1
      minExecutors: 1
      maxExecutors: 1
    labels: {}
    driver:
      serviceAccount: default
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
    executor:
      instances: 1
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
kubernetes:
  # example:
  # env_vars:
  # - name: TEST_NAME
  #   value: TEST_VALUE
  env_vars: []
  # example:
  # env_from:
  # - name: test
  #   valueFrom:
  #     secretKeyRef:
  #       name: mongo-secret
  #       key: mongo-password
  env_from: []
  # example:
  # node_selector:
  #   karpenter.sh/provisioner-name: spark
  node_selector: {}
  # example: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/
  # affinity:
  #   nodeAffinity:
  #     requiredDuringSchedulingIgnoredDuringExecution:
  #       nodeSelectorTerms:
  #       - matchExpressions:
  #         - key: beta.kubernetes.io/instance-type
  #           operator: In
  #           values:
  #           - r5.xlarge
  affinity:
    nodeAffinity: {}
    podAffinity: {}
    podAntiAffinity: {}
  # example: https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
  # type: list
  # tolerations:
  # - key: "key1"
  #   operator: "Equal"
  #   value: "value1"
  #   effect: "NoSchedule"
  tolerations: []
  # example:
  # config_map_mounts:
  #   snowflake-default: /mnt/tmp
  config_map_mounts: {}
  # example:
  # volume_mounts:
  # - name: config
  #   mountPath: /airflow
  volume_mounts: []
  # https://kubernetes.io/docs/concepts/storage/volumes/
  # example:
  # volumes:
  # - name: config
  #   persistentVolumeClaim:
  #     claimName: airflow
  volumes: []
  # read config map into an env variable
  # example:
  # from_env_config_map:
  # - configmap_1
  # - configmap_2
  from_env_config_map: []
  # load secret into an env variable
  # example:
  # from_env_secret:
  # - secret_1
  # - secret_2
  from_env_secret: []
  in_cluster: true
  conn_id: kubernetes_default
  kube_config_file: null
  cluster_context: null
Important
The template file consists of two primary categories: spark and kubernetes.
spark: This segment encompasses the task’s Spark configuration, mirroring the structure of the Spark API template.
kubernetes: This segment encompasses the task’s Kubernetes resource configuration, directly corresponding to the Kubernetes API Documentation. Each resource type includes an example within the template.
The designated base image to be utilized is gcr.io/spark-operator/spark-py:v3.1.1.
Ensure that the Spark code is either embedded within the image, mounted using a persistentVolume, or accessible from an external location such as an S3 bucket.
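Before handing a template to the operator, it can be useful to confirm that both top-level sections are present. The check below is our own sanity check written for illustration, not something the operator is documented to perform; `missing_sections` is a hypothetical helper, and the inline JSON stands in for a parsed template file.

```python
import json

REQUIRED_SECTIONS = ("spark", "kubernetes")


def missing_sections(template):
    """Return the top-level sections absent from a parsed template."""
    return [section for section in REQUIRED_SECTIONS if section not in template]


template = json.loads('{"spark": {"kind": "SparkApplication"}, "kubernetes": {"env_vars": []}}')
print(missing_sections(template))
print(missing_sections({"spark": {}}))
```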
Next, create the task using the following:
SparkKubernetesOperator(
    task_id="spark_task",
    image="gcr.io/spark-operator/spark-py:v3.1.1",  # OR custom image using that
    code_path="local://path/to/spark/code.py",
    application_file="spark_job_template.yaml",  # OR spark_job_template.json
    dag=dag,
)
Note: Alternatively, application_file can also be a JSON file. See the example below:
spark_job_template.json
{
"spark": {
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"version": "v1beta2",
"kind": "SparkApplication",
"apiGroup": "sparkoperator.k8s.io",
"metadata": {
"namespace": "ds"
},
"spec": {
"type": "Python",
"pythonVersion": "3",
"mode": "cluster",
"sparkVersion": "3.0.0",
"successfulRunHistoryLimit": 1,
"restartPolicy": {
"type": "Never"
},
"imagePullPolicy": "Always",
"hadoopConf": {},
"imagePullSecrets": [],
"dynamicAllocation": {
"enabled": false,
"initialExecutors": 1,
"minExecutors": 1,
"maxExecutors": 1
},
"labels": {},
"driver": {
"serviceAccount": "default",
"container_resources": {
"gpu": {
"name": null,
"quantity": 0
},
"cpu": {
"request": null,
"limit": null
},
"memory": {
"request": null,
"limit": null
}
}
},
"executor": {
"instances": 1,
"container_resources": {
"gpu": {
"name": null,
"quantity": 0
},
"cpu": {
"request": null,
"limit": null
},
"memory": {
"request": null,
"limit": null
}
}
}
}
},
"kubernetes": {
"env_vars": [],
"env_from": [],
"node_selector": {},
"affinity": {
"nodeAffinity": {},
"podAffinity": {},
"podAntiAffinity": {}
},
"tolerations": [],
"config_map_mounts": {},
"volume_mounts": [
{
"name": "config",
"mountPath": "/airflow"
}
],
"volumes": [
{
"name": "config",
"persistentVolumeClaim": {
"claimName": "hsaljoog-airflow"
}
}
],
"from_env_config_map": [],
"from_env_secret": [],
"in_cluster": true,
"conn_id": "kubernetes_default",
"kube_config_file": null,
"cluster_context": null
}
}
An alternative method, apart from using YAML or JSON files, is to directly pass the template_spec field
instead of application_file if you prefer not to employ a file for configuration.
KubernetesJobOperator¶
The KubernetesJobOperator allows you to create and run Jobs on a Kubernetes cluster.
Note
If you use a managed Kubernetes service, consider using a specialized KJO operator, as it simplifies the Kubernetes authorization process:
GKEStartJobOperator operator for Google Kubernetes Engine.
Note
The Kubernetes executor is not required to use this operator.
How does this operator work?¶
The KubernetesJobOperator uses the Kubernetes API to launch a job in a Kubernetes cluster. The operator uses
the Kubernetes Python client to generate a Kubernetes API request that dynamically launches this Job.
Users can specify a kubeconfig file using the config_file parameter; otherwise the operator defaults
to ~/.kube/config. It also allows users to supply a template YAML file using the job_template_file
parameter.
k8s_job = KubernetesJobOperator(
    task_id="job-task",
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)
The KubernetesJobOperator also supports deferrable mode:
k8s_job_def = KubernetesJobOperator(
    task_id="job-task-def",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME + "-def",
    wait_until_job_complete=True,
    deferrable=True,
)
Difference between KubernetesPodOperator and KubernetesJobOperator¶
The KubernetesJobOperator is an operator for creating Jobs.
A Job creates one or more Pods and will continue to retry execution of the Pods until a specified number of them successfully terminate.
As Pods successfully complete, the Job tracks the successful completions. When a specified number of successful completions is reached, the Job is complete.
Users can limit how many times a Job retries execution using configuration parameters like
activeDeadlineSeconds and backoffLimit.
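The completion and retry rules can be pictured with a toy model. The sketch below is a deliberate simplification and not how the Job controller is implemented: `run_job` is a hypothetical function that consumes a sequence of Pod outcomes one at a time, ignores parallelism and `activeDeadlineSeconds`, and treats the Job as failed once the number of failed Pods exceeds `backoff_limit`.

```python
def run_job(attempt_results, completions=1, backoff_limit=6):
    """Toy model of a Job: consume Pod outcomes until it completes or fails.

    attempt_results: iterable of booleans, True for a Pod that succeeded.
    """
    succeeded = failed = 0
    for ok in attempt_results:
        if ok:
            succeeded += 1
        else:
            failed += 1
        if succeeded >= completions:
            return "Complete"
        if failed > backoff_limit:
            return "Failed"
    return "Running"  # outcomes exhausted before a terminal state


print(run_job([False, False, True], completions=1, backoff_limit=3))
print(run_job([False, False, False], completions=1, backoff_limit=1))
```

The first call ends as `Complete` because the third Pod succeeds within the retry budget; the second ends as `Failed` because the second failure exceeds a `backoff_limit` of 1.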
Instead of a template parameter for Pod creation, this operator uses KubernetesPodOperator.
This means that users can use all KubernetesPodOperator parameters in KubernetesJobOperator.
More information about the Jobs here: Kubernetes Job Documentation
KubernetesDeleteJobOperator¶
The KubernetesDeleteJobOperator allows you to delete Jobs on a Kubernetes cluster.
delete_job_task = KubernetesDeleteJobOperator(
    task_id="delete_job_task",
    name=k8s_job.output["job_name"],
    namespace=JOB_NAMESPACE,
    wait_for_completion=True,
    delete_on_status="Complete",
    poll_interval=1.0,
)
KubernetesPatchJobOperator¶
The KubernetesPatchJobOperator allows you to update Jobs on a Kubernetes cluster.
update_job = KubernetesPatchJobOperator(
    task_id="update-job-task",
    namespace="default",
    name=k8s_job.output["job_name"],
    body={"spec": {"suspend": False}},
)
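The `body` above only names the field to change; the rest of the Job is left as it is. The effect of such a patch can be approximated locally with a recursive merge, as sketched below. This is an illustration under stated assumptions: `merge_patch` is a hypothetical helper, the sample Job dictionary is made up, and the actual patch is applied by the Kubernetes API server, whose merge rules for lists and other fields are richer than this.

```python
import copy


def merge_patch(target, patch):
    """Recursively merge `patch` into a copy of `target` (a None value deletes the key)."""
    result = copy.deepcopy(target)
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)
        elif isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge_patch(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


job = {"metadata": {"name": "demo"}, "spec": {"suspend": True, "parallelism": 2}}
patched = merge_patch(job, {"spec": {"suspend": False}})
print(patched["spec"])
```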