airflow.models.dag¶
Module Contents¶
- 
airflow.models.dag.DEFAULT_VIEW_PRESETS= ['tree', 'graph', 'duration', 'gantt', 'landing_times'][source]¶
- 
exception airflow.models.dag.InconsistentDataInterval(instance: Any, start_field_name: str, end_field_name: str)[source]¶
- Bases: - airflow.exceptions.AirflowException- Exception raised when a model populates data interval fields incorrectly. - The data interval fields should either both be None (for runs scheduled prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is implemented). This is raised if exactly one of the fields is None. 
- 
airflow.models.dag.create_timetable(interval: ScheduleIntervalArg, timezone: tzinfo) → Timetable[source]¶
- 
Create a Timetable instance from a ``schedule_interval`` argument.
- 
airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[source]¶
- 
Returns the last dag run for a dag, None if there was none.
- 
Last dag run can be any type of run eg. scheduled or backfilled.
- 
Overridden DagRuns are ignored.
- 
class airflow.models.dag.DAG(dag_id: str, description: Optional[str] = None, schedule_interval: ScheduleIntervalArg = ScheduleIntervalArgNotSet, timetable: Optional[Timetable] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, full_filepath: Optional[str] = None, template_searchpath: Optional[Union[str, Iterable[str]]] = None, template_undefined: Type[jinja2.StrictUndefined] = jinja2.StrictUndefined, user_defined_macros: Optional[Dict] = None, user_defined_filters: Optional[Dict] = None, default_args: Optional[Dict] = None, concurrency: Optional[int] = None, max_active_tasks: int = conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout: Optional[timedelta] = None, sla_miss_callback: Optional[Callable[['DAG', str, str, List[str], List[TaskInstance]], None]] = None, default_view: str = conf.get('webserver', 'dag_default_view').lower(), orientation: str = conf.get('webserver', 'dag_orientation'), catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback: Optional[DagStateChangeCallback] = None, on_failure_callback: Optional[DagStateChangeCallback] = None, doc_md: Optional[str] = None, params: Optional[Dict] = None, access_control: Optional[Dict] = None, is_paused_upon_creation: Optional[bool] = None, jinja_environment_kwargs: Optional[Dict] = None, render_template_as_native_obj: bool = False, tags: Optional[List[str]] = None)[source]¶
- Bases: - airflow.utils.log.logging_mixin.LoggingMixin- A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule, (say daily or hourly), the DAG needs to run each individual tasks as their dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can't run until their previous schedule (and upstream tasks) are completed. - DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG. - Parameters
- dag_id (str) -- The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) 
- description (str) -- The description for the DAG to e.g. be shown on the webserver 
- schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) -- Defines how often that DAG runs, this timedelta object gets added to your latest task instance's execution_date to figure out the next schedule 
- start_date (datetime.datetime) -- The timestamp from which the scheduler will attempt to backfill 
- end_date (datetime.datetime) -- A date beyond which your DAG won't run, leave to None for open ended scheduling 
- template_searchpath (str or list[str]) -- This list of folders (non relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default 
- template_undefined (jinja2.StrictUndefined) -- Template undefined type. 
- user_defined_macros (dict) -- a dictionary of macros that will be exposed in your jinja templates. For example, passing - dict(foo='bar')to this argument allows you to- {{ foo }}in all jinja templates related to this DAG. Note that you can pass any type of object here.
- user_defined_filters (dict) -- a dictionary of filters that will be exposed in your jinja templates. For example, passing - dict(hello=lambda name: 'Hello %s' % name)to this argument allows you to- {{ 'world' | hello }}in all jinja templates related to this DAG.
- default_args (dict) -- A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains 'depends_on_past': True here and 'depends_on_past': False in the operator's call default_args, the actual value will be False. 
- params (dict) -- a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level. 
- max_active_tasks (int) -- the number of task instances allowed to run concurrently 
- max_active_runs (int) -- maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs 
- dagrun_timeout (datetime.timedelta) -- specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns. 
- sla_miss_callback (callable) -- specify a function to call when reporting SLA timeouts. See sla_miss_callback for more information about the function signature and parameters that are passed to the callback. 
- default_view (str) -- Specify DAG default view (tree, graph, duration, gantt, landing_times), default tree 
- orientation (str) -- Specify DAG orientation in graph view (LR, TB, RL, BT), default LR 
- catchup (bool) -- Perform scheduler catchup (or only run latest)? Defaults to True 
- on_failure_callback (callable) -- A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function. 
- on_success_callback (callable) -- Much like the - on_failure_callbackexcept that it is executed when the dag succeeds.
- access_control (dict) -- Specify optional DAG-level actions, e.g., "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}" 
- is_paused_upon_creation (bool or None) -- Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used. 
- jinja_environment_kwargs (dict) -- - additional configuration options to be passed to Jinja - Environmentfor template rendering- Example: to avoid Jinja from removing a trailing newline from template strings - DAG(dag_id='my-dag', jinja_environment_kwargs={ 'keep_trailing_newline': True, # some other jinja2 Environment options here } ) 
- render_template_as_native_obj (bool) -- If True, uses a Jinja - NativeEnvironmentto render templates as native Python types. If False, a Jinja- Environmentis used to render templates as string values.
- tags (List[str]) -- List of tags to help filtering DAGs in the UI. 
 
 - 
fileloc:str[source]¶
- File path that needs to be imported to load this DAG or subdag. - This may not be an actual file on disk in the case when this DAG is loaded from a ZIP file or other DAG distribution format. 
 - 
relative_fileloc[source]¶
- File location of the importable dag 'file' relative to the configured DAGs folder. 
 - 
owner[source]¶
- Return list of all owners found in DAG tasks. - Returns
- Comma separated list of owners in DAG tasks 
- Return type
 
 - 
concurrency_reached[source]¶
- This attribute is deprecated. Please use airflow.models.DAG.get_concurrency_reached method. 
 - 
is_paused[source]¶
- This attribute is deprecated. Please use airflow.models.DAG.get_is_paused method. 
 - 
latest_execution_date[source]¶
- This attribute is deprecated. Please use airflow.models.DAG.get_latest_execution_date method. 
 - 
roots[source]¶
- Return nodes with no parents. These are first to execute and are called roots or root nodes. 
 - 
leaves[source]¶
- Return nodes with no children. These are last to execute and are called leaves or leaf nodes. 
 - 
date_range(self, start_date: datetime, num: Optional[int] = None, end_date: Optional[datetime] = timezone.utcnow())[source]¶
 - 
following_schedule(self, dttm)[source]¶
- Calculates the following schedule for this dag in UTC. - Parameters
- dttm -- utc datetime 
- Returns
- utc datetime 
 
 - 
get_next_data_interval(self, dag_model: 'DagModel')[source]¶
- Get the data interval of the next scheduled run. - For compatibility, this method infers the data interval from the DAG's schedule if the run does not have an explicit one set, which is possible for runs created prior to AIP-39. - This function is private to Airflow core and should not be depended as a part of the Python API. 
 - 
get_run_data_interval(self, run: DagRun)[source]¶
- Get the data interval of this run. - For compatibility, this method infers the data interval from the DAG's schedule if the run does not have an explicit one set, which is possible for runs created prior to AIP-39. - This function is private to Airflow core and should not be depended as a part of the Python API. 
 - 
infer_automated_data_interval(self, logical_date: datetime)[source]¶
- Infer a data interval for a run against this DAG. - This method is used to bridge runs created prior to AIP-39 implementation, which do not have an explicit data interval. Therefore, this method only considers - schedule_intervalvalues valid prior to Airflow 2.2.- DO NOT use this method is there is a known data interval. 
 - 
next_dagrun_info(self, last_automated_dagrun: Union[None, datetime, DataInterval], *, restricted: bool = True)[source]¶
- Get information about the next DagRun of this dag after - date_last_automated_dagrun.- This calculates what time interval the next DagRun should operate on (its execution date), and when it can be scheduled, , according to the dag's timetable, start_date, end_date, etc. This doesn't check max active run or any other "max_active_tasks" type limits, but only performs calculations based on the various date and interval fields of this dag and its tasks. - Parameters
- date_last_automated_dagrun -- The - max(execution_date)of existing "automated" DagRuns for this dag (scheduled or backfill, but not manual).
- restricted -- If set to False (default is True), ignore - start_date,- end_date, and- catchupspecified on the DAG or tasks.
 
- Returns
- DagRunInfo of the next dagrun, or None if a dagrun is not going to be scheduled. 
 
 - 
iter_dagrun_infos_between(self, earliest: Optional[pendulum.DateTime], latest: pendulum.DateTime, *, align: bool = True)[source]¶
- Yield DagRunInfo using this DAG's timetable between given interval. - DagRunInfo instances yielded if their - logical_dateis not earlier than- earliest, nor later than- latest. The instances are ordered by their- logical_datefrom earliest to latest.- If - alignis- False, the first run will happen immediately on- earliest, even if it does not fall on the logical timetable schedule. The default is- True, but subdags will ignore this value and always behave as if this is set to- Falsefor backward compatibility.- Example: A DAG is scheduled to run every midnight ( - 0 0 * * *). If- earliestis- 2021-06-03 23:00:00, the first DagRunInfo would be- 2021-06-03 23:00:00if- align=False, and- 2021-06-04 00:00:00if- align=True.
 - 
get_run_dates(self, start_date, end_date=None)[source]¶
- Returns a list of dates between the interval received as parameter using this dag's schedule interval. Returned dates can be used for execution dates. - Parameters
- start_date (datetime) -- The start date of the interval. 
- end_date (datetime) -- The end date of the interval. Defaults to - timezone.utcnow().
 
- Returns
- A list of dates within the interval following the dag's schedule. 
- Return type
 
 - 
param(self, name: str, default=None)[source]¶
- Return a DagParam object for current dag. - Parameters
- name -- dag parameter name. 
- default -- fallback value for dag parameter. 
 
- Returns
- DagParam instance for specified name and current dag. 
 
 - 
get_concurrency_reached(self, session=None)[source]¶
- Returns a boolean indicating whether the max_active_tasks limit for this DAG has been reached 
 - 
handle_callback(self, dagrun, success=True, reason=None, session=None)[source]¶
- Triggers the appropriate callback depending on the value of success, namely the on_failure_callback or on_success_callback. This method gets the context of a single TaskInstance part of this DagRun and passes that to the callable along with a 'reason', primarily to differentiate DagRun failures. - Parameters
- dagrun -- DagRun object 
- success -- Flag to specify if failure or success callback should be called 
- reason -- Completion reason 
- session -- Database session 
 
 
 - 
get_active_runs(self)[source]¶
- Returns a list of dag run execution dates currently running - Returns
- List of execution dates 
 
 - 
get_num_active_runs(self, external_trigger=None, only_running=True, session=None)[source]¶
- Returns the number of active "running" dag runs - Parameters
- external_trigger (bool) -- True for externally triggered active dag runs 
- session -- 
 
- Returns
- number greater than 0 for active dag runs 
 
 - 
get_dagrun(self, execution_date: Optional[str] = None, run_id: Optional[str] = None, session: Optional[Session] = None)[source]¶
- Returns the dag run for a given execution date or run_id if it exists, otherwise none. - Parameters
- execution_date -- The execution date of the DagRun to find. 
- run_id -- The run_id of the DagRun to find. 
- session -- 
 
- Returns
- The DagRun if found, otherwise None. 
 
 - 
get_dagruns_between(self, start_date, end_date, session=None)[source]¶
- Returns the list of dag runs between start_date (inclusive) and end_date (inclusive). - Parameters
- start_date -- The starting execution date of the DagRun to find. 
- end_date -- The ending execution date of the DagRun to find. 
- session -- 
 
- Returns
- The list of DagRuns found. 
 
 - 
get_latest_execution_date(self, session: Session)[source]¶
- Returns the latest date for which at least one dag run exists 
 - 
set_dependency(self, upstream_task_id, downstream_task_id)[source]¶
- Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task() 
 - 
get_task_instances_before(self, base_date: datetime, num: int, *, session: Session)[source]¶
- Get - numtask instances before (including)- base_date.- The returned list may contain exactly - numtask instances. It can have less if there are less than- numscheduled DAG runs before- base_date, or more if there are manual task runs between the requested period, which does not count toward- num.
 - 
set_task_instance_state(self, task_id: str, execution_date: datetime, state: State, upstream: Optional[bool] = False, downstream: Optional[bool] = False, future: Optional[bool] = False, past: Optional[bool] = False, commit: Optional[bool] = True, session=None)[source]¶
- Set the state of a TaskInstance to the given state, and clear its downstream tasks that are in failed or upstream_failed state. - Parameters
- task_id (str) -- Task ID of the TaskInstance 
- execution_date (datetime) -- execution_date of the TaskInstance 
- state (State) -- State to set the TaskInstance to 
- upstream (bool) -- Include all upstream tasks of the given task_id 
- downstream (bool) -- Include all downstream tasks of the given task_id 
- future (bool) -- Include all future TaskInstances of the given task_id 
- commit (bool) -- Commit changes 
- past (bool) -- Include all past TaskInstances of the given task_id 
 
 
 - 
topological_sort(self, include_subdag_tasks: bool = False)[source]¶
- Sorts tasks in topographical order, such that a task comes after any of its upstream dependencies. - Heavily inspired by: http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/ - Parameters
- include_subdag_tasks -- whether to include tasks in subdags, default to False 
- Returns
- list of tasks in topological order 
 
 - 
set_dag_runs_state(self, state: str = State.RUNNING, session: Session = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dag_ids: List[str] = None)[source]¶
 - 
clear(self, task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run=False, session=None, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids: FrozenSet[str] = frozenset({}))[source]¶
- Clears a set of task instances associated with the current dag for a specified date range. - Parameters
- task_ids (List[str]) -- List of task ids to clear 
- start_date (datetime.datetime or None) -- The minimum execution_date to clear 
- end_date (datetime.datetime or None) -- The maximum execution_date to clear 
- only_failed (bool) -- Only clear failed tasks 
- only_running (bool) -- Only clear running tasks. 
- confirm_prompt (bool) -- Ask for confirmation 
- include_subdags (bool) -- Clear tasks in subdags and clear external tasks indicated by ExternalTaskMarker 
- include_parentdag (bool) -- Clear tasks in the parent dag of the subdag. 
- dag_run_state -- state to set DagRun to. If set to False, dagrun state will not be changed. 
- dry_run (bool) -- Find the tasks to clear but don't clear them. 
- session (sqlalchemy.orm.session.Session) -- The sqlalchemy session to use 
- dag_bag (airflow.models.dagbag.DagBag) -- The DagBag used to find the dags subdags (Optional) 
- exclude_task_ids (frozenset) -- A set of - task_idthat should not be cleared
 
 
 - 
classmethod clear_dags(cls, dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]¶
 - 
partial_subset(self, task_ids_or_regex: Union[str, RePatternType, Iterable[str]], include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]¶
- Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed. - Parameters
- task_ids_or_regex ([str] or str or re.Pattern) -- Either a list of task_ids, or a regex to match against task ids (as a string, or compiled regex pattern). 
- include_downstream -- Include all downstream tasks of matched tasks, in addition to matched tasks. 
- include_upstream -- Include all upstream tasks of matched tasks, in addition to matched tasks. 
 
 
 - 
add_task(self, task)[source]¶
- Add a task to the DAG - Parameters
- task (task) -- the task you want to add 
 
 - 
add_tasks(self, tasks)[source]¶
- Add a list of tasks to the DAG - Parameters
- tasks (list of tasks) -- a lit of tasks you want to add 
 
 - 
run(self, start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False)[source]¶
- Runs the DAG. - Parameters
- start_date (datetime.datetime) -- the start date of the range to run 
- end_date (datetime.datetime) -- the end date of the range to run 
- mark_success (bool) -- True to mark jobs as succeeded without running them 
- local (bool) -- True to run the tasks using the LocalExecutor 
- executor (airflow.executor.base_executor.BaseExecutor) -- The executor instance to run the tasks 
- donot_pickle (bool) -- True to avoid pickling DAG object and send to workers 
- ignore_task_deps (bool) -- True to skip upstream tasks 
- ignore_first_depends_on_past (bool) -- True to ignore depends_on_past dependencies for the first set of tasks only 
- pool (str) -- Resource pool to use 
- delay_on_limit_secs (float) -- Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached 
- verbose (bool) -- Make logging output more verbose 
- conf (dict) -- user defined dictionary passed from CLI 
- rerun_failed_tasks -- 
- run_backwards -- 
- run_at_least_once -- If true, always run the DAG at least once even if no logical run exists within the time range. 
 
- Type
- Type
- Type
 
 - 
create_dagrun(self, state: DagRunState, execution_date: Optional[datetime] = None, run_id: Optional[str] = None, start_date: Optional[datetime] = None, external_trigger: Optional[bool] = False, conf: Optional[dict] = None, run_type: Optional[DagRunType] = None, session=None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None, data_interval: Optional[Tuple[datetime, datetime]] = None)[source]¶
- Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run. - Parameters
- run_id (str) -- defines the run id for this dag run 
- run_type (airflow.utils.types.DagRunType) -- type of DagRun 
- execution_date (datetime.datetime) -- the execution date of this dag run 
- state (airflow.utils.state.DagRunState) -- the state of the dag run 
- start_date (datetime) -- the date this dag run should be evaluated 
- external_trigger (bool) -- whether this dag run is externally triggered 
- conf (dict) -- Dict containing configuration/parameters to pass to the DAG 
- creating_job_id (int) -- id of the job creating this DagRun 
- session (sqlalchemy.orm.session.Session) -- database session 
- dag_hash (str) -- Hash of Serialized DAG 
- data_interval (tuple[datetime, datetime] | None) -- Data interval of the DagRun 
 
 
 - 
classmethod bulk_sync_to_db(cls, dags: Collection['DAG'], session=None)[source]¶
- This method is deprecated in favor of bulk_write_to_db 
 - 
classmethod bulk_write_to_db(cls, dags: Collection['DAG'], session=None)[source]¶
- Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including calculated fields. - Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator. - Parameters
- dags (List[airflow.models.dag.DAG]) -- the DAG objects to save to the DB 
- Returns
- None 
 
 - 
sync_to_db(self, session=None)[source]¶
- Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator. - Returns
- None 
 
 - 
static deactivate_unknown_dags(active_dag_ids, session=None)[source]¶
- Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM - Parameters
- active_dag_ids (list[unicode]) -- list of DAG IDs that are active 
- Returns
- None 
 
 - 
static deactivate_stale_dags(expiration_date, session=None)[source]¶
- Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted. - Parameters
- expiration_date (datetime) -- set inactive DAGs that were touched before this time 
- Returns
- None 
 
 - 
static get_num_task_instances(dag_id, task_ids=None, states=None, session=None)[source]¶
- Returns the number of task instances in the given DAG. 
 - 
classmethod get_serialized_fields(cls)[source]¶
- Stringified DAGs and operators contain exactly these fields. 
 - 
get_edge_info(self, upstream_task_id: str, downstream_task_id: str)[source]¶
- Returns edge information for the given pair of tasks if present, and None if there is no information. 
 
- 
class airflow.models.dag.DagTag[source]¶
- Bases: - airflow.models.base.Base- A tag name per dag, to allow quick filtering in the DAG view. 
- 
class airflow.models.dag.DagModel(concurrency=None, **kwargs)[source]¶
- Bases: - airflow.models.base.Base- Table containing DAG properties - 
relative_fileloc[source]¶
- File location of the importable dag 'file' relative to the configured DAGs folder. 
 - 
static get_paused_dag_ids(dag_ids: List[str], session: Session = None)[source]¶
- Given a list of dag_ids, get a set of Paused Dag Ids - Parameters
- dag_ids -- List of Dag ids 
- session -- ORM Session 
 
- Returns
- Paused Dag_ids 
 
 - 
get_default_view(self)[source]¶
- Get the Default DAG View, returns the default config value if DagModel does not have a value 
 - 
set_is_paused(self, is_paused: bool, including_subdags: bool = True, session=None)[source]¶
- Pause/Un-pause a DAG. - Parameters
- is_paused -- Is the DAG paused 
- including_subdags -- whether to include the DAG's subdags 
- session -- session 
 
 
 - 
classmethod deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None)[source]¶
- Set - is_active=Falseon the DAGs for which the DAG files have been removed.- Parameters
- alive_dag_filelocs -- file paths of alive DAGs 
- session -- ORM Session 
 
 
 - 
classmethod dags_needing_dagruns(cls, session: Session)[source]¶
- Return (and lock) a list of Dag objects that are due to create a new DagRun. - This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. 
 
- 
- 
airflow.models.dag.dag(*dag_args, **dag_kwargs)[source]¶
- 
Python dag decorator. Wraps a function into an Airflow DAG.
- 
Accepts kwargs for operator kwarg. Can be used to parametrize DAGs.
- Parameters
- dag_args (Any) -- Arguments for DAG object 
- dag_kwargs (Any) -- Kwargs for DAG object. 
 
 
- 
class airflow.models.dag.DagContext[source]¶
- DAG context is used to keep the current DAG when DAG is used as ContextManager. - You can use DAG as context: - with DAG( dag_id="example_dag", default_args=default_args, schedule_interval="0 0 * * *", dagrun_timeout=timedelta(minutes=60), ) as dag: ... - If you do this the context stores the DAG and whenever new task is created, it will use such stored DAG as the parent DAG.