airflow.models.dagrun
Module Contents¶
Classes¶
TISchedulingDecision | Type of return for DagRun.task_instance_scheduling_decisions.
DagRun | Invocation instance of a DAG.
DagRunNote | For storage of arbitrary notes concerning the dagrun instance.
Attributes¶
- airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|dataset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]¶
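To illustrate what RUN_ID_REGEX accepts, the sketch below recompiles the same pattern with the standard `re` module (the doubled backslashes in the listing above are repr escaping). The helper name `looks_like_generated_run_id` is hypothetical and is not part of Airflow.

```python
import re

# Same pattern as RUN_ID_REGEX above, written as a raw string.
RUN_ID_REGEX = (
    r"^(?:manual|scheduled|dataset_triggered)__"
    r"(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00)$"
)
_pattern = re.compile(RUN_ID_REGEX)


def looks_like_generated_run_id(run_id: str) -> bool:
    """Hypothetical helper: True if run_id has the '<type>__<UTC timestamp>' shape."""
    return _pattern.match(run_id) is not None
```

Under this pattern, a timestamp with fractional seconds or a non-UTC offset does not match, and neither does a run type outside the three listed prefixes.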
- class airflow.models.dagrun.TISchedulingDecision[source]¶
Bases: NamedTuple
Type of return for DagRun.task_instance_scheduling_decisions.
- schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- finished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
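Because the return type is a NamedTuple, callers can read each group of task instances by name instead of by position. The stand-in below is a hypothetical class that mirrors only the three fields listed above, with plain strings in place of TaskInstance objects; it is not the Airflow class itself.

```python
from typing import NamedTuple


class SchedulingDecisionSketch(NamedTuple):
    """Hypothetical stand-in with only the three fields listed above."""

    schedulable_tis: list
    unfinished_tis: list
    finished_tis: list


decision = SchedulingDecisionSketch(
    schedulable_tis=["extract"],
    unfinished_tis=["extract", "load"],
    finished_tis=["setup"],
)

# Named access documents intent better than decision[1].
all_done = not decision.unfinished_tis
```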
- class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, execution_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, dag_hash=None, creating_job_id=None, data_interval=None)[source]¶
Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin
Invocation instance of a DAG.
A DAG run can be created by the scheduler (i.e. scheduled runs), or by an external trigger (i.e. manual runs).
- property logical_date: datetime.datetime[source]¶
- set_state(state)[source]¶
Change the state of the DagRun.
Changes to attributes are implemented in accordance with the following table (rows represent old states, columns represent new states):
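+---------+-------------------------------+------------------------------------------+------------------------------+------------------------------+
|         | QUEUED                        | RUNNING                                  | SUCCESS                      | FAILED                       |
+=========+===============================+==========================================+==============================+==============================+
| None    | queued_at = timezone.utcnow() | if empty: start_date = timezone.utcnow() | end_date = timezone.utcnow() | end_date = timezone.utcnow() |
|         |                               | end_date = None                          |                              |                              |
+---------+-------------------------------+------------------------------------------+------------------------------+------------------------------+
| QUEUED  | queued_at = timezone.utcnow() | if empty: start_date = timezone.utcnow() | end_date = timezone.utcnow() | end_date = timezone.utcnow() |
|         |                               | end_date = None                          |                              |                              |
+---------+-------------------------------+------------------------------------------+------------------------------+------------------------------+
| RUNNING | queued_at = timezone.utcnow() |                                          | end_date = timezone.utcnow() | end_date = timezone.utcnow() |
|         | start_date = None             |                                          |                              |                              |
|         | end_date = None               |                                          |                              |                              |
+---------+-------------------------------+------------------------------------------+------------------------------+------------------------------+
| SUCCESS | queued_at = timezone.utcnow() | start_date = timezone.utcnow()           |                              |                              |
|         | start_date = None             | end_date = None                          |                              |                              |
|         | end_date = None               |                                          |                              |                              |
+---------+-------------------------------+------------------------------------------+------------------------------+------------------------------+
| FAILED  | queued_at = timezone.utcnow() | start_date = timezone.utcnow()           |                              |                              |
|         | start_date = None             | end_date = None                          |                              |                              |
|         | end_date = None               |                                          |                              |                              |
+---------+-------------------------------+------------------------------------------+------------------------------+------------------------------+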
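The transition table above can be read as a small function over the three timestamp attributes. The sketch below is an illustration of the table only, not Airflow's implementation: `RunTimestamps` and `apply_transition` are hypothetical names, states are plain strings, and it assumes that "if empty" applies to start_date alone while end_date is always cleared in those cells.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone

FINISHED = {"success", "failed"}


@dataclass
class RunTimestamps:
    """Hypothetical stand-in for the three DagRun attributes in the table."""

    queued_at: datetime | None = None
    start_date: datetime | None = None
    end_date: datetime | None = None


def apply_transition(ts, old_state, new_state, now=None):
    """Update ts for a move from old_state (row) to new_state (column)."""
    now = now or datetime.now(timezone.utc)
    if new_state == "queued":
        ts.queued_at = now
        if old_state == "running" or old_state in FINISHED:
            # Re-queuing a started or finished run resets its timing.
            ts.start_date = None
            ts.end_date = None
    elif new_state == "running":
        if old_state in FINISHED:
            ts.start_date = now
            ts.end_date = None
        elif old_state != "running":  # old state is None or queued
            if ts.start_date is None:
                ts.start_date = now
            ts.end_date = None
    elif new_state in FINISHED and old_state not in FINISHED:
        ts.end_date = now
    # Empty cells in the table leave all attributes unchanged.
    return ts
```

Passing `now` explicitly keeps the function deterministic, which makes each cell of the table easy to check in isolation.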
- refresh_from_db(session=NEW_SESSION)[source]¶
Reload the current dagrun from the database.
- Parameters
session (sqlalchemy.orm.Session) – database session
- classmethod active_runs_of_dags(dag_ids=None, only_running=False, session=NEW_SESSION)[source]¶
Get the number of active dag runs for each dag.
- classmethod next_dagruns_to_examine(state, session, max_number=None)[source]¶
Return the next DagRuns that the scheduler should attempt to schedule.
This returns zero or more DagRun rows that are row-level-locked with a “SELECT … FOR UPDATE” query. Make all scheduling decisions for these rows in a single transaction: the rows are unlocked as soon as the transaction is committed.
- classmethod find(dag_id=None, run_id=None, execution_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, execution_start_date=None, execution_end_date=None)[source]¶
Return a set of dag runs for the given search criteria.
- Parameters
dag_id (str | list[str] | None) – the dag_id or list of dag_id to find dag runs for
run_id (Iterable[str] | None) – defines the run id for this dag run
run_type (airflow.utils.types.DagRunType | None) – type of DagRun
execution_date (datetime.datetime | Iterable[datetime.datetime] | None) – the execution date
state (airflow.utils.state.DagRunState | None) – the state of the dag run
external_trigger (bool | None) – whether this dag run is externally triggered
no_backfills (bool) – return no backfills (True), return all (False). Defaults to False
session (sqlalchemy.orm.Session) – database session
execution_start_date (datetime.datetime | None) – dag run that was executed from this date
execution_end_date (datetime.datetime | None) – dag run that was executed until this date
- classmethod find_duplicate(dag_id, run_id, execution_date, session=NEW_SESSION)[source]¶
Return an existing run for the DAG with a specific run_id or execution_date.
None is returned if no such DAG run is found.
- Parameters
dag_id (str) – the dag_id to find duplicates for
run_id (str) – defines the run id for this dag run
execution_date (datetime.datetime) – the execution date
session (sqlalchemy.orm.Session) – database session
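The "run_id or execution_date" wording means a run counts as a duplicate if either value matches within the same DAG. The in-memory sketch below illustrates that matching rule only; `RunRecord` and `find_duplicate_sketch` are hypothetical names, and the real method queries the database through the session instead of scanning a list.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical stand-in for a stored DAG run."""

    dag_id: str
    run_id: str
    execution_date: datetime


def find_duplicate_sketch(runs, dag_id, run_id, execution_date) -> RunRecord | None:
    """Return the first run of dag_id matching run_id OR execution_date, else None."""
    for run in runs:
        if run.dag_id == dag_id and (
            run.run_id == run_id or run.execution_date == execution_date
        ):
            return run
    return None


jan1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
jan2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
stored = [RunRecord("etl", "manual__a", jan1)]

same_date = find_duplicate_sketch(stored, "etl", "manual__b", jan1)
no_match = find_duplicate_sketch(stored, "etl", "manual__b", jan2)
```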
- static generate_run_id(run_type, execution_date)[source]¶
Generate Run ID based on Run Type and Execution Date.
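The exact format is not stated here. Judging by RUN_ID_REGEX, a generated ID appears to be the run type, a double underscore, and an ISO 8601 timestamp. The sketch below reproduces that shape under this assumption; `sketch_generate_run_id` is a hypothetical function, not the Airflow method.

```python
from datetime import datetime, timezone


def sketch_generate_run_id(run_type: str, execution_date: datetime) -> str:
    """Hypothetical re-creation of the '<run_type>__<ISO timestamp>' format."""
    return f"{run_type}__{execution_date.isoformat()}"


run_id = sketch_generate_run_id("manual", datetime(2024, 1, 1, tzinfo=timezone.utc))
# run_id == "manual__2024-01-01T00:00:00+00:00"
```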
- static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]¶
Return the task instances for this dag run.
- get_task_instances(state=None, session=NEW_SESSION)[source]¶
Return the task instances for this dag run.
This method delegates to DagRun.fetch_task_instances. It is kept because it is widely used across the codebase.
- get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]¶
Return the task instance specified by task_id for this dag run.
- Parameters
task_id (str) – the task id
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]¶
Return the task instance specified by task_id for the given dag run.
- Parameters
dag_id (str) – the DAG id
dag_run_id (str) – the DAG run id
task_id (str) – the task id
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]¶
Return the previous DagRun, if there is one.
- Parameters
dag_run (DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic) – the dag run
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – the dag run state
- static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]¶
Return the previous SCHEDULED DagRun, if there is one.
- Parameters
dag_run_id (int) – the DAG run ID
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- update_state(session=NEW_SESSION, execute_callbacks=True)[source]¶
Determine the overall state of the DagRun based on the state of its TaskInstances.
- Parameters
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
execute_callbacks (bool) – Should dag callbacks (success/failure, SLA etc.) be invoked directly (default: true) or recorded as a pending request in the returned_callback property
- Returns
Tuple containing tis that can be scheduled in the current loop & returned_callback that needs to be executed
- Return type
tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]
- verify_integrity(*, session=NEW_SESSION)[source]¶
Verify the DagRun by checking for removed tasks or tasks that are not in the database yet.
It will set state to removed or add the task if required.
- Missing_indexes
A dictionary of task vs indexes that are missing.
- Parameters
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]¶
Set the given task instances into the scheduled state.
Each element of schedulable_tis should have its task attribute already set.
Any EmptyOperator without callbacks or outlets is instead set straight to the success state.
All the TIs should belong to this DagRun. Because this code is in the hot path, that is not checked; it is the caller’s responsibility to call this function only with TIs from a single dag run.