airflow.models.taskinstance

Module Contents

Classes

TaskInstance

Task instances store the state of a task instance.

SimpleTaskInstance

Simplified Task Instance.

TaskInstanceNote

For storage of arbitrary notes concerning the task instance.

Functions

set_current_context(context)

Set the current execution context to the provided context object.

clear_task_instances(tis, session[, ...])

Clear a set of task instances, but make sure the running ones get killed.

Attributes

TR

log

PAST_DEPENDS_MET

TaskInstanceStateType

airflow.models.taskinstance.TR[source]
airflow.models.taskinstance.log[source]
airflow.models.taskinstance.PAST_DEPENDS_MET = 'past_depends_met'[source]
airflow.models.taskinstance.set_current_context(context)[source]

Set the current execution context to the provided context object.

This method should be called once per Task execution, before calling operator.execute.

airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state=DagRunState.QUEUED)[source]

Clear a set of task instances, but make sure the running ones get killed.

Also sets the DagRun’s state to QUEUED and its start_date to the time of execution, but only for finished DagRuns (SUCCESS and FAILED). It does not reset the state or start_date of active DagRuns (QUEUED and RUNNING), because resetting the state of an already active DagRun is redundant and resetting start_date would distort the DagRun’s duration.
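
The DagRun rule above can be sketched as a small pure function. This is a stdlib-only illustration, not the library's implementation; `dag_run_after_clear` and the lowercase state strings are assumptions made for the example.

```python
from datetime import datetime

# States that count as "finished" in the description above.
FINISHED = {"success", "failed"}


def dag_run_after_clear(state, start_date, now, new_state="queued"):
    """Return the (state, start_date) a DagRun would have after a clear."""
    if state in FINISHED:
        # Finished runs are re-queued and their clock restarts.
        return new_state, now
    # Queued or running runs keep both fields untouched.
    return state, start_date


started = datetime(2024, 1, 1, 8, 0)
now = datetime(2024, 1, 1, 12, 0)
finished_run = dag_run_after_clear("failed", started, now)
active_run = dag_run_after_clear("running", started, now)
```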

Parameters
class airflow.models.taskinstance.TaskInstance(task, execution_date=None, run_id=None, state=None, map_index=-1)[source]

Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

Task instances store the state of a task instance.

This table is the authority and single source of truth around what tasks have run and the state they are in.

The SqlAlchemy model doesn’t have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions.

Database transactions on this table should insure against double triggers and any confusion about which task instances are or aren’t ready to run, even while multiple schedulers may be firing task instances.

A value of -1 in map_index represents any of: a TI without mapped tasks; a TI with mapped tasks that has yet to be expanded (state=pending); a TI with mapped tasks that expanded to an empty list (state=skipped).

property stats_tags: dict[str, str][source]

Returns task instance tags.

property next_try_number: int[source]
property operator_name: str | None[source]

Returns a friendlier display name for the operator, if one is set.

property log_url: str[source]

Log URL for TaskInstance.

property mark_success_url: str[source]

URL to mark TI success.

property key: airflow.models.taskinstancekey.TaskInstanceKey[source]

Returns a tuple that identifies the task instance uniquely.

property is_premature: bool[source]

Returns whether a task is in UP_FOR_RETRY state and its retry interval has not yet elapsed.

property previous_ti: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[source]

This attribute is deprecated.

Please use airflow.models.taskinstance.TaskInstance.get_previous_ti.

property previous_ti_success: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[source]

This attribute is deprecated.

Please use airflow.models.taskinstance.TaskInstance.get_previous_ti.

property previous_start_date_success: pendulum.DateTime | None[source]

This attribute is deprecated.

Please use airflow.models.taskinstance.TaskInstance.get_previous_start_date.

__tablename__ = 'task_instance'[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
start_date[source]
end_date[source]
duration[source]
state[source]
try_number[source]
max_tries[source]
hostname[source]
unixname[source]
job_id[source]
pool[source]
pool_slots[source]
queue[source]
priority_weight[source]
operator[source]
custom_operator_name[source]
queued_dttm[source]
queued_by_job_id[source]
pid[source]
executor[source]
executor_config[source]
updated_at[source]
rendered_map_index[source]
external_executor_id[source]
trigger_id[source]
trigger_timeout[source]
next_method[source]
next_kwargs[source]
__table_args__ = ()[source]
dag_model: airflow.models.dag.DagModel[source]
trigger[source]
triggerer_job[source]
dag_run[source]
rendered_task_instance_fields[source]
execution_date[source]
task_instance_note[source]
note[source]
task: airflow.models.operator.Operator | None[source]
test_mode: bool = False[source]
is_trigger_log_context: bool = False[source]
run_as_user: str | None[source]
__hash__()[source]

Return hash(self).

init_on_load()[source]

Initialize the attributes that aren’t stored in the DB.

task_display_name()[source]
command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

Return a command that can be executed anywhere where airflow is installed.

This command is part of the message sent to executors by the orchestrator.

static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None, map_index=-1)[source]

Generate the shell command required to execute this task instance.

Parameters
  • dag_id (str) – DAG ID

  • task_id (str) – Task ID

  • run_id (str) – The run_id of this task’s DagRun

  • mark_success (bool) – Whether to mark the task as successful

  • ignore_all_deps (bool) – Ignore all ignorable dependencies. Overrides the other ignore_* parameters.

  • ignore_depends_on_past (bool) – Ignore depends_on_past parameter of DAGs (e.g. for Backfills)

  • wait_for_past_depends_before_skipping (bool) – Wait for past depends before marking the ti as skipped

  • ignore_task_deps (bool) – Ignore task-specific dependencies such as depends_on_past and trigger rule

  • ignore_ti_state (bool) – Ignore the task instance’s previous failure/success

  • local (bool) – Whether to run the task locally

  • pickle_id (int | None) – If the DAG was serialized to the DB, the ID associated with the pickled DAG

  • file_path (pathlib.PurePath | str | None) – path to the file containing the DAG definition

  • raw (bool) – raw mode (needs more details)

  • job_id (str | None) – job ID (needs more details)

  • pool (str | None) – the Airflow pool that the task should run in

  • cfg_path (str | None) – the path to the configuration file

Returns

shell command that can be used to run the task instance

Return type

list[str]
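
To make the shape of the returned list concrete, here is a hedged sketch that assembles a command from a few of the parameters above. `build_task_command` is a hypothetical helper, and the flag spellings are assumed from the Airflow 2 `airflow tasks run` CLI; check them against `airflow tasks run --help` for your version.

```python
def build_task_command(dag_id, task_id, run_id, *, mark_success=False,
                       local=False, raw=False, pool=None, map_index=-1):
    """Assemble an argv-style list; only a subset of options is shown."""
    cmd = ["airflow", "tasks", "run", dag_id, task_id, run_id]
    # Boolean options become bare flags (assumed spellings).
    if mark_success:
        cmd.append("--mark-success")
    if local:
        cmd.append("--local")
    if raw:
        cmd.append("--raw")
    # Valued options become a flag followed by its value.
    if pool is not None:
        cmd.extend(["--pool", pool])
    if map_index != -1:
        # -1 means "not a mapped task instance", so it is omitted.
        cmd.extend(["--map-index", str(map_index)])
    return cmd


command = build_task_command(
    "example_dag", "load", "manual__1", local=True, pool="etl", map_index=2
)
```

Returning a list of strings rather than one shell string avoids quoting problems when the command is handed to a subprocess.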

current_state(session=NEW_SESSION)[source]

Get the very latest state from the database.

If a session is passed, it is used and the state lookup becomes part of that session; otherwise a new session is used.

sqlalchemy.inspect is used here to get the primary keys, ensuring that this lookup does not regress if they change.

Parameters

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

error(session=NEW_SESSION)[source]

Force the task instance’s state to FAILED in the database.

Parameters

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]
refresh_from_db(session=NEW_SESSION, lock_for_update=False)[source]

Refresh the task instance from the database based on the primary key.

Parameters
  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • lock_for_update (bool) – if True, indicates that the database should lock the TaskInstance (issuing a FOR UPDATE clause) until the session is committed.

refresh_from_task(task, pool_override=None)[source]

Copy common attributes from the given task.

Parameters
  • task (airflow.models.operator.Operator) – The task object to copy from

  • pool_override (str | None) – Use the pool_override instead of task’s pool

clear_xcom_data(session=NEW_SESSION)[source]
set_state(state, session=NEW_SESSION)[source]

Set TaskInstance state.

Parameters
Returns

Was the state changed

Return type

bool

are_dependents_done(session=NEW_SESSION)[source]

Check whether the immediate dependents of this task instance have succeeded or have been skipped.

This is meant to be used by wait_for_downstream.

This is useful when you do not want to start processing the next schedule of a task until the dependents are done. For instance, if the task DROPs and recreates a table.

Parameters

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
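
The check described above reduces to "every immediate dependent is in a done state". The sketch below shows that rule on plain data; `dependents_done`, the input mapping, and the lowercase state strings are assumptions for illustration, and the real method queries the database instead.

```python
# States that count as "done" for a dependent, per the description above.
DONE_STATES = {"success", "skipped"}


def dependents_done(downstream_states):
    """Return True if every immediate downstream task is in a done state.

    `downstream_states` maps task_id -> state (None if not yet recorded).
    An empty mapping (no dependents) is treated as done.
    """
    return all(state in DONE_STATES for state in downstream_states.values())


all_done = dependents_done({"load": "success", "notify": "skipped"})
still_running = dependents_done({"load": "success", "notify": "running"})
```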

get_previous_dagrun(state=None, session=None)[source]

Return the DagRun that ran before this task instance’s DagRun.

Parameters
get_previous_ti(state=None, session=NEW_SESSION)[source]

Return the task instance for the task that ran before this task instance.

Parameters
get_previous_execution_date(state=None, session=NEW_SESSION)[source]

Return the execution date from property previous_ti_success.

Parameters
get_previous_start_date(state=None, session=NEW_SESSION)[source]

Return the start date from property previous_ti_success.

Parameters
are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]

Return whether all conditions are met for this task instance to be run, given the context for the dependencies (e.g. a task instance being force run from the UI will ignore some dependencies).

Parameters
  • dep_context (airflow.ti_deps.dep_context.DepContext | None) – The execution context that determines the dependencies that should be evaluated.

  • session (sqlalchemy.orm.session.Session) – database session

  • verbose (bool) – whether to log details on failed dependencies at info level (True) or debug level (False)

get_failed_dep_statuses(dep_context=None, session=NEW_SESSION)[source]

Get failed Dependencies.

__repr__()[source]

Return repr(self).

next_retry_datetime()[source]

Get datetime of the next retry if the task instance fails.

For exponential backoff, retry_delay is used as base and will be converted to seconds.
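
A simplified sketch of exponential backoff with retry_delay as the base, using only the standard library. The function name, the `attempt` and `max_delay` parameters, and the doubling formula are assumptions for illustration; the actual implementation may differ in its exponent and may add jitter.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_retry_time(end_date: datetime, retry_delay: timedelta, attempt: int,
                    exponential: bool = False,
                    max_delay: Optional[timedelta] = None) -> datetime:
    """Compute when a failed attempt may be retried.

    `attempt` is the 1-based number of the attempt that just failed.
    """
    delay = retry_delay
    if exponential:
        # Convert the base delay to seconds and double it per extra attempt.
        seconds = retry_delay.total_seconds() * 2 ** max(attempt - 1, 0)
        delay = timedelta(seconds=seconds)
    if max_delay is not None:
        # Cap the wait so backoff cannot grow without bound.
        delay = min(delay, max_delay)
    return end_date + delay


ended = datetime(2024, 1, 1, 0, 0)
base = timedelta(minutes=5)
fixed = next_retry_time(ended, base, attempt=3)
backoff = next_retry_time(ended, base, attempt=3, exponential=True)
capped = next_retry_time(ended, base, attempt=3, exponential=True,
                         max_delay=timedelta(minutes=10))
```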

ready_for_retry()[source]

Check on whether the task instance is in the right state and timeframe to be retried.

get_dagrun(session=NEW_SESSION)[source]

Return the DagRun for this TaskInstance.

Parameters

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

Returns

DagRun

Return type

airflow.models.dagrun.DagRun

classmethod ensure_dag(task_instance, session=NEW_SESSION)[source]

Ensure that the task has an associated DAG object, which might have been removed by serialization.

check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, external_executor_id=None, session=NEW_SESSION)[source]
emit_state_change_metric(new_state)[source]

Send a time metric representing how much time a given state transition took.

The previous state and metric name are deduced from the state the task was put in.

Parameters

new_state (airflow.utils.state.TaskInstanceState) – The state that has just been set for this task. We do not use self.state, because sometimes the state is updated directly in the DB and not in the local TaskInstance object. Supported states: QUEUED and RUNNING

clear_next_method_args()[source]

Unset next_method and next_kwargs so that any retries don’t reuse them.

defer_task(exception, session=NEW_SESSION)[source]

When TaskDeferred is raised, mark the task as deferred and set up the trigger that is needed to resume it.

run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]

Run TaskInstance.

dry_run()[source]

Only render templates for the TI.

classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_stop=False)[source]

Handle Failure for the TaskInstance.

Parameters

fail_stop (bool) – if true, stop remaining tasks in dag

static save_to_db(ti, session=NEW_SESSION)[source]
handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]

Handle Failure for a task instance.

Parameters
  • error (None | str | BaseException) – if specified, log the specific exception if thrown

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • test_mode (bool | None) – doesn’t record success or failure in the DB if True

  • context (airflow.utils.context.Context | None) – Jinja2 context

  • force_fail (bool) – if True, task does not retry

is_eligible_to_retry()[source]

Return whether the task instance is eligible for retry.
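
How force_fail and retry eligibility might combine into the resulting state can be sketched as follows. This is an assumption-laden illustration, not the library's logic: `state_after_failure`, its parameters, and the eligibility rule (retries configured and try_number not past max_tries) are hypothetical.

```python
def state_after_failure(try_number, max_tries, retries, force_fail=False):
    """Pick the state a task instance would land in after a failure."""
    # Assumed rule: a retry is possible only if retries are configured
    # and the current try has not exceeded the allowed maximum.
    eligible = retries > 0 and try_number <= max_tries
    if force_fail or not eligible:
        return "failed"
    return "up_for_retry"


retrying = state_after_failure(try_number=1, max_tries=2, retries=2)
forced = state_after_failure(try_number=1, max_tries=2, retries=2, force_fail=True)
exhausted = state_after_failure(try_number=3, max_tries=2, retries=2)
```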

get_template_context(session=None, ignore_param_exceptions=True)[source]

Return TI Context.

Parameters
  • session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM Session

  • ignore_param_exceptions (bool) – flag to suppress value exceptions while initializing the ParamsDict

get_rendered_template_fields(session=NEW_SESSION)[source]

Update task with rendered template fields for presentation in UI.

If task has already run, will fetch from DB; otherwise will render.

overwrite_params_with_dag_run_conf(params, dag_run)[source]

Overwrite Task Params with DagRun.conf.

render_templates(context=None, jinja_env=None)[source]

Render templates in the operator fields.

If the task was originally mapped, this may replace self.task with the unmapped, fully rendered BaseOperator. The original self.task before replacement is returned.

render_k8s_pod_yaml()[source]

Render the k8s pod yaml.

get_rendered_k8s_spec(session=NEW_SESSION)[source]

Render the k8s pod yaml.

get_email_subject_content(exception, task=None)[source]

Get the email subject content for exceptions.

Parameters
email_alert(exception, task)[source]

Send alert email with exception information.

Parameters
set_duration()[source]

Set task instance duration.

xcom_push(key, value, execution_date=None, session=NEW_SESSION)[source]

Make an XCom available for tasks to pull.

Parameters
  • key (str) – Key to store the value under.

  • value (Any) – Value to store. What types are possible depends on whether enable_xcom_pickling is true or not. If so, this can be any picklable object; otherwise only JSON-serializable values may be used.

  • execution_date (datetime.datetime | None) – Deprecated parameter that has no effect.

xcom_pull(task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False, session=NEW_SESSION, *, map_indexes=None, default=None)[source]

Pull XComs that optionally meet certain criteria.

Parameters
  • key (str) – A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass None.

  • task_ids (str | Iterable[str] | None) – Only XComs from tasks with matching ids will be pulled. Pass None to remove the filter.

  • dag_id (str | None) – If provided, only pulls XComs from this DAG. If None (default), the DAG of the calling task is used.

  • map_indexes (int | Iterable[int] | None) – If provided, only pull XComs with matching indexes. If None (default), this is inferred from the task(s) being pulled (see below for details).

  • include_prior_dates (bool) – If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.

When pulling from a single task (task_ids is None or a str) without specifying map_indexes, the return value is inferred from whether the specified task is mapped. If it is not mapped, the value from the single task instance is returned. If the task to pull is mapped, an iterator (not a list) yielding XComs from the mapped task instances is returned. In either case, default (None if not specified) is returned if no matching XComs are found.

When pulling from multiple tasks (i.e. either task_ids or map_indexes is a non-str iterable), a list of matching XComs is returned. Elements in the list are ordered by item ordering in task_ids and map_indexes.
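
The difference in return shape between pulling one task and pulling several can be mimicked with a plain dictionary. The sketch below is not the XCom backend: `STORE` and `pull` are hypothetical, and mapped tasks and map_indexes are deliberately left out.

```python
# Hypothetical in-memory store: (task_id, key) -> value.
STORE = {
    ("extract", "return_value"): [1, 2],
    ("transform", "return_value"): 3,
}


def pull(task_ids, key="return_value", default=None):
    """Return one value for a str task id, or a list for an iterable of ids."""
    if isinstance(task_ids, str):
        # Single task: the bare value, or `default` if nothing matches.
        return STORE.get((task_ids, key), default)
    # Multiple tasks: matching values, in the order the ids were given.
    return [STORE[(t, key)] for t in task_ids if (t, key) in STORE]


single = pull("transform")
missing = pull("unknown", default="n/a")
several = pull(["transform", "extract"])
```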

get_num_running_task_instances(session, same_dagrun=False)[source]

Return Number of running TIs from the DB.

init_run_context(raw=False)[source]

Set the log context.

static filter_for_tis(tis)[source]

Return SQLAlchemy filter to query selected task instances.

schedule_downstream_tasks(session=NEW_SESSION, max_tis_per_query=None)[source]

Schedule downstream tasks of this task instance.

get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]

Infer the map indexes of an upstream “relevant” to this ti.

The bulk of the logic mainly exists to solve the problem described by the following example, where ‘val’ must resolve to different values, depending on where the reference is being used:

@task
def this_task(v):  # This is self.task.
    return v * 2


@task_group
def tg1(inp):
    val = upstream(inp)  # This is the upstream task.
    this_task(val)  # When inp is 1, val here should resolve to 2.
    return val


# This val is the same object returned by tg1.
val = tg1.expand(inp=[1, 2, 3])


@task_group
def tg2(inp):
    another_task(inp, val)  # val here should resolve to [2, 4, 6].


tg2.expand(inp=["a", "b"])

The surrounding mapped task groups of upstream and self.task are inspected to find a common “ancestor”. If such an ancestor is found, we need to return specific map indexes to pull a partial value from upstream XCom.

Parameters
  • upstream (airflow.models.operator.Operator) – The referenced upstream task.

  • ti_count (int | None) – The total count of task instances this task was expanded into by the scheduler, i.e. expanded_ti_count in the template context.

Returns

Specific map index or map indexes to pull, or None if we want the “whole” return value (i.e. no mapped task groups involved).

Return type

int | range | None
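
To show how the three return types could be consumed, the sketch below replays the tg1/tg2 example on plain data. It greatly simplifies the ancestor search: `relevant_indexes` and `resolve` are hypothetical helpers, and the upstream values assume upstream(inp) yields 2, 4, 6 for inp 1, 2, 3 as in the example.

```python
# Upstream XCom values keyed by map index, as in the example above.
UPSTREAM_XCOMS = {0: 2, 1: 4, 2: 6}


def relevant_indexes(shares_mapped_group, own_map_index):
    """Simplified stand-in: an int inside a shared mapped group, else None."""
    return own_map_index if shares_mapped_group else None


def resolve(xcoms, indexes):
    """Turn an int | range | None answer into the value(s) to pull."""
    if indexes is None:
        # No common mapped ancestor: pull the whole return value.
        return [xcoms[i] for i in sorted(xcoms)]
    if isinstance(indexes, int):
        # Same mapped group: pull only the matching expansion.
        return xcoms[indexes]
    # A range selects a contiguous slice of expansions.
    return [xcoms[i] for i in indexes]


inside_tg1 = resolve(UPSTREAM_XCOMS, relevant_indexes(True, 0))
inside_tg2 = resolve(UPSTREAM_XCOMS, relevant_indexes(False, 1))
partial = resolve(UPSTREAM_XCOMS, range(1, 3))
```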

airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None)[source]

Simplified Task Instance.

Used to send data between processes via Queues.

__eq__(other)[source]

Return self==value.

as_dict()[source]
classmethod from_ti(ti)[source]
classmethod from_dict(obj_dict)[source]
class airflow.models.taskinstance.TaskInstanceNote(content, user_id=None)[source]

Bases: airflow.models.base.TaskInstanceDependencies

For storage of arbitrary notes concerning the task instance.

__tablename__ = 'task_instance_note'[source]
user_id[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
content[source]
created_at[source]
updated_at[source]
task_instance[source]
__table_args__ = ()[source]
__repr__()[source]

Return repr(self).
