Executes task in a Kubernetes POD

Module Contents

class airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator(*, namespace: Optional[str] = None, image: Optional[str] = None, name: Optional[str] = None, cmds: Optional[List[str]] = None, arguments: Optional[List[str]] = None, ports: Optional[List[k8s.V1ContainerPort]] = None, volume_mounts: Optional[List[k8s.V1VolumeMount]] = None, volumes: Optional[List[k8s.V1Volume]] = None, env_vars: Optional[List[k8s.V1EnvVar]] = None, env_from: Optional[List[k8s.V1EnvFromSource]] = None, secrets: Optional[List[Secret]] = None, in_cluster: Optional[bool] = None, cluster_context: Optional[str] = None, labels: Optional[Dict] = None, reattach_on_restart: bool = True, startup_timeout_seconds: int = 120, get_logs: bool = True, image_pull_policy: str = 'IfNotPresent', annotations: Optional[Dict] = None, resources: Optional[k8s.V1ResourceRequirements] = None, affinity: Optional[k8s.V1Affinity] = None, config_file: Optional[str] = None, node_selectors: Optional[dict] = None, node_selector: Optional[dict] = None, image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None, service_account_name: str = 'default', is_delete_operator_pod: bool = False, hostnetwork: bool = False, tolerations: Optional[List[k8s.V1Toleration]] = None, security_context: Optional[Dict] = None, dnspolicy: Optional[str] = None, schedulername: Optional[str] = None, full_pod_spec: Optional[k8s.V1Pod] = None, init_containers: Optional[List[k8s.V1Container]] = None, log_events_on_failure: bool = False, do_xcom_push: bool = False, pod_template_file: Optional[str] = None, priority_class_name: Optional[str] = None, pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None, termination_grace_period: Optional[int] = None, configmaps: Optional[str] = None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a task in a Kubernetes Pod

See also

For more information on how to use this operator, take a look at the guide: KubernetesPodOperator


If you use Google Kubernetes Engine and Airflow is not running in the same cluster, consider using GKEStartPodOperator, which simplifies the authorization process.

  • namespace (str) -- the namespace to run within kubernetes.

  • image (str) -- Docker image you wish to launch. Defaults to, but fully qualified URLS will point to custom repositories. (templated)

  • name (str) -- name of the pod in which the task will run, will be used (plus a random suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).

  • cmds (list[str]) -- entrypoint of the container. (templated) The docker images's entrypoint is used if this is not provided.

  • arguments (list[str]) -- arguments of the entrypoint. (templated) The docker image's CMD is used if this is not provided.

  • ports (list[k8s.V1ContainerPort]) -- ports for launched pod.

  • volume_mounts (list[k8s.V1VolumeMount]) -- volumeMounts for launched pod.

  • volumes (list[k8s.V1Volume]) -- volumes for launched pod. Includes ConfigMaps and PersistentVolumes.

  • env_vars (list[k8s.V1EnvVar]) -- Environment variables initialized in the container. (templated)

  • secrets (list[airflow.kubernetes.secret.Secret]) -- Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume.

  • in_cluster (bool) -- run kubernetes client with in_cluster configuration.

  • cluster_context (str) -- context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used.

  • reattach_on_restart (bool) -- if the scheduler dies while the pod is running, reattach and monitor

  • labels (dict) -- labels to apply to the Pod. (templated)

  • startup_timeout_seconds (int) -- timeout in seconds to startup the pod.

  • get_logs (bool) -- get the stdout of the container as logs of the tasks.

  • image_pull_policy (str) -- Specify a policy to cache or always pull an image.

  • annotations (dict) -- non-identifying metadata you can attach to the Pod. Can be a large range of data, and can include characters that are not permitted by labels.

  • resources (k8s.V1ResourceRequirements) -- A dict containing resources requests and limits. Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. See also

  • affinity (k8s.V1Affinity) -- A dict containing a group of affinity scheduling rules.

  • config_file (str) -- The path to the Kubernetes config file. (templated) If not specified, default value is ~/.kube/config

  • node_selectors (dict) -- A dict containing a group of scheduling rules.

  • image_pull_secrets (List[k8s.V1LocalObjectReference]) -- Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma separated list: secret_a,secret_b

  • service_account_name (str) -- Name of the service account

  • is_delete_operator_pod (bool) -- What to do when the pod reaches its final state, or the execution is interrupted. If False (default): do nothing, If True: delete the pod

  • hostnetwork (bool) -- If True enable host networking on the pod.

  • tolerations (List[k8s.V1Toleration]) -- A list of kubernetes tolerations.

  • security_context (dict) -- security options the pod should run with (PodSecurityContext).

  • dnspolicy (str) -- dnspolicy for the pod.

  • schedulername (str) -- Specify a schedulername for the pod

  • full_pod_spec (kubernetes.client.models.V1Pod) -- The complete podSpec

  • init_containers (list[kubernetes.client.models.V1Container]) -- init container for the launched Pod

  • log_events_on_failure (bool) -- Log the pod's events if a failure occurs

  • do_xcom_push (bool) -- If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes.

  • pod_template_file (str) -- path to pod template file (templated)

  • priority_class_name (str) -- priority class name for the launched Pod

  • termination_grace_period (int) -- Termination grace period if task killed in UI, defaults to kubernetes default

template_fields :Iterable[str] = ['image', 'cmds', 'arguments', 'env_vars', 'labels', 'config_file', 'pod_template_file'][source]
_render_nested_template_fields(self, content: Any, context: Dict, jinja_env: jinja2.Environment, seen_oids: set)[source]
static create_labels_for_pod(context)[source]

Generate labels for the pod to track the pod in case of Operator crash


context -- task context provided by airflow DAG



execute(self, context)[source]
handle_pod_overlap(self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod)[source]

In cases where the Scheduler restarts while a KubernetesPodOperator task is running, this function will either continue to monitor the existing pod or launch a new pod based on the reattach_on_restart parameter.

  • labels (dict) -- labels used to determine if a pod is repeated

  • try_numbers_match (bool) -- do the try numbers match? Only needed for logging purposes

  • launcher -- PodLauncher

  • pod -- Pod found with matching labels

static _get_pod_identifying_label_string(labels)[source]
static _try_numbers_match(context, pod)[source]
_set_name(self, name)[source]

Creates a V1Pod based on user parameters. Note that a pod or pod_template_file will supersede all other values.

create_new_pod_for_operator(self, labels, launcher)[source]

Creates a new pod and monitors for duration of task

  • labels -- labels used to track pod

  • launcher -- pod launcher that will manage launching and monitoring pods


patch_already_checked(self, pod: k8s.V1Pod)[source]

Add an "already tried annotation to ensure we only retry once

monitor_launched_pod(self, launcher, pod)[source]

Monitors a pod to completion that was created by a previous KubernetesPodOperator

  • launcher -- pod launcher that will manage launching and monitoring pods

  • pod -- podspec used to find pod using k8s API



Was this entry helpful?