airflow.models.dag
Module Contents
-
airflow.models.dag.
DEFAULT_VIEW_PRESETS
= ['tree', 'graph', 'duration', 'gantt', 'landing_times'][source]¶
-
airflow.models.dag.
get_last_dagrun
(dag_id, session, include_externally_triggered=False)[source]¶ -
Returns the last dag run for a dag, None if there was none.
-
Last dag run can be any type of run, e.g. scheduled or backfilled.
-
Overridden DagRuns are ignored.
-
class
airflow.models.dag.
DAG
(dag_id: str, description: Optional[str] = None, schedule_interval: Optional[ScheduleInterval] = timedelta(days=1), start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, full_filepath: Optional[str] = None, template_searchpath: Optional[Union[str, Iterable[str]]] = None, template_undefined: Type[jinja2.StrictUndefined] = jinja2.StrictUndefined, user_defined_macros: Optional[Dict] = None, user_defined_filters: Optional[Dict] = None, default_args: Optional[Dict] = None, concurrency: int = conf.getint('core', 'dag_concurrency'), max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout: Optional[timedelta] = None, sla_miss_callback: Optional[Callable] = None, default_view: str = conf.get('webserver', 'dag_default_view').lower(), orientation: str = conf.get('webserver', 'dag_orientation'), catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback: Optional[DagStateChangeCallback] = None, on_failure_callback: Optional[DagStateChangeCallback] = None, doc_md: Optional[str] = None, params: Optional[Dict] = None, access_control: Optional[Dict] = None, is_paused_upon_creation: Optional[bool] = None, jinja_environment_kwargs: Optional[Dict] = None, tags: Optional[List[str]] = None)[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule (say daily or hourly), the DAG needs to run each individual task as its dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can't run until their previous schedule (and upstream tasks) are completed.
DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.
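The namespace behaviour described above can be sketched without Airflow. `ToyDag` and its two-argument `add_task` are hypothetical stand-ins for illustration only, and the use of `ValueError` is an assumption; the real class may signal duplicates differently.

```python
from typing import Dict


class ToyDag:
    """Illustrative stand-in for a DAG that only tracks task ids."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_dict: Dict[str, object] = {}

    def add_task(self, task_id: str, task: object) -> None:
        # A task_id may be registered only once per DAG (assumed error type).
        if task_id in self.task_dict:
            raise ValueError(
                f"Task id {task_id!r} already exists in DAG {self.dag_id!r}"
            )
        self.task_dict[task_id] = task


dag_a = ToyDag("dag_a")
dag_b = ToyDag("dag_b")
dag_a.add_task("extract", object())
dag_b.add_task("extract", object())  # the same task_id is fine in another DAG
```

Because each DAG keeps its own mapping, the same task_id can appear in two DAGs, while adding it twice to one DAG is rejected.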
- Parameters
dag_id (str) -- The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII)
description (str) -- The description for the DAG to e.g. be shown on the webserver
schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) -- Defines how often the DAG runs. A timedelta object gets added to your latest task instance's execution_date to figure out the next schedule
start_date (datetime.datetime) -- The timestamp from which the scheduler will attempt to backfill
end_date (datetime.datetime) -- A date beyond which your DAG won't run, leave to None for open ended scheduling
template_searchpath (str or list[str]) -- This list of folders (non relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default
template_undefined (jinja2.StrictUndefined) -- Template undefined type.
user_defined_macros (dict) -- a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.
user_defined_filters (dict) -- a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in all jinja templates related to this DAG.
default_args (dict) -- A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains 'depends_on_past': True here and 'depends_on_past': False in the operator's call default_args, the actual value will be False.
params (dict) -- a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.
concurrency (int) -- the number of task instances allowed to run concurrently
max_active_runs (int) -- maximum number of active DAG runs; beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
dagrun_timeout (datetime.timedelta) -- specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns.
sla_miss_callback (types.FunctionType) -- specify a function to call when reporting SLA timeouts.
default_view (str) -- Specify DAG default view (tree, graph, duration, gantt, landing_times), default tree
orientation (str) -- Specify DAG orientation in graph view (LR, TB, RL, BT), default LR
catchup (bool) -- Perform scheduler catchup (or only run latest)? Defaults to True
on_failure_callback (callable) -- A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.
on_success_callback (callable) -- Much like the on_failure_callback except that it is executed when the dag succeeds.
access_control (dict) -- Specify optional DAG-level permissions, e.g., "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
is_paused_upon_creation (bool or None) -- Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used.
jinja_environment_kwargs (dict) -- additional configuration options to be passed to Jinja Environment for template rendering.
Example: to prevent Jinja from removing a trailing newline from template strings
    DAG(
        dag_id='my-dag',
        jinja_environment_kwargs={
            'keep_trailing_newline': True,
            # some other jinja2 Environment options here
        },
    )
tags (List[str]) -- List of tags to help filtering DAGs in the UI.
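The precedence rule stated for default_args (operator-level values win over DAG-level defaults) can be illustrated with a plain dictionary merge. This is a minimal sketch, not Airflow's implementation; `resolve_operator_kwargs` is a hypothetical helper name.

```python
from typing import Any, Dict


def resolve_operator_kwargs(
    dag_default_args: Dict[str, Any], operator_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
    """Merge DAG-level defaults with the arguments given to one operator.

    Hypothetical helper: values passed explicitly to the operator override
    the DAG-level defaults, and the defaults themselves are left untouched.
    """
    merged = dict(dag_default_args)  # start from a copy of the DAG defaults
    merged.update(operator_kwargs)   # operator-level values take precedence
    return merged


dag_defaults = {"depends_on_past": True, "retries": 2}
resolved = resolve_operator_kwargs(dag_defaults, {"depends_on_past": False})
```

Here `resolved` carries `depends_on_past` as False from the operator and `retries` as 2 from the DAG defaults, matching the behaviour described for default_args.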
-
owner
[source]¶ Return list of all owners found in DAG tasks.
- Returns
Comma separated list of owners in DAG tasks
- Return type
-
concurrency_reached
[source]¶ This attribute is deprecated. Please use airflow.models.DAG.get_concurrency_reached method.
-
is_paused
[source]¶ This attribute is deprecated. Please use airflow.models.DAG.get_is_paused method.
-
normalized_schedule_interval
Returns Normalized Schedule Interval. This is used internally by the Scheduler to schedule DAGs.
1. Converts Cron Preset to a Cron Expression (e.g. @monthly to 0 0 1 * *)
2. If Schedule Interval is "@once", returns None
3. If not (1) or (2), returns schedule_interval
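The three normalization rules can be sketched as a standalone function. The preset table below is an assumption for illustration (only the @monthly mapping is stated in this document), and `normalize_schedule_interval` is a hypothetical name, not the property itself.

```python
from datetime import timedelta
from typing import Optional, Union

# Assumed preset table; only "@monthly" -> "0 0 1 * *" comes from the text.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

ScheduleInterval = Union[str, timedelta, None]


def normalize_schedule_interval(
    schedule_interval: ScheduleInterval,
) -> Optional[Union[str, timedelta]]:
    # Rule 1: a cron preset is replaced by its cron expression.
    if isinstance(schedule_interval, str) and schedule_interval in CRON_PRESETS:
        return CRON_PRESETS[schedule_interval]
    # Rule 2: "@once" has no recurring schedule.
    if schedule_interval == "@once":
        return None
    # Rule 3: anything else is returned unchanged.
    return schedule_interval
```

Cron strings that are not presets and timedelta objects fall through to the last rule and come back as given.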
-
latest_execution_date
[source]¶ This attribute is deprecated. Please use airflow.models.DAG.get_latest_execution_date method.
-
roots
[source]¶ Return nodes with no parents. These are first to execute and are called roots or root nodes.
-
leaves
[source]¶ Return nodes with no children. These are last to execute and are called leaves or leaf nodes.
-
static
_upgrade_outdated_dag_access_control
(access_control=None)[source]¶ Looks for outdated dag level permissions (can_dag_read and can_dag_edit) in DAG access_controls (for example, {'role1': {'can_dag_read'}, 'role2': {'can_dag_read', 'can_dag_edit'}}) and replaces them with updated permissions (can_read and can_edit).
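A minimal sketch of the permission upgrade described above, assuming access_control maps role names to sets of permission names. `upgrade_access_control` is a hypothetical stand-in, and returning None for an empty input is an assumption.

```python
from typing import Dict, Optional, Set

# Outdated permission names and their replacements, as listed above.
UPDATED_PERMISSIONS = {"can_dag_read": "can_read", "can_dag_edit": "can_edit"}


def upgrade_access_control(
    access_control: Optional[Dict[str, Set[str]]] = None,
) -> Optional[Dict[str, Set[str]]]:
    """Return a copy of access_control with outdated permissions replaced."""
    if not access_control:
        return None  # assumed behaviour when nothing was configured
    return {
        role: {UPDATED_PERMISSIONS.get(perm, perm) for perm in perms}
        for role, perms in access_control.items()
    }


upgraded = upgrade_access_control(
    {"role1": {"can_dag_read"}, "role2": {"can_dag_read", "can_dag_edit"}}
)
```

Permissions that are already current pass through unchanged, so the function can be applied to mixed inputs.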
-
date_range
(self, start_date: datetime, num: Optional[int] = None, end_date: Optional[datetime] = timezone.utcnow())[source]¶
-
is_fixed_time_schedule
(self)[source]¶ Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
- Returns
True if the schedule has a fixed time, False if not.
-
following_schedule
(self, dttm)[source]¶ Calculates the following schedule for this dag in UTC.
- Parameters
dttm -- utc datetime
- Returns
utc datetime
-
previous_schedule
(self, dttm)[source]¶ Calculates the previous schedule for this dag in UTC
- Parameters
dttm -- utc datetime
- Returns
utc datetime
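For a schedule expressed as a fixed timedelta, the following and previous schedules can be sketched with plain datetime arithmetic in UTC. This is an illustration under that assumption only; cron-based schedules need a cron library and are not covered, and the two module-level functions below are hypothetical stand-ins for the methods.

```python
from datetime import datetime, timedelta, timezone


def _to_utc(dttm: datetime) -> datetime:
    # Refuse naive datetimes so the UTC conversion is unambiguous.
    if dttm.tzinfo is None:
        raise ValueError("dttm must be timezone-aware")
    return dttm.astimezone(timezone.utc)


def following_schedule(dttm: datetime, interval: timedelta) -> datetime:
    """Next schedule after dttm for a fixed-interval schedule, in UTC."""
    return _to_utc(dttm) + interval


def previous_schedule(dttm: datetime, interval: timedelta) -> datetime:
    """Schedule before dttm for a fixed-interval schedule, in UTC."""
    return _to_utc(dttm) - interval


start = datetime(2021, 1, 1, 3, 0, tzinfo=timezone.utc)
daily = timedelta(days=1)
next_run = following_schedule(start, daily)
```

With a fixed interval the two functions are inverses of each other, so `previous_schedule(next_run, daily)` gives back `start`.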
-
next_dagrun_info
(self, date_last_automated_dagrun: Optional[pendulum.DateTime]) Get information about the next DagRun of this dag after date_last_automated_dagrun -- the execution date, and the earliest it could be scheduled
- Parameters
date_last_automated_dagrun -- The max(execution_date) of existing "automated" DagRuns for this dag (scheduled or backfill, but not manual)
-
next_dagrun_after_date
(self, date_last_automated_dagrun: Optional[pendulum.DateTime]) Get the next execution date after the given date_last_automated_dagrun, according to schedule_interval, start_date, end_date etc. This doesn't check max active run or any other "concurrency" type limits; it only performs calculations based on the various date and interval fields of this dag and its tasks.
- Parameters
date_last_automated_dagrun (pendulum.Pendulum) -- The execution_date of the last scheduler or backfill triggered run for this dag
-
get_run_dates
(self, start_date, end_date=None)[source]¶ Returns a list of dates between the interval received as parameter using this dag's schedule interval. Returned dates can be used for execution dates.
- Parameters
start_date (datetime) -- the start date of the interval
end_date (datetime) -- the end date of the interval, defaults to timezone.utcnow()
- Returns
a list of dates within the interval following the dag's schedule
- Return type
-
normalize_schedule
(self, dttm) Returns the following schedule after dttm (dttm + interval), unless dttm itself already falls on the schedule, in which case dttm is returned
-
param
(self, name: str, default=None)[source]¶ Return a DagParam object for current dag.
- Parameters
name -- dag parameter name.
default -- fallback value for dag parameter.
- Returns
DagParam instance for specified name and current dag.
-
get_concurrency_reached
(self, session=None)[source]¶ Returns a boolean indicating whether the concurrency limit for this DAG has been reached
-
handle_callback
(self, dagrun, success=True, reason=None, session=None)[source]¶ Triggers the appropriate callback depending on the value of success, namely the on_failure_callback or on_success_callback. This method gets the context of a single TaskInstance part of this DagRun and passes that to the callable along with a 'reason', primarily to differentiate DagRun failures.
- Parameters
dagrun -- DagRun object
success -- Flag to specify if failure or success callback should be called
reason -- Completion reason
session -- Database session
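The callback selection described above can be sketched in isolation. `dispatch_callback` is a hypothetical helper and the plain dict context is an assumption; the sketch only shows choosing between the two callbacks and passing a 'reason' along with the context.

```python
from typing import Any, Callable, Dict, Optional

Callback = Callable[[Dict[str, Any]], None]


def dispatch_callback(
    success: bool,
    reason: Optional[str],
    context: Dict[str, Any],
    on_success_callback: Optional[Callback] = None,
    on_failure_callback: Optional[Callback] = None,
) -> bool:
    """Call the callback matching `success`; return True if one was called."""
    callback = on_success_callback if success else on_failure_callback
    if callback is None:
        return False  # nothing registered for this outcome
    # The callable receives the context with the completion reason added.
    callback({**context, "reason": reason})
    return True


received = []
called = dispatch_callback(
    False, "task_failure", {"run_id": "r1"}, on_failure_callback=received.append
)
```

The 'reason' entry lets one failure callback distinguish between different kinds of DagRun failure.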
-
get_active_runs
(self)[source]¶ Returns a list of dag run execution dates currently running
- Returns
List of execution dates
-
get_num_active_runs
(self, external_trigger=None, session=None)[source]¶ Returns the number of active "running" dag runs
- Parameters
external_trigger (bool) -- True for externally triggered active dag runs
session --
- Returns
number greater than 0 for active dag runs
-
get_dagrun
(self, execution_date, session=None) Returns the dag run for a given execution date if it exists, otherwise None.
- Parameters
execution_date -- The execution date of the DagRun to find.
session --
- Returns
The DagRun if found, otherwise None.
-
get_dagruns_between
(self, start_date, end_date, session=None)[source]¶ Returns the list of dag runs between start_date (inclusive) and end_date (inclusive).
- Parameters
start_date -- The starting execution date of the DagRun to find.
end_date -- The ending execution date of the DagRun to find.
session --
- Returns
The list of DagRuns found.
-
get_latest_execution_date
(self, session=None)[source]¶ Returns the latest date for which at least one dag run exists
-
set_dependency
(self, upstream_task_id, downstream_task_id)[source]¶ Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task()
-
topological_sort
(self, include_subdag_tasks: bool = False) Sorts tasks in topological order, such that a task comes after any of its upstream dependencies.
Heavily inspired by: http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/
- Parameters
include_subdag_tasks -- whether to include tasks in subdags, default to False
- Returns
list of tasks in topological order
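The ordering guarantee (a task comes after all of its upstream dependencies) can be sketched on a plain mapping from task id to upstream task ids. This is a generic illustration, not the method's actual code; the function name and the `upstream` mapping are assumptions.

```python
from typing import Dict, List, Set


def topological_order(upstream: Dict[str, Set[str]]) -> List[str]:
    """Order task ids so each one appears after all of its upstream ids."""
    remaining = {task: set(deps) for task, deps in upstream.items()}
    ordered: List[str] = []
    while remaining:
        # A task is ready once none of its upstream tasks is still pending.
        ready = sorted(
            task for task, deps in remaining.items() if deps.isdisjoint(remaining)
        )
        if not ready:
            raise ValueError("cycle detected, no valid order exists")
        for task in ready:
            ordered.append(task)
            del remaining[task]
    return ordered


order = topological_order(
    {
        "extract": set(),
        "transform": {"extract"},
        "load": {"transform"},
        "report": {"extract"},
    }
)
```

Several valid orders may exist for the same graph; the sketch sorts each batch of ready tasks only to make its output deterministic.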
-
set_dag_runs_state
(self, state: str = State.RUNNING, session: Session = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dag_ids: List[str] = None)[source]¶
-
clear
(self, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state: str = State.RUNNING, dry_run=False, session=None, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, visited_external_tis=None)[source]¶ Clears a set of task instances associated with the current dag for a specified date range.
- Parameters
start_date (datetime.datetime or None) -- The minimum execution_date to clear
end_date (datetime.datetime or None) -- The maximum execution_date to clear
only_failed (bool) -- Only clear failed tasks
only_running (bool) -- Only clear running tasks.
confirm_prompt (bool) -- Ask for confirmation
include_subdags (bool) -- Clear tasks in subdags and clear external tasks indicated by ExternalTaskMarker
include_parentdag (bool) -- Clear tasks in the parent dag of the subdag.
dag_run_state -- state to set DagRun to
dry_run (bool) -- Find the tasks to clear but don't clear them.
session (sqlalchemy.orm.session.Session) -- The sqlalchemy session to use
get_tis (bool) -- Return the sqlalchemy query for finding the TaskInstance without clearing the tasks
recursion_depth (int) -- The recursion depth of nested calls to DAG.clear().
max_recursion_depth (int) -- The maximum recursion depth allowed. This is determined by the first encountered ExternalTaskMarker. Default is None indicating no ExternalTaskMarker has been encountered.
dag_bag (airflow.models.dagbag.DagBag) -- The DagBag used to find the dags
visited_external_tis (set) -- A set used internally to keep track of the visited TaskInstance when clearing tasks across multiple DAGs linked by ExternalTaskMarker to avoid redundant work.
-
classmethod
clear_dags
(cls, dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=State.RUNNING, dry_run=False)[source]¶
-
partial_subset
(self, task_ids_or_regex: Union[str, PatternType, Iterable[str]], include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]¶ Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed.
- Parameters
task_ids_or_regex ([str] or str or re.Pattern) -- Either a list of task_ids, or a regex to match against task ids (as a string, or compiled regex pattern).
include_downstream -- Include all downstream tasks of matched tasks, in addition to matched tasks.
include_upstream -- Include all upstream tasks of matched tasks, in addition to matched tasks.
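A sketch of the selection logic on task ids alone: match by regex or explicit list, then optionally pull in all upstream tasks. `select_task_ids` and the `upstream` mapping are hypothetical; the real method returns a copied DAG rather than a set of ids.

```python
import re
from typing import Dict, Iterable, Set, Union


def select_task_ids(
    upstream: Dict[str, Set[str]],
    task_ids_or_regex: Union[str, "re.Pattern", Iterable[str]],
    include_upstream: bool = True,
) -> Set[str]:
    """Return matched task ids, plus all their upstream ids if requested."""
    if isinstance(task_ids_or_regex, (str, re.Pattern)):
        matched = {t for t in upstream if re.search(task_ids_or_regex, t)}
    else:
        wanted = set(task_ids_or_regex)
        matched = {t for t in upstream if t in wanted}

    selected = set(matched)
    if include_upstream:
        stack = list(matched)
        while stack:  # walk upstream edges transitively
            for parent in upstream.get(stack.pop(), ()):
                if parent not in selected:
                    selected.add(parent)
                    stack.append(parent)
    return selected


graph = {
    "extract_a": set(),
    "extract_b": set(),
    "transform": {"extract_a", "extract_b"},
    "load": {"transform"},
}
subset = select_task_ids(graph, r"^transform$")
```

Downstream inclusion would follow the same traversal over a reversed mapping and is left out to keep the sketch short.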
-
add_task
(self, task)[source]¶ Add a task to the DAG
- Parameters
task (task) -- the task you want to add
-
add_tasks
(self, tasks)[source]¶ Add a list of tasks to the DAG
- Parameters
tasks (list of tasks) -- a list of tasks you want to add
-
run
(self, start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False)[source]¶ Runs the DAG.
- Parameters
start_date (datetime.datetime) -- the start date of the range to run
end_date (datetime.datetime) -- the end date of the range to run
mark_success (bool) -- True to mark jobs as succeeded without running them
local (bool) -- True to run the tasks using the LocalExecutor
executor (airflow.executor.base_executor.BaseExecutor) -- The executor instance to run the tasks
donot_pickle (bool) -- True to avoid pickling DAG object and send to workers
ignore_task_deps (bool) -- True to skip upstream tasks
ignore_first_depends_on_past (bool) -- True to ignore depends_on_past dependencies for the first set of tasks only
pool (str) -- Resource pool to use
delay_on_limit_secs (float) -- Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached
verbose (bool) -- Make logging output more verbose
conf (dict) -- user defined dictionary passed from CLI
rerun_failed_tasks --
run_backwards --
- Type
- Type
-
create_dagrun
(self, state: State, execution_date: Optional[datetime] = None, run_id: Optional[str] = None, start_date: Optional[datetime] = None, external_trigger: Optional[bool] = False, conf: Optional[dict] = None, run_type: Optional[DagRunType] = None, session=None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None)[source]¶ Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.
- Parameters
run_id (str) -- defines the run id for this dag run
run_type (airflow.utils.types.DagRunType) -- type of DagRun
execution_date (datetime.datetime) -- the execution date of this dag run
state (airflow.utils.state.State) -- the state of the dag run
start_date (datetime) -- the date this dag run should be evaluated
external_trigger (bool) -- whether this dag run is externally triggered
conf (dict) -- Dict containing configuration/parameters to pass to the DAG
creating_job_id (int) -- id of the job creating this DagRun
session (sqlalchemy.orm.session.Session) -- database session
dag_hash (str) -- Hash of Serialized DAG
-
classmethod
bulk_sync_to_db
(cls, dags: Collection['DAG'], session=None)[source]¶ This method is deprecated in favor of bulk_write_to_db
-
classmethod
bulk_write_to_db
(cls, dags: Collection['DAG'], session=None)[source]¶ Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including calculated fields.
Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
- Parameters
dags (List[airflow.models.dag.DAG]) -- the DAG objects to save to the DB
- Returns
None
-
sync_to_db
(self, session=None)[source]¶ Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
- Returns
None
-
static
deactivate_unknown_dags
(active_dag_ids, session=None)[source]¶ Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM
- Parameters
active_dag_ids (list[unicode]) -- list of DAG IDs that are active
- Returns
None
-
static
deactivate_stale_dags
(expiration_date, session=None)[source]¶ Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted.
- Parameters
expiration_date (datetime) -- set inactive DAGs that were touched before this time
- Returns
None
-
class
airflow.models.dag.
DagTag
[source]¶ Bases:
airflow.models.base.Base
A tag name per dag, to allow quick filtering in the DAG view.
-
class
airflow.models.dag.
DagModel
(**kwargs)[source]¶ Bases:
airflow.models.base.Base
Table containing DAG properties
-
static
get_paused_dag_ids
(dag_ids: List[str], session: Session = None)[source]¶ Given a list of dag_ids, get a set of Paused Dag Ids
- Parameters
dag_ids -- List of Dag ids
session -- ORM Session
- Returns
Paused Dag_ids
-
get_default_view
(self)[source]¶ Get the Default DAG View, returns the default config value if DagModel does not have a value
-
set_is_paused
(self, is_paused: bool, including_subdags: bool = True, session=None)[source]¶ Pause/Un-pause a DAG.
- Parameters
is_paused -- Is the DAG paused
including_subdags -- whether to include the DAG's subdags
session -- session
-
classmethod
deactivate_deleted_dags
(cls, alive_dag_filelocs: List[str], session=None) Set is_active=False on the DAGs for which the DAG files have been removed. Additionally change is_active=False to True if the DAG file exists.
- Parameters
alive_dag_filelocs -- file paths of alive DAGs
session -- ORM Session
-
classmethod
dags_needing_dagruns
(cls, session: Session)[source]¶ Return (and lock) a list of Dag objects that are due to create a new DagRun.
This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query. You should ensure that any scheduling decisions are made in a single transaction, because as soon as the transaction is committed the rows will be unlocked.
-
calculate_dagrun_date_fields
(self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime], active_runs_of_dag: int) Calculate next_dagrun and next_dagrun_create_after
- Parameters
dag -- The DAG object
most_recent_dag_run -- DateTime of most recent run of this dag, or none if not yet scheduled.
active_runs_of_dag -- Number of currently active runs of this dag
-
static
-
airflow.models.dag.
dag
(*dag_args, **dag_kwargs)[source]¶ -
Python dag decorator. Wraps a function into an Airflow DAG.
-
Accepts kwargs for operator kwarg. Can be used to parametrize DAGs.
-
class
airflow.models.dag.
DagContext
DAG context is used to keep the current DAG when DAG is used as ContextManager.
You can use DAG as context:
    with DAG(
        dag_id='example_dag',
        default_args=default_args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60)
    ) as dag:
If you do this, the context stores the DAG, and whenever a new task is created, it will use the stored DAG as its parent DAG.
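The mechanism can be sketched with a small stack of "current" DAGs. All class and method names here (`ToyDagContext`, `ToyDag`, `ToyTask`, `push`, `pop`, `get_current`) are hypothetical and only illustrate the pattern; they are not Airflow's API.

```python
from typing import List, Optional


class ToyDagContext:
    """Keeps track of the DAG currently used as a context manager."""

    _stack: List["ToyDag"] = []

    @classmethod
    def push(cls, dag: "ToyDag") -> None:
        cls._stack.append(dag)

    @classmethod
    def pop(cls) -> None:
        cls._stack.pop()

    @classmethod
    def get_current(cls) -> Optional["ToyDag"]:
        return cls._stack[-1] if cls._stack else None


class ToyDag:
    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_ids: List[str] = []

    def __enter__(self) -> "ToyDag":
        ToyDagContext.push(self)  # becomes the current DAG
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        ToyDagContext.pop()  # restore whatever was current before


class ToyTask:
    def __init__(self, task_id: str, dag: Optional[ToyDag] = None) -> None:
        # Without an explicit dag, fall back to the DAG stored in the context.
        self.dag = dag if dag is not None else ToyDagContext.get_current()
        if self.dag is not None:
            self.dag.task_ids.append(task_id)


with ToyDag("example_dag") as dag:
    task = ToyTask("say_hello")  # picks up `dag` from the context
```

Using a stack rather than a single slot means nested `with` blocks restore the outer DAG when the inner block exits.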