airflow.providers.cncf.kubernetes.executors.kubernetes_executor

KubernetesExecutor.

See also

For more information on how the KubernetesExecutor works, take a look at the guide: Kubernetes Executor

Attributes

ARG_NAMESPACE

ARG_MIN_PENDING_MINUTES

KUBERNETES_COMMANDS

Classes

KubernetesExecutor

Executor for Kubernetes.

Module Contents

airflow.providers.cncf.kubernetes.executors.kubernetes_executor.ARG_NAMESPACE[source]
airflow.providers.cncf.kubernetes.executors.kubernetes_executor.ARG_MIN_PENDING_MINUTES[source]
airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KUBERNETES_COMMANDS[source]
class airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor[source]

Bases: airflow.executors.base_executor.BaseExecutor

Executor for Kubernetes.

RUNNING_POD_LOG_LINES = 100[source]
supports_ad_hoc_ti_run: bool = True[source]
queued_tasks: dict[airflow.models.taskinstancekey.TaskInstanceKey, airflow.executors.workloads.All][source]
kube_config[source]
task_queue: queue.Queue[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types.KubernetesJobType][source]
result_queue: queue.Queue[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types.KubernetesResultsType][source]
kube_scheduler: airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler | None = None[source]
kube_client: kubernetes.client.CoreV1Api | None = None[source]
scheduler_job_id: str | None = None[source]
last_handled: dict[airflow.models.taskinstancekey.TaskInstanceKey, float][source]
kubernetes_queue: str | None = None[source]
task_publish_retries: collections.Counter[airflow.models.taskinstancekey.TaskInstanceKey][source]
task_publish_max_retries[source]
get_pod_combined_search_str_to_pod_map()[source]

List the worker pods owned by this scheduler and create a map from each pod's combined search string to the pod.

For every pod, it adds two entries of the form below to the map:
dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker},<map_index={map_index}>,run_id={run_id}
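
As an illustration only, a key of that shape could be assembled from a pod's annotations roughly as below; the helper name and the exact annotation keys are assumptions, not the provider's code:

    from kubernetes.client import models as k8s

    def combined_search_str(pod: k8s.V1Pod, airflow_worker: str) -> str:
        # Hypothetical helper: rebuild the search key from pod metadata.
        ann = pod.metadata.annotations or {}
        parts = [
            f"dag_id={ann.get('dag_id')}",
            f"task_id={ann.get('task_id')}",
            f"airflow-worker={airflow_worker}",
        ]
        # The angle brackets in the format above suggest map_index is optional
        # (only present for mapped tasks).
        if "map_index" in ann:
            parts.append(f"map_index={ann['map_index']}")
        parts.append(f"run_id={ann.get('run_id')}")
        return ",".join(parts)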

start()[source]

Start the executor.

execute_async(key, command, queue=None, executor_config=None)[source]

Execute a task asynchronously.
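
The executor_config this method receives is what task authors set on their tasks to customize the worker pod; the Kubernetes Executor guide uses the pod_override key for this. A minimal sketch of such a task-level config (the resource values are illustrative):

    from airflow.decorators import task
    from kubernetes.client import models as k8s

    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",  # the main Airflow worker container
                            resources=k8s.V1ResourceRequirements(requests={"cpu": "0.5"}),
                        )
                    ]
                )
            )
        }
    )
    def resource_hungry_task():
        ...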

queue_workload(workload, session)[source]
sync()[source]

Synchronize task state.

get_task_log(ti, try_number)[source]

Return the task logs.

Parameters:
  • ti (airflow.models.taskinstance.TaskInstance) – task instance for which to fetch logs

  • try_number (int) – the try number of the attempt to read logs from

Returns:

tuple of logs and messages

Return type:

tuple[list[str], list[str]]

try_adopt_task_instances(tis)[source]

Try to adopt running task instances that have been abandoned by a SchedulerJob dying.

Anything that is not adopted will be cleared by the scheduler (and then becomes eligible for re-scheduling).

Returns:

any TaskInstances that were unable to be adopted

Return type:

collections.abc.Sequence[airflow.models.taskinstance.TaskInstance]
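
A hedged usage sketch of the adoption contract described above; the helper name and variables are illustrative, not part of the provider:

    from collections.abc import Sequence
    from airflow.models.taskinstance import TaskInstance

    def log_unadopted(executor, abandoned_tis: Sequence[TaskInstance]) -> None:
        # Hypothetical helper: report which task instances the executor could
        # not re-attach to a running pod; the scheduler will clear these.
        unadopted = executor.try_adopt_task_instances(abandoned_tis)
        for ti in unadopted:
            print(f"could not adopt {ti.dag_id}.{ti.task_id} (run_id={ti.run_id})")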

cleanup_stuck_queued_tasks(tis)[source]

Handle remnants of tasks that were marked failed because they were stuck in queued.

Tasks can get stuck in the queued state. If such a task is detected, it is marked as UP_FOR_RETRY if the task instance has remaining retries, or as FAILED if it doesn't.

Parameters:

tis (list[airflow.models.taskinstance.TaskInstance]) – List of Task Instances to clean up

Returns:

List of readable task instances for a warning message

Return type:

list[str]
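
A minimal sketch of the state choice described above, assuming TaskInstance.is_eligible_to_retry reflects "has remaining retries"; this is illustrative, not the provider's implementation:

    from airflow.utils.state import TaskInstanceState

    def state_for_stuck_queued_task(ti) -> TaskInstanceState:
        # Retry if the task instance still has retries left, otherwise fail it.
        if ti.is_eligible_to_retry():
            return TaskInstanceState.UP_FOR_RETRY
        return TaskInstanceState.FAILED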

revoke_task(*, ti)[source]

Revoke task that may be running.

Parameters:

ti (airflow.models.taskinstance.TaskInstance) – task instance to revoke

adopt_launched_task(kube_client, pod, tis_to_flush_by_key)[source]

Patch an existing pod so that the current KubernetesJobWatcher can monitor it via label selectors.

Parameters:
  • kube_client (kubernetes.client.CoreV1Api) – Kubernetes client for speaking to the kube API

  • pod (kubernetes.client.models.V1Pod) – V1Pod spec that we will patch with the new label

  • tis_to_flush_by_key (dict[airflow.models.taskinstancekey.TaskInstanceKey, kubernetes.client.models.V1Pod]) – TIs that will be flushed if they aren't adopted
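
A hedged sketch of the "patch the pod so the watcher can see it" idea, assuming the watcher selects pods by the airflow-worker label; the function name is made up and this is not the provider's exact code:

    from kubernetes import client

    def relabel_pod_for_current_scheduler(
        kube_client: client.CoreV1Api, pod: client.V1Pod, scheduler_job_id: str
    ) -> None:
        # Point the airflow-worker label at the current scheduler so its
        # KubernetesJobWatcher's label selector matches this pod.
        labels = dict(pod.metadata.labels or {})
        labels["airflow-worker"] = scheduler_job_id
        kube_client.patch_namespaced_pod(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            body={"metadata": {"labels": labels}},
        )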

end()[source]

Shut down the executor.

terminate()[source]

Terminate the executor; this currently does nothing.

static get_cli_commands()[source]

Vends CLI commands to be included in the Airflow CLI.

Override this method to expose commands via the Airflow CLI to manage this executor. These can be commands to set up or tear down the executor, inspect its state, and so on. Make sure to choose unique names for these commands to avoid collisions.
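
A hedged sketch of such an override for a hypothetical executor; the group name, command name, and helper function are made up for illustration, and only airflow.cli.cli_config.GroupCommand / ActionCommand are assumed from Airflow itself:

    from airflow.cli.cli_config import ActionCommand, GroupCommand
    from airflow.executors.base_executor import BaseExecutor

    def _show_state(args):
        # Hypothetical helper backing the CLI command.
        print("my-executor: state ok")

    class MyExecutor(BaseExecutor):
        # Hypothetical executor shown only to illustrate the override.
        @staticmethod
        def get_cli_commands():
            return [
                GroupCommand(
                    name="my-executor",  # choose a unique name to avoid collisions
                    help="Tools for managing MyExecutor",
                    subcommands=[
                        ActionCommand(
                            name="state",
                            help="Print executor state",
                            func=_show_state,
                            args=(),
                        )
                    ],
                )
            ]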
