airflow.providers.celery.executors.celery_kubernetes_executor

Module Contents

Classes

CeleryKubernetesExecutor

CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.

class airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor(celery_executor, kubernetes_executor)[source]

Bases: airflow.executors.base_executor.BaseExecutor

CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.

It chooses which executor to use based on the queue defined on the task. When the queue matches the kubernetes_queue value in the [celery_kubernetes_executor] section of the configuration (default: kubernetes), KubernetesExecutor is selected to run the task; otherwise, CeleryExecutor is used.
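
A minimal usage sketch of this routing, assuming [core] executor is set to CeleryKubernetesExecutor and kubernetes_queue keeps its default value of kubernetes; the dag_id and task_ids are illustrative only.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="celery_kubernetes_routing", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # No queue set: the task goes to the default Celery queue, so CeleryExecutor runs it.
    on_celery = BashOperator(task_id="run_on_celery", bash_command="echo celery")

    # queue matches [celery_kubernetes_executor] kubernetes_queue (default "kubernetes"),
    # so KubernetesExecutor runs this task.
    on_kubernetes = BashOperator(
        task_id="run_on_kubernetes",
        bash_command="echo kubernetes",
        queue="kubernetes",
    )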

property queued_tasks: dict[airflow.models.taskinstancekey.TaskInstanceKey, airflow.executors.base_executor.QueuedTaskInstanceType][source]

Return queued tasks from celery and kubernetes executor.

property running: set[airflow.models.taskinstancekey.TaskInstanceKey][source]

Return running tasks from celery and kubernetes executor.

property job_id: int | str | None[source]

Inherited attribute from BaseExecutor.

Since this is not really an executor but a wrapper around two executors, it is implemented as a property so that a custom setter can be defined.

property slots_available: int[source]

Number of new tasks this executor instance can accept.

property slots_occupied[source]

Number of tasks this executor instance is currently managing.

supports_ad_hoc_ti_run: bool = True[source]
supports_pickling: bool = True[source]
supports_sentry: bool = False[source]
is_local: bool = False[source]
is_single_threaded: bool = False[source]
is_production: bool = True[source]
serve_logs: bool = False[source]
change_sensor_mode_to_reschedule: bool = False[source]
callback_sink: airflow.callbacks.base_callback_sink.BaseCallbackSink | None[source]
kubernetes_queue()[source]
start()[source]

Start celery and kubernetes executor.

queue_command(task_instance, command, priority=1, queue=None)[source]

Queues command via celery or kubernetes executor.

queue_task_instance(task_instance, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, pool=None, cfg_path=None, **kwargs)[source]

Queues task instance via celery or kubernetes executor.
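
A hedged sketch of the queue-based dispatch described above; the helper name choose_executor is hypothetical and this is not the provider's literal implementation.

def choose_executor(task_queue: str, kubernetes_queue: str = "kubernetes") -> str:
    # Tasks whose queue matches the configured kubernetes_queue go to
    # KubernetesExecutor; everything else goes to CeleryExecutor.
    return "kubernetes" if task_queue == kubernetes_queue else "celery"

assert choose_executor("kubernetes") == "kubernetes"
assert choose_executor("default") == "celery"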

get_task_log(ti, try_number)[source]

Fetch task log from Kubernetes executor.

has_task(task_instance)[source]

Check if a task is either queued or running in either celery or kubernetes executor.

Parameters

task_instance (airflow.models.taskinstance.TaskInstance) – TaskInstance

Returns

True if the task is known to this executor

Return type

bool

heartbeat()[source]

Heartbeat sent to trigger new jobs in celery and kubernetes executor.

get_event_buffer(dag_ids=None)[source]

Return and flush the event buffer from the Celery and Kubernetes executors (a minimal sketch follows the return type below).

Parameters

dag_ids (list[str] | None) – dag_ids to return events for, if None returns all

Returns

a dict of events

Return type

dict[airflow.models.taskinstancekey.TaskInstanceKey, airflow.executors.base_executor.EventBufferValueType]
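
A hedged sketch of how a wrapper could combine the event buffers of its two inner executors; the function name and variables are illustrative, not the provider's exact source.

def merged_event_buffer(celery_executor, kubernetes_executor, dag_ids=None):
    # Each call both returns and flushes that executor's buffer, mirroring
    # the contract described above.
    events = celery_executor.get_event_buffer(dag_ids)
    events.update(kubernetes_executor.get_event_buffer(dag_ids))
    return events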

try_adopt_task_instances(tis)[source]

Try to adopt running task instances that have been abandoned by a SchedulerJob dying.

Anything that is not adopted is cleared by the scheduler and then becomes eligible for re-scheduling.

Returns

any TaskInstances that were unable to be adopted

Return type

collections.abc.Sequence[airflow.models.taskinstance.TaskInstance]

cleanup_stuck_queued_tasks(tis)[source]

Handle remnants of tasks that were failed because they were stuck in the queued state.

Tasks can get stuck in the queued state. When such a task is detected, it is marked UP_FOR_RETRY if the task instance has retries remaining, or FAILED if it does not (see the sketch after the return type below).

Parameters

tis (list[airflow.models.taskinstance.TaskInstance]) – List of Task Instances to clean up

Returns

List of readable task instances for a warning message

Return type

list[str]
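
A hedged sketch of the retry-or-fail decision described above; the attribute names try_number and max_tries follow TaskInstance, but the exact condition is illustrative, not the provider's literal code.

from airflow.utils.state import TaskInstanceState

def target_state_for_stuck_task(ti) -> TaskInstanceState:
    # Illustrative: retry while attempts remain, otherwise fail for good.
    if ti.try_number <= ti.max_tries:
        return TaskInstanceState.UP_FOR_RETRY
    return TaskInstanceState.FAILED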

revoke_task(*, ti)[source]

Attempt to remove task from executor.

It should attempt to ensure that the task is no longer running on the worker, and ensure that it is cleared out from internal data structures.

It should not change the state of the task in Airflow, or add any events to the event buffer.

It should not raise any error.

Parameters

ti (airflow.models.taskinstance.TaskInstance) – Task instance to remove

end()[source]

End celery and kubernetes executor.

terminate()[source]

Terminate celery and kubernetes executor.

debug_dump()[source]

Debug dump; called in response to SIGUSR2 by the scheduler.

send_callback(request)[source]

Send callback for execution.

Parameters

request (airflow.callbacks.callback_requests.CallbackRequest) – Callback request to be executed.

static get_cli_commands()[source]

Vend CLI commands to be included in the Airflow CLI.

Override this method to expose commands via the Airflow CLI for managing this executor. These can be commands to set up or tear down the executor, inspect its state, and so on. Make sure to choose unique names for these commands to avoid collisions.
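
A hedged sketch of such an override, assuming airflow.cli.cli_config provides GroupCommand and ActionCommand (as in recent Airflow releases); the command group name "my-executor" and the helper function are hypothetical.

from airflow.cli.cli_config import ActionCommand, GroupCommand

def _print_state(args):
    # Hypothetical helper: inspect whatever state the executor exposes.
    print("executor state: ok")

def get_cli_commands():
    # In a real executor this would be a @staticmethod on the executor class.
    return [
        GroupCommand(
            name="my-executor",  # hypothetical; must not collide with other executors' commands
            help="Commands for managing my executor",
            subcommands=[
                ActionCommand(
                    name="state",
                    help="Print executor state",
                    func=_print_state,
                    args=(),
                ),
            ],
        )
    ]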
