airflow.models.taskinstance

Module Contents

airflow.models.taskinstance.ApiClient[source]
airflow.models.taskinstance.TR[source]
airflow.models.taskinstance.Context[source]
airflow.models.taskinstance.log[source]
airflow.models.taskinstance.set_current_context(context: Context)[source]
Sets the current execution context to the provided context object.
This method should be called once per Task execution, before calling operator.execute.
airflow.models.taskinstance.load_error_file(fd: IO[bytes]) → Optional[Union[str, Exception]][source]
Load and return error from error file
airflow.models.taskinstance.set_error_file(error_file: str, error: Union[str, Exception])None[source]
Write error into error file by path
airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state: Union[DagRunState, Literal[False]] = DagRunState.QUEUED)[source]
Clears a set of task instances, but makes sure the running ones
get killed.
Parameters
  • tis -- a list of task instances

  • session -- current session

  • dag_run_state -- state to set DagRun to. If set to False, dagrun state will not be changed.

  • dag -- DAG object

  • activate_dag_runs -- Deprecated parameter, do not pass

class airflow.models.taskinstance.TaskInstanceKey[source]

Bases: typing.NamedTuple

Key used to identify task instance.

dag_id :str[source]
task_id :str[source]
run_id :str[source]
try_number :int = 1[source]
primary[source]

Return task instance primary key part of the key

reduced[source]

Remake the key by subtracting 1 from try number to match in memory information

key[source]

For API-compatibly with TaskInstance.

Returns self

with_try_number(self, try_number: int)[source]

Returns TaskInstanceKey with provided try_number

class airflow.models.taskinstance.TaskInstance(task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None)[source]

Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the state they are in.

The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions.

Database transactions on this table should insure double triggers and any confusion around what task instances are or aren't ready to run even while multiple schedulers may be firing task instances.

__tablename__ = task_instance[source]
task_id[source]
dag_id[source]
run_id[source]
start_date[source]
end_date[source]
duration[source]
state[source]
max_tries[source]
hostname[source]
unixname[source]
job_id[source]
pool[source]
pool_slots[source]
queue[source]
priority_weight[source]
operator[source]
queued_dttm[source]
queued_by_job_id[source]
pid[source]
executor_config[source]
external_executor_id[source]
trigger_id[source]
trigger_timeout[source]
next_method[source]
next_kwargs[source]
__table_args__[source]
dag_model[source]
trigger[source]
dag_run[source]
execution_date[source]
try_number[source]

Return the try number that this task number will be when it is actually run.

If the TaskInstance is currently running, this will match the column in the database, in all other cases this will be incremented.

prev_attempted_tries[source]

Based on this instance's try_number, this will calculate the number of previously attempted tries, defaulting to 0.

next_try_number[source]

Setting Next Try Number

log_url[source]

Log URL for TaskInstance

mark_success_url[source]

URL to mark TI success

key[source]

Returns a tuple that identifies the task instance uniquely

is_premature[source]

Returns whether a task is in UP_FOR_RETRY state and its retry interval has elapsed.

previous_ti[source]

This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_ti method.

previous_ti_success[source]

This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_ti method.

previous_start_date_success[source]

This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_start_date method.

init_on_load(self)[source]

Initialize the attributes that aren't stored in the DB

command_as_list(self, mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

Returns a command that can be executed anywhere where airflow is installed. This command is part of the message sent to executors by the orchestrator.

static generate_command(dag_id: str, task_id: str, run_id: str = None, mark_success: bool = False, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, local: bool = False, pickle_id: Optional[int] = None, file_path: Optional[str] = None, raw: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, cfg_path: Optional[str] = None)[source]

Generates the shell command required to execute this task instance.

Parameters
  • dag_id (str) -- DAG ID

  • task_id (str) -- Task ID

  • run_id (datetime) -- The run_id of this task's DagRun

  • mark_success (bool) -- Whether to mark the task as successful

  • ignore_all_deps (bool) -- Ignore all ignorable dependencies. Overrides the other ignore_* parameters.

  • ignore_depends_on_past (bool) -- Ignore depends_on_past parameter of DAGs (e.g. for Backfills)

  • ignore_task_deps (bool) -- Ignore task-specific dependencies such as depends_on_past and trigger rule

  • ignore_ti_state (bool) -- Ignore the task instance's previous failure/success

  • local (bool) -- Whether to run the task locally

  • pickle_id (Optional[int]) -- If the DAG was serialized to the DB, the ID associated with the pickled DAG

  • file_path (Optional[str]) -- path to the file containing the DAG definition

  • raw (Optional[bool]) -- raw mode (needs more details)

  • job_id (Optional[int]) -- job ID (needs more details)

  • pool (Optional[str]) -- the Airflow pool that the task should run in

  • cfg_path (Optional[str]) -- the Path to the configuration file

Returns

shell command that can be used to run the task instance

Return type

list[str]

current_state(self, session=None)[source]

Get the very latest state from the database, if a session is passed, we use and looking up the state becomes part of the session, otherwise a new session is used.

Parameters

session (Session) -- SQLAlchemy ORM Session

error(self, session=None)[source]

Forces the task instance's state to FAILED in the database.

Parameters

session (Session) -- SQLAlchemy ORM Session

refresh_from_db(self, session=None, lock_for_update=False)[source]

Refreshes the task instance from the database based on the primary key

Parameters
  • session (Session) -- SQLAlchemy ORM Session

  • lock_for_update (bool) -- if True, indicates that the database should lock the TaskInstance (issuing a FOR UPDATE clause) until the session is committed.

refresh_from_task(self, task, pool_override=None)[source]

Copy common attributes from the given task.

Parameters
clear_xcom_data(self, session=None)[source]

Clears all XCom data from the database for the task instance

Parameters

session (Session) -- SQLAlchemy ORM Session

set_state(self, state: str, session=None)[source]

Set TaskInstance state.

Parameters
  • state (str) -- State to set for the TI

  • session (Session) -- SQLAlchemy ORM Session

are_dependents_done(self, session=None)[source]

Checks whether the immediate dependents of this task instance have succeeded or have been skipped. This is meant to be used by wait_for_downstream.

This is useful when you do not want to start processing the next schedule of a task until the dependents are done. For instance, if the task DROPs and recreates a table.

Parameters

session (Session) -- SQLAlchemy ORM Session

get_previous_dagrun(self, state: Optional[str] = None, session: Optional[Session] = None)[source]

The DagRun that ran before this task instance's DagRun.

Parameters
  • state -- If passed, it only take into account instances of a specific state.

  • session -- SQLAlchemy ORM Session.

get_previous_ti(self, state: Optional[str] = None, session: Session = None)[source]

The task instance for the task that ran before this task instance.

Parameters
  • state -- If passed, it only take into account instances of a specific state.

  • session -- SQLAlchemy ORM Session

get_previous_execution_date(self, state: Optional[str] = None, session: Session = None)[source]

The execution date from property previous_ti_success.

Parameters
  • state -- If passed, it only take into account instances of a specific state.

  • session -- SQLAlchemy ORM Session

get_previous_start_date(self, state: Optional[str] = None, session: Session = None)[source]

The start date from property previous_ti_success.

Parameters
  • state -- If passed, it only take into account instances of a specific state.

  • session -- SQLAlchemy ORM Session

are_dependencies_met(self, dep_context=None, session=None, verbose=False)[source]

Returns whether or not all the conditions are met for this task instance to be run given the context for the dependencies (e.g. a task instance being force run from the UI will ignore some dependencies).

Parameters
  • dep_context (DepContext) -- The execution context that determines the dependencies that should be evaluated.

  • session (sqlalchemy.orm.session.Session) -- database session

  • verbose (bool) -- whether log details on failed dependencies on info or debug log level

get_failed_dep_statuses(self, dep_context=None, session=None)[source]

Get failed Dependencies

__repr__(self)[source]
next_retry_datetime(self)[source]

Get datetime of the next retry if the task instance fails. For exponential backoff, retry_delay is used as base and will be converted to seconds.

ready_for_retry(self)[source]

Checks on whether the task instance is in the right state and timeframe to be retried.

get_dagrun(self, session: Session = None)[source]

Returns the DagRun for this TaskInstance

Parameters

session -- SQLAlchemy ORM Session

Returns

DagRun

check_and_change_state_before_execution(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, external_executor_id: Optional[str] = None, session=None)[source]

Checks dependencies and then sets state to RUNNING if they are met. Returns True if and only if state is set to RUNNING, which implies that task should be executed, in preparation for _run_raw_task

Parameters
  • verbose (bool) -- whether to turn on more verbose logging

  • ignore_all_deps (bool) -- Ignore all of the non-critical dependencies, just runs

  • ignore_depends_on_past (bool) -- Ignore depends_on_past DAG attribute

  • ignore_task_deps (bool) -- Don't check the dependencies of this TaskInstance's task

  • ignore_ti_state (bool) -- Disregards previous task instance state

  • mark_success (bool) -- Don't run the task, mark its state as success

  • test_mode (bool) -- Doesn't record success or failure in the DB

  • job_id (str) -- Job (BackfillJob / LocalTaskJob / SchedulerJob) ID

  • pool (str) -- specifies the pool to use to run the task instance

  • external_executor_id (str) -- The identifier of the celery executor

  • session (Session) -- SQLAlchemy ORM Session

Returns

whether the state was changed to running or not

Return type

bool

clear_next_method_args(self)[source]
run(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None)[source]

Run TaskInstance

dry_run(self)[source]

Only Renders Templates for the TI

handle_failure(self, error: Union[str, Exception], test_mode: Optional[bool] = None, force_fail: bool = False, error_file: Optional[str] = None, session=None)[source]

Handle Failure for the TaskInstance

handle_failure_with_callback(self, error: Union[str, Exception], test_mode: Optional[bool] = None, force_fail: bool = False, session=None)[source]
is_eligible_to_retry(self)[source]

Is task instance is eligible for retry

get_template_context(self, session: Session = None, ignore_param_exceptions: bool = True)[source]

Return TI Context

get_rendered_template_fields(self, session=None)[source]

Fetch rendered template fields from DB

get_rendered_k8s_spec(self, session=None)[source]

Fetch rendered template fields from DB

overwrite_params_with_dag_run_conf(self, params, dag_run)[source]

Overwrite Task Params with DagRun.conf

render_templates(self, context: Optional[Context] = None)[source]

Render templates in the operator fields.

render_k8s_pod_yaml(self)[source]

Render k8s pod yaml

get_email_subject_content(self, exception)[source]

Get the email subject content for exceptions.

email_alert(self, exception)[source]

Send alert email with exception information.

set_duration(self)[source]

Set TI duration

xcom_push(self, key: str, value: Any, execution_date: Optional[datetime] = None, session: Session = None)[source]

Make an XCom available for tasks to pull.

Parameters
  • key (str) -- A key for the XCom

  • value (any picklable object) -- A value for the XCom. The value is pickled and stored in the database.

  • execution_date (datetime) -- if provided, the XCom will not be visible until this date. This can be used, for example, to send a message to a task on a future date without it being immediately visible.

  • session (Session) -- Sqlalchemy ORM Session

xcom_pull(self, task_ids: Optional[Union[str, Iterable[str]]] = None, dag_id: Optional[str] = None, key: str = XCOM_RETURN_KEY, include_prior_dates: bool = False, session: Session = None)[source]

Pull XComs that optionally meet certain criteria.

The default value for key limits the search to XComs that were returned by other tasks (as opposed to those that were pushed manually). To remove this filter, pass key=None (or any desired value).

If a single task_id string is provided, the result is the value of the most recent matching XCom from that task_id. If multiple task_ids are provided, a tuple of matching values is returned. None is returned whenever no matches are found.

Parameters
  • key (str) -- A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as a constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass key=None.

  • task_ids (str or iterable of strings (representing task_ids)) -- Only XComs from tasks with matching ids will be pulled. Can pass None to remove the filter.

  • dag_id (str) -- If provided, only pulls XComs from this DAG. If None (default), the DAG of the calling task is used.

  • include_prior_dates (bool) -- If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.

  • session (Session) -- Sqlalchemy ORM Session

get_num_running_task_instances(self, session)[source]

Return Number of running TIs from the DB

init_run_context(self, raw=False)[source]

Sets the log context.

static filter_for_tis(tis: Iterable[Union['TaskInstance', TaskInstanceKey]])[source]

Returns SQLAlchemy filter to query selected task instances

airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(ti: TaskInstance)[source]

Simplified Task Instance.

Used to send data between processes via Queues.

dag_id[source]
task_id[source]
run_id[source]
start_date[source]
end_date[source]
try_number[source]
state[source]
pool[source]
priority_weight[source]
queue[source]
key[source]
executor_config[source]
airflow.models.taskinstance.STATICA_HACK = True[source]
airflow.models.taskinstance.queued_by_job[source]

Was this entry helpful?