airflow.executors.kubernetes_executor

KubernetesExecutor

See also

For more information on how the KubernetesExecutor works, take a look at the guide: Kubernetes Executor

Module Contents

airflow.executors.kubernetes_executor.KubernetesJobType[source]
airflow.executors.kubernetes_executor.KubernetesResultsType[source]
airflow.executors.kubernetes_executor.KubernetesWatchType[source]
class airflow.executors.kubernetes_executor.ResourceVersion[source]

Singleton for tracking resourceVersion from Kubernetes (a minimal sketch follows the member list below)

_instance[source]
resource_version = 0[source]
classmethod __new__(cls)[source]
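This is essentially the standard Python singleton pattern. A minimal sketch, mirroring the documented attributes (the real implementation may differ in detail):

```python
class ResourceVersion:
    """Singleton for tracking the last resourceVersion seen from Kubernetes."""

    _instance = None
    resource_version = 0

    def __new__(cls):
        # Hand back the same instance every time so all callers share
        # one resource_version counter.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
```

With this shape, ResourceVersion() is ResourceVersion() always holds, so updating resource_version through one reference is visible everywhere.
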
class airflow.executors.kubernetes_executor.KubernetesJobWatcher(namespace: Optional[str], multi_namespace_mode: bool, watcher_queue: 'Queue[KubernetesWatchType]', resource_version: Optional[str], scheduler_job_id: Optional[str], kube_config: Configuration)[source]

Bases: multiprocessing.Process, airflow.utils.log.logging_mixin.LoggingMixin

Watches for Kubernetes jobs

run(self)[source]

Performs the watch loop

_run(self, kube_client: client.CoreV1Api, resource_version: Optional[str], scheduler_job_id: str, kube_config: Any)[source]
process_error(self, event: Any)[source]

Process error response

process_status(self, pod_id: str, namespace: str, status: str, annotations: Dict[str, str], resource_version: str, event: Any)[source]

Process status response
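As a rough illustration of what a status handler like this does, the sketch below maps a pod phase onto an entry pushed to the watcher queue. The phase-to-result mapping and the tuple layout are assumptions for illustration, not the module's actual logic:

```python
from queue import Queue
from typing import Dict


def process_status_sketch(
    watcher_queue: Queue,
    pod_id: str,
    namespace: str,
    status: str,
    annotations: Dict[str, str],
    resource_version: str,
) -> None:
    """Illustrative mapping of a pod phase to a watcher-queue entry (assumed layout)."""
    if status == "Pending":
        # Nothing to report yet in this sketch.
        return
    if status in ("Running", "Succeeded", "Failed"):
        # Enqueue a KubernetesWatchType-style tuple for the scheduler loop to drain.
        watcher_queue.put((pod_id, namespace, status, annotations, resource_version))
    else:
        print(f"Unhandled pod phase {status!r} for {namespace}/{pod_id}")
```
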

class airflow.executors.kubernetes_executor.AirflowKubernetesScheduler(kube_config: Any, task_queue: 'Queue[KubernetesJobType]', result_queue: 'Queue[KubernetesResultsType]', kube_client: client.CoreV1Api, scheduler_job_id: str)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Airflow Scheduler for Kubernetes

_make_kube_watcher(self)[source]
_health_check_kube_watcher(self)[source]
run_next(self, next_job: KubernetesJobType)[source]

The run_next command checks the task_queue for any un-run jobs, creates a unique job id, launches that job in the cluster, and stores the relevant information in the current_jobs map so the job's status can be tracked.
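A hedged sketch of the launch step described above, using the plain Kubernetes Python client. The real executor merges the base pod (see get_base_pod_from_template) with any per-task executor_config; here a minimal pod is built inline and the annotation keys are illustrative:

```python
from typing import List

from kubernetes import client


def launch_task_pod_sketch(
    kube_client: client.CoreV1Api,
    pod_name: str,
    command: List[str],
    namespace: str = "default",
) -> None:
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(
            name=pod_name,
            # Annotations like these are what the watcher later hands back so the
            # executor can map a finished pod to a TaskInstanceKey (keys illustrative).
            annotations={"dag_id": "example_dag", "task_id": "example_task"},
        ),
        spec=client.V1PodSpec(
            restart_policy="Never",
            containers=[client.V1Container(name="base", image="apache/airflow", args=command)],
        ),
    )
    # Launch in the cluster; status changes come back through the KubernetesJobWatcher.
    kube_client.create_namespaced_pod(namespace=namespace, body=pod)
```
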

delete_pod(self, pod_id: str, namespace: str)[source]

Deletes a pod

sync(self)[source]

The sync function checks the status of all currently running Kubernetes jobs. If a job is completed, its status is placed in the result queue to be sent back to the scheduler.
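A hedged sketch of this polling pattern: drain whatever the watcher has reported without blocking, forwarding each item toward the result queue (the real method hands each item to process_watcher_task, listed below):

```python
import queue


def drain_watcher_queue_sketch(watcher_queue: "queue.Queue", result_queue: "queue.Queue") -> None:
    # Pull everything the watcher has reported so far without blocking the scheduler loop.
    while True:
        try:
            task = watcher_queue.get_nowait()
        except queue.Empty:
            break
        try:
            # Stand-in for process_watcher_task(), which turns the watch event
            # into a result for the executor to collect.
            result_queue.put(task)
        finally:
            watcher_queue.task_done()
```
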

process_watcher_task(self, task: KubernetesWatchType)[source]

Process a task reported by the watcher.

_annotations_to_key(self, annotations: Dict[str, str])[source]
_flush_watcher_queue(self)[source]
terminate(self)[source]

Terminates the watcher.

airflow.executors.kubernetes_executor.get_base_pod_from_template(pod_template_file: Optional[str], kube_config: Any) → k8s.V1Pod[source]
Reads either the pod_template_file set in the executor_config or the base pod_template_file set in airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor (see the usage example below).
Parameters
  • pod_template_file -- absolute path to a pod_template_file.yaml or None

  • kube_config -- The KubeConfig class generated by Airflow that contains all kube metadata

Returns

a V1Pod that can be used as the base pod for k8s tasks
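A hedged usage example. The template path is illustrative, and the import path for KubeConfig (airflow.kubernetes.kube_config) is an assumption that may vary between Airflow versions:

```python
from airflow.executors.kubernetes_executor import get_base_pod_from_template
from airflow.kubernetes.kube_config import KubeConfig  # assumed import path for this version

kube_config = KubeConfig()  # built from the [kubernetes] section of airflow.cfg
base_pod = get_base_pod_from_template(
    pod_template_file="/opt/airflow/pod_templates/pod_template.yaml",  # illustrative path
    kube_config=kube_config,
)
print(type(base_pod))  # kubernetes.client.models.v1_pod.V1Pod
```
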

class airflow.executors.kubernetes_executor.KubernetesExecutor[source]

Bases: airflow.executors.base_executor.BaseExecutor, airflow.utils.log.logging_mixin.LoggingMixin

Executor for Kubernetes

clear_not_launched_queued_tasks(self, session=None)[source]

If the Airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not have been launched. On startup, the scheduler therefore checks every "Queued" task to see whether it has been launched (i.e., whether a corresponding pod exists in Kubernetes).

If it has been launched, nothing is done; otherwise the state is reset to "None" so the task will be rescheduled.

This will not be necessary in a future version of Airflow in which there is proper support for State.LAUNCHED.
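A hedged sketch of the per-task check described above: look for a pod whose labels identify the queued task instance, and reset the task only when none is found. The label names and selector format here are assumptions for illustration:

```python
from kubernetes import client


def queued_task_has_pod_sketch(
    kube_client: client.CoreV1Api,
    namespace: str,
    dag_id: str,
    task_id: str,
    run_id: str,
) -> bool:
    # Look for a pod whose labels identify this task instance; label names and
    # the selector format are illustrative.
    selector = f"dag_id={dag_id},task_id={task_id},run_id={run_id}"
    pods = kube_client.list_namespaced_pod(namespace=namespace, label_selector=selector)
    return bool(pods.items)
```

Tasks for which this check comes back False would have their state reset to "None" so the scheduler re-queues them.
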

start(self)[source]

Starts the executor

execute_async(self, key: TaskInstanceKey, command: CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None)[source]

Executes a task asynchronously
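The usual way a task reaches execute_async with a custom pod is by attaching an executor_config to the operator in the DAG file; pod_override is the key the KubernetesExecutor understands for per-task pod customization. A short example (the resource values are illustrative):

```python
from kubernetes.client import models as k8s

from airflow.operators.bash import BashOperator

heavy_task = BashOperator(
    task_id="heavy_task",
    bash_command="echo hello",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    # "base" is the main task container the override is merged into.
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(requests={"memory": "2Gi"}),
                    )
                ]
            )
        )
    },
)
```
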

sync(self)[source]

Synchronize task state.

_change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str, namespace: str)[source]
try_adopt_task_instances(self, tis: List[TaskInstance])[source]
adopt_launched_task(self, kube_client, pod, pod_ids: dict)[source]

Patch the existing pod so that the current KubernetesJobWatcher can monitor it via label selectors (see the sketch after the parameter list)

Parameters
  • kube_client -- kubernetes client for speaking to kube API

  • pod -- V1Pod spec that we will patch with new label

  • pod_ids -- pod_ids we expect to patch.
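A hedged sketch of the adoption step: re-label the pod so that this scheduler's watcher starts monitoring it. The "airflow-worker" label name, set to the current scheduler job id, is the selector this sketch assumes the watcher filters on:

```python
from kubernetes import client


def adopt_pod_sketch(kube_client: client.CoreV1Api, pod: client.V1Pod, scheduler_job_id: str) -> None:
    # Assumed selector: the watcher filters pods on an "airflow-worker" label
    # holding the scheduler job id, so pointing that label at the current
    # scheduler hands the pod over to its watcher.
    labels = dict(pod.metadata.labels or {})
    labels["airflow-worker"] = scheduler_job_id
    kube_client.patch_namespaced_pod(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        body=client.V1Pod(metadata=client.V1ObjectMeta(labels=labels)),
    )
```
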

_adopt_completed_pods(self, kube_client: kubernetes.client.CoreV1Api)[source]

Patch completed pod so that the KubernetesJobWatcher can delete it.

Parameters

kube_client -- kubernetes client for speaking to kube API

_flush_task_queue(self)[source]
_flush_result_queue(self)[source]
end(self)[source]

Called when the executor shuts down

terminate(self)[source]

Terminates the executor; currently this does nothing.
