Source code for airflow.utils.state

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from enum import Enum


[docs]class JobState(str, Enum): """All possible states that a Job can be in."""
[docs] RUNNING = "running"
[docs] SUCCESS = "success"
[docs] RESTARTING = "restarting"
[docs] FAILED = "failed"
[docs] def __str__(self) -> str: return self.value
[docs]class TaskInstanceState(str, Enum): """All possible states that a Task Instance can be in. Note that None is also allowed, so always use this in a type hint with Optional. """ # The scheduler sets a TaskInstance state to None when it's created but not # yet run, but we don't list it here since TaskInstance is a string enum. # Use None instead if need this state. # Set by the scheduler
[docs] REMOVED = "removed" # Task vanished from DAG before it ran
[docs] SCHEDULED = "scheduled" # Task should run and will be handed to executor soon
# Set by the task instance itself
[docs] QUEUED = "queued" # Executor has enqueued the task
[docs] RUNNING = "running" # Task is executing
[docs] SUCCESS = "success" # Task completed
[docs] RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
[docs] FAILED = "failed" # Task errored out
[docs] UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
[docs] UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
[docs] UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
[docs] SKIPPED = "skipped" # Skipped by branching or some other mechanism
[docs] DEFERRED = "deferred" # Deferrable operator waiting on a trigger
# Not used anymore, kept for compatibility. # TODO: Remove in Airflow 3.0. SHUTDOWN = "shutdown" """The task instance is being shut down. :meta private: """
[docs] def __str__(self) -> str: return self.value
[docs]class DagRunState(str, Enum): """All possible states that a DagRun can be in. These are "shared" with TaskInstanceState in some parts of the code, so please ensure that their values always match the ones with the same name in TaskInstanceState. """
[docs] QUEUED = "queued"
[docs] RUNNING = "running"
[docs] SUCCESS = "success"
[docs] FAILED = "failed"
[docs] def __str__(self) -> str: return self.value
[docs]class State: """Static class with task instance state constants and color methods to avoid hard-coding.""" # Backwards-compat constants for code that does not yet use the enum # These first three are shared by DagState and TaskState
[docs] SUCCESS = TaskInstanceState.SUCCESS
[docs] RUNNING = TaskInstanceState.RUNNING
[docs] FAILED = TaskInstanceState.FAILED
# These are TaskState only
[docs] NONE = None
[docs] REMOVED = TaskInstanceState.REMOVED
[docs] SCHEDULED = TaskInstanceState.SCHEDULED
[docs] QUEUED = TaskInstanceState.QUEUED
[docs] RESTARTING = TaskInstanceState.RESTARTING
[docs] UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
[docs] UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
[docs] UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
[docs] SKIPPED = TaskInstanceState.SKIPPED
[docs] DEFERRED = TaskInstanceState.DEFERRED
# Not used anymore, kept for compatibility. # TODO: Remove in Airflow 3.0. SHUTDOWN = TaskInstanceState.SHUTDOWN """The task instance is being shut down. :meta private: """
[docs] finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
[docs] unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
[docs] task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)
[docs] dag_states: tuple[DagRunState, ...] = ( DagRunState.QUEUED, DagRunState.SUCCESS, DagRunState.RUNNING, DagRunState.FAILED, )
[docs] state_color: dict[TaskInstanceState | None, str] = { None: "lightblue", TaskInstanceState.QUEUED: "gray", TaskInstanceState.RUNNING: "lime", TaskInstanceState.SUCCESS: "green", TaskInstanceState.RESTARTING: "violet", TaskInstanceState.FAILED: "red", TaskInstanceState.UP_FOR_RETRY: "gold", TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise", TaskInstanceState.UPSTREAM_FAILED: "orange", TaskInstanceState.SKIPPED: "hotpink", TaskInstanceState.REMOVED: "lightgrey", TaskInstanceState.SCHEDULED: "tan", TaskInstanceState.DEFERRED: "mediumpurple", }
@classmethod
[docs] def color(cls, state): """Return color for a state.""" return cls.state_color.get(state, "white")
@classmethod
[docs] def color_fg(cls, state): """Black&white colors for a state.""" color = cls.color(state) if color in ["green", "red"]: return "white" return "black"
[docs] finished: frozenset[TaskInstanceState] = frozenset( [ TaskInstanceState.SUCCESS, TaskInstanceState.FAILED, TaskInstanceState.SKIPPED, TaskInstanceState.UPSTREAM_FAILED, TaskInstanceState.REMOVED, ] )
""" A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no further action. Note that the attempt could have resulted in failure or have been interrupted; or perhaps never run at all (skip, or upstream_failed) in any case, it is no longer running. """
[docs] unfinished: frozenset[TaskInstanceState | None] = frozenset( [ None, TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING, TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.UP_FOR_RESCHEDULE, TaskInstanceState.DEFERRED, ] )
""" A list of states indicating that a task either has not completed a run or has not even started. """
[docs] failed_states: frozenset[TaskInstanceState] = frozenset( [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED] )
""" A list of states indicating that a task or dag is a failed state. """
[docs] success_states: frozenset[TaskInstanceState] = frozenset( [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED] )
""" A list of states indicating that a task or dag is a success state. """ # Kept for compatibility. DO NOT USE. # TODO: Remove in Airflow 3.0. terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING]) """ A list of states indicating that a task has been terminated. :meta private: """
[docs] adoptable_states = frozenset( [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING] )
""" A list of states indicating that a task can be adopted or reset by a scheduler job if it was queued by another scheduler job that is not running anymore. """

Was this entry helpful?