airflow.models.dagrun

Module Contents

Classes

TISchedulingDecision

Return type of DagRun.task_instance_scheduling_decisions

DagRun

DagRun describes an instance of a Dag. It can be created by the scheduler (for regular runs) or by an external trigger.

class airflow.models.dagrun.TISchedulingDecision[source]

Bases: NamedTuple

Return type of DagRun.task_instance_scheduling_decisions

tis :List[airflow.models.taskinstance.TaskInstance][source]
schedulable_tis :List[airflow.models.taskinstance.TaskInstance][source]
changed_tis :bool[source]
unfinished_tasks :List[airflow.models.taskinstance.TaskInstance][source]
finished_tasks :List[airflow.models.taskinstance.TaskInstance][source]
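
As a rough illustration of how a NamedTuple result like this can be consumed, the sketch below mirrors the five fields listed above. `FakeTI` and `SchedulingDecisionSketch` are hypothetical stand-ins, not Airflow classes; the real fields hold `TaskInstance` objects.

```python
from typing import List, NamedTuple


class FakeTI:
    """Hypothetical stand-in for airflow.models.taskinstance.TaskInstance."""

    def __init__(self, task_id: str, state: str) -> None:
        self.task_id = task_id
        self.state = state


class SchedulingDecisionSketch(NamedTuple):
    """Mirrors the field names and order of TISchedulingDecision."""

    tis: List[FakeTI]
    schedulable_tis: List[FakeTI]
    changed_tis: bool
    unfinished_tasks: List[FakeTI]
    finished_tasks: List[FakeTI]


extract = FakeTI("extract", "success")
load = FakeTI("load", "none")

decision = SchedulingDecisionSketch(
    tis=[extract, load],
    schedulable_tis=[load],
    changed_tis=False,
    unfinished_tasks=[load],
    finished_tasks=[extract],
)

# A NamedTuple supports both attribute access and positional unpacking.
tis, schedulable, changed, unfinished, finished = decision
schedulable_ids = [ti.task_id for ti in decision.schedulable_tis]
```

Because the result is a tuple, callers that only need one field can read it by name and ignore the rest.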
class airflow.models.dagrun.DagRun(dag_id: Optional[str] = None, run_id: Optional[str] = None, queued_at: Optional[datetime.datetime] = __NO_VALUE, execution_date: Optional[datetime.datetime] = None, start_date: Optional[datetime.datetime] = None, external_trigger: Optional[bool] = None, conf: Optional[Any] = None, state: Optional[airflow.utils.state.DagRunState] = None, run_type: Optional[str] = None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None, data_interval: Optional[Tuple[datetime.datetime, datetime.datetime]] = None)[source]

Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

DagRun describes an instance of a Dag. It can be created by the scheduler (for regular runs) or by an external trigger.

__tablename__ = dag_run[source]
id[source]
dag_id[source]
queued_at[source]
execution_date[source]
start_date[source]
end_date[source]
run_id[source]
creating_job_id[source]
external_trigger[source]
run_type[source]
conf[source]
data_interval_start[source]
data_interval_end[source]
last_scheduling_decision[source]
dag_hash[source]
dag[source]
__table_args__[source]
task_instances[source]
DEFAULT_DAGRUNS_TO_EXAMINE[source]
__repr__(self)[source]

Return repr(self).

property logical_date(self) datetime.datetime[source]
get_state(self)[source]
set_state(self, state: airflow.utils.state.DagRunState)[source]
property state(self)[source]
refresh_from_db(self, session: sqlalchemy.orm.session.Session = None)[source]

Reloads the current dagrun from the database

Parameters

session (Session) -- database session

classmethod active_runs_of_dags(cls, dag_ids=None, only_running=False, session=None) Dict[str, int][source]

Get the number of active dag runs for each dag.
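
The shape of the returned mapping (dag_id to count) can be sketched in plain Python. This is not the database query Airflow runs; `count_active_runs` is a hypothetical helper, and it assumes that "active" means running, plus queued unless `only_running` is set.

```python
from collections import Counter
from typing import Dict, Iterable, Optional, Tuple


def count_active_runs(
    runs: Iterable[Tuple[str, str]],
    dag_ids: Optional[Iterable[str]] = None,
    only_running: bool = False,
) -> Dict[str, int]:
    """Count active runs per dag_id from (dag_id, state) pairs.

    Assumption: "active" means running, plus queued unless only_running is set.
    """
    active_states = {"running"} if only_running else {"running", "queued"}
    wanted = None if dag_ids is None else set(dag_ids)
    counts: Counter = Counter()
    for dag_id, state in runs:
        if wanted is not None and dag_id not in wanted:
            continue
        if state in active_states:
            counts[dag_id] += 1
    return dict(counts)


runs = [
    ("etl", "running"),
    ("etl", "queued"),
    ("etl", "success"),
    ("report", "running"),
]
all_active = count_active_runs(runs)
running_only = count_active_runs(runs, dag_ids=["etl"], only_running=True)
```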

classmethod next_dagruns_to_examine(cls, state: airflow.utils.state.DagRunState, session: sqlalchemy.orm.session.Session, max_number: Optional[int] = None)[source]

Return the next DagRuns that the scheduler should attempt to schedule.

This returns zero or more DagRun rows that are row-level locked with a "SELECT ... FOR UPDATE" query. Make all scheduling decisions within a single transaction: the rows are unlocked as soon as that transaction is committed.

Return type

list[airflow.models.DagRun]

classmethod find(cls, dag_id: Optional[Union[str, List[str]]] = None, run_id: Optional[str] = None, execution_date: Optional[Union[datetime.datetime, List[datetime.datetime]]] = None, state: Optional[airflow.utils.state.DagRunState] = None, external_trigger: Optional[bool] = None, no_backfills: bool = False, run_type: Optional[airflow.utils.types.DagRunType] = None, session: sqlalchemy.orm.session.Session = None, execution_start_date: Optional[datetime.datetime] = None, execution_end_date: Optional[datetime.datetime] = None) List[DagRun][source]

Returns a list of dag runs matching the given search criteria.

Parameters
  • dag_id (str or list[str]) -- the dag_id or list of dag_id to find dag runs for

  • run_id (str) -- defines the run id for this dag run

  • run_type (airflow.utils.types.DagRunType) -- type of DagRun

  • execution_date (datetime.datetime or list[datetime.datetime]) -- the execution date

  • state (DagRunState) -- the state of the dag run

  • external_trigger (bool) -- whether this dag run is externally triggered

  • no_backfills (bool) -- if True, exclude backfill runs; if False, return all runs. Defaults to False

  • session (sqlalchemy.orm.session.Session) -- database session

  • execution_start_date (datetime.datetime) -- only return dag runs whose execution date is on or after this date

  • execution_end_date (datetime.datetime) -- only return dag runs whose execution date is on or before this date
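
How these filters might combine can be sketched with an in-memory list instead of a database session. `RunRow` and `find_runs` are hypothetical, only a subset of the parameters is modelled, and the inclusive date bounds and the ordering by execution date are assumptions made for the example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Union


@dataclass
class RunRow:
    """Hypothetical stand-in for a dag_run row."""

    dag_id: str
    run_id: str
    execution_date: datetime
    run_type: str


def find_runs(
    rows: List[RunRow],
    dag_id: Optional[Union[str, List[str]]] = None,
    no_backfills: bool = False,
    execution_start_date: Optional[datetime] = None,
    execution_end_date: Optional[datetime] = None,
) -> List[RunRow]:
    """Apply a subset of the documented filters; None means "do not filter"."""
    dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
    result = []
    for row in rows:
        if dag_ids is not None and row.dag_id not in dag_ids:
            continue
        if no_backfills and row.run_type == "backfill":
            continue
        # Assumption: both date bounds are inclusive.
        if execution_start_date is not None and row.execution_date < execution_start_date:
            continue
        if execution_end_date is not None and row.execution_date > execution_end_date:
            continue
        result.append(row)
    # Assumption: results are ordered by execution date.
    return sorted(result, key=lambda r: r.execution_date)


rows = [
    RunRow("report", "manual__3", datetime(2022, 1, 3), "manual"),
    RunRow("etl", "scheduled__1", datetime(2022, 1, 1), "scheduled"),
    RunRow("etl", "backfill__2", datetime(2022, 1, 2), "backfill"),
]
found = find_runs(
    rows,
    dag_id=["etl", "report"],
    no_backfills=True,
    execution_start_date=datetime(2022, 1, 1),
)
found_ids = [r.run_id for r in found]
```

Accepting either a single dag_id or a list, as the signature above does, lets one call cover several DAGs.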

classmethod find_duplicate(cls, dag_id: str, run_id: str, execution_date: datetime.datetime, session: sqlalchemy.orm.session.Session = None) Optional[DagRun][source]

Return an existing run for the DAG with a specific run_id or execution_date.

None is returned if no such DAG run is found.

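Parameters
  • dag_id (str) -- the dag_id to find duplicates for

  • run_id (str) -- defines the run id for this dag run

  • execution_date (datetime.datetime) -- the execution date

  • session (sqlalchemy.orm.session.Session) -- database session
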
static generate_run_id(run_type: airflow.utils.types.DagRunType, execution_date: datetime.datetime) str[source]

Generate Run ID based on Run Type and Execution Date
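
One plausible way to derive an ID from these two inputs is sketched below. The `<run_type>__<ISO 8601 date>` layout and the name `generate_run_id_sketch` are assumptions for illustration; check the linked source before relying on the exact format.

```python
from datetime import datetime, timezone


def generate_run_id_sketch(run_type: str, execution_date: datetime) -> str:
    """Join the run type and the ISO 8601 execution date with a double underscore.

    Assumption: this "<run_type>__<isoformat>" layout is illustrative only.
    """
    return f"{run_type}__{execution_date.isoformat()}"


run_id = generate_run_id_sketch("manual", datetime(2022, 1, 1, tzinfo=timezone.utc))
```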

get_task_instances(self, state: Optional[Iterable[airflow.utils.state.TaskInstanceState]] = None, session=None) Iterable[airflow.models.taskinstance.TaskInstance][source]

Returns the task instances for this dag run

get_task_instance(self, task_id: str, session: sqlalchemy.orm.session.Session = None) Optional[airflow.models.taskinstance.TaskInstance][source]

Returns the task instance specified by task_id for this dag run

Parameters
  • task_id (str) -- the task id

  • session (Session) -- Sqlalchemy ORM Session

get_dag(self) airflow.models.dag.DAG[source]

Returns the Dag associated with this DagRun.

Returns

DAG

get_previous_dagrun(self, state: Optional[airflow.utils.state.DagRunState] = None, session: sqlalchemy.orm.session.Session = None) Optional[DagRun][source]

The previous DagRun, if there is one

get_previous_scheduled_dagrun(self, session: sqlalchemy.orm.session.Session = None) Optional[DagRun][source]

The previous, SCHEDULED DagRun, if there is one

update_state(self, session: sqlalchemy.orm.session.Session = None, execute_callbacks: bool = True) Tuple[List[airflow.models.taskinstance.TaskInstance], Optional[airflow.utils.callback_requests.DagCallbackRequest]][source]

Determines the overall state of the DagRun based on the state of its TaskInstances.

Parameters
  • session (Session) -- Sqlalchemy ORM Session

  • execute_callbacks (bool) -- whether dag callbacks (success/failure, SLA, etc.) should be invoked directly (default: True) or recorded as a pending request in the callback property

Returns

Tuple containing the task instances that can be scheduled in the current loop and the callback that needs to be executed
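
The idea of deriving a run-level state from task instance states can be sketched as below. This is a deliberately simplified model: the real method may consider only certain tasks, detect deadlocks, and build callbacks, none of which is shown. The state names and the helper `overall_state` are assumptions.

```python
from typing import Iterable

# Assumption: these string state names are illustrative.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}
FAILED = {"failed", "upstream_failed"}


def overall_state(ti_states: Iterable[str]) -> str:
    """Collapse task instance states into a single run state (simplified)."""
    states = list(ti_states)
    # Any task that has not finished keeps the run in progress.
    if any(s not in FINISHED for s in states):
        return "running"
    # Everything finished: one failure fails the whole run.
    if any(s in FAILED for s in states):
        return "failed"
    return "success"


in_progress = overall_state(["success", "running"])
failed = overall_state(["success", "failed"])
succeeded = overall_state(["success", "skipped"])
```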

task_instance_scheduling_decisions(self, session: sqlalchemy.orm.session.Session = None) TISchedulingDecision[source]
verify_integrity(self, session: sqlalchemy.orm.session.Session = None)[source]

Verifies the DagRun by checking for removed tasks or tasks that are not in the database yet. It will set state to removed or add the task if required.

Parameters

session (Session) -- Sqlalchemy ORM Session
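
The reconciliation described above amounts to comparing two sets of task ids: those defined in the DAG and those recorded for the run. A minimal sketch, using a plain dict in place of database rows (`reconcile` is a hypothetical helper, and a `None` state for newly added tasks is an assumption):

```python
from typing import Dict, Optional, Set


def reconcile(
    dag_task_ids: Set[str], db_states: Dict[str, Optional[str]]
) -> Dict[str, Optional[str]]:
    """Return updated {task_id: state} after comparing the DAG with stored rows."""
    updated = dict(db_states)
    for task_id in db_states:
        # Stored for the run but no longer defined in the DAG.
        if task_id not in dag_task_ids:
            updated[task_id] = "removed"
    for task_id in dag_task_ids:
        # Defined in the DAG but not stored yet: add it with no state.
        if task_id not in db_states:
            updated[task_id] = None
    return updated


result = reconcile({"a", "b"}, {"a": "success", "c": "success"})
```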

static get_run(session: sqlalchemy.orm.session.Session, dag_id: str, execution_date: datetime.datetime) Optional[DagRun][source]

Get a single DAG Run

Parameters
  • session (Session) -- Sqlalchemy ORM Session

  • dag_id (str) -- DAG ID

  • execution_date (datetime) -- execution date

Returns

DagRun corresponding to the given dag_id and execution date if one exists. None otherwise.

Return type

airflow.models.DagRun

property is_backfill(self) bool[source]
classmethod get_latest_runs(cls, session=None) List[DagRun][source]

Returns the latest DagRun for each DAG
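
A sketch of the "latest per DAG" grouping, assuming "latest" means the greatest execution date for each dag_id. `latest_runs` is a hypothetical in-memory helper, not the query Airflow issues.

```python
from datetime import datetime
from typing import Dict, List, Tuple


def latest_runs(rows: List[Tuple[str, datetime]]) -> Dict[str, datetime]:
    """Keep the greatest execution date seen for each dag_id."""
    latest: Dict[str, datetime] = {}
    for dag_id, execution_date in rows:
        if dag_id not in latest or execution_date > latest[dag_id]:
            latest[dag_id] = execution_date
    return latest


rows = [
    ("etl", datetime(2022, 1, 1)),
    ("etl", datetime(2022, 1, 3)),
    ("report", datetime(2022, 1, 2)),
    ("etl", datetime(2022, 1, 2)),
]
latest = latest_runs(rows)
```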

schedule_tis(self, schedulable_tis: Iterable[airflow.models.taskinstance.TaskInstance], session: sqlalchemy.orm.session.Session = None) int[source]

Set the given task instances to the scheduled state.

Each element of schedulable_tis should have its task attribute already set.

Any DummyOperator without callbacks is instead set straight to the success state.

All the TIs should belong to this DagRun. Because this code is in the hot path, this is not checked -- it is the caller's responsibility to call this function only with TIs from a single dag run.
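
The branching described above can be sketched without a database. `TaskStub`, `TIStub`, and `schedule_tis_sketch` are hypothetical; the real method updates rows through the session, and treating the returned int as the number of task instances touched is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class TaskStub:
    """Hypothetical stand-in for the operator attached to a task instance."""

    is_dummy: bool = False
    has_callbacks: bool = False


@dataclass
class TIStub:
    """Hypothetical stand-in for a task instance with its task already set."""

    task_id: str
    task: TaskStub
    state: str = "none"


def schedule_tis_sketch(schedulable_tis: Iterable[TIStub]) -> int:
    """Update states in place and return how many task instances were touched."""
    count = 0
    for ti in schedulable_tis:
        if ti.task.is_dummy and not ti.task.has_callbacks:
            # Nothing to execute, so skip the scheduled state entirely.
            ti.state = "success"
        else:
            ti.state = "scheduled"
        count += 1
    return count


tis = [
    TIStub("start", TaskStub(is_dummy=True)),
    TIStub("notify", TaskStub(is_dummy=True, has_callbacks=True)),
    TIStub("load", TaskStub()),
]
touched = schedule_tis_sketch(tis)
states = {ti.task_id: ti.state for ti in tis}
```

Like the method it imitates, the sketch does not verify that every task instance belongs to the same run.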
