airflow.models.dagbag

Module Contents

class airflow.models.dagbag.DagBag(dag_folder=None, executor=None, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), store_serialized_dags=False)[source]

Bases: airflow.dag.base_dag.BaseDagBag, airflow.utils.log.logging_mixin.LoggingMixin

A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks. This makes it easier to run distinct environments for say production and development, tests, or for different teams or security profiles. What would have been system level settings are now dagbag level so that one system can run multiple, independent settings sets.

Parameters
  • dag_folder (unicode) – the folder to scan to find DAGs

  • executor – the executor to use when executing task instances in this DagBag

  • include_examples (bool) – whether to include the examples that ship with airflow or not

  • has_logged – an instance boolean that gets flipped from False to True after a file has been skipped. This is to prevent overloading the user with logging messages about skipped files. Therefore only once per DagBag is a file logged being skipped.

  • store_serialized_dags (bool) – Read DAGs from DB if store_serialized_dags is True. If False DAGs are read from python files.

CYCLE_NEW = 0[source]
CYCLE_IN_PROGRESS = 1[source]
CYCLE_DONE = 2[source]
DAGBAG_IMPORT_TIMEOUT[source]
UNIT_TEST_MODE[source]
SCHEDULER_ZOMBIE_TASK_THRESHOLD[source]
dag_ids[source]
size(self)[source]
Returns

the amount of dags contained in this dagbag

get_dag(self, dag_id)[source]

Gets the DAG out of the dictionary, and refreshes it if expired

Parameters

dag_id (str) – DAG Id

process_file(self, filepath, only_if_updated=True, safe_mode=True)[source]

Given a path to a python module or zip file, this method imports the module and look for dag objects within it.

kill_zombies(self, zombies, session=None)[source]

Fail given zombie tasks, which are tasks that haven’t had a heartbeat for too long, in the current DagBag.

Parameters
  • zombies (airflow.utils.dag_processing.SimpleTaskInstance) – zombie task instances to kill.

  • session (sqlalchemy.orm.session.Session) – DB session.

bag_dag(self, dag, parent_dag, root_dag)[source]

Adds the DAG into the bag, recurses into sub dags. Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags

collect_dags(self, dag_folder=None, only_if_updated=True, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))[source]

Given a file path or a folder, this method looks for python modules, imports them and adds them to the dagbag collection.

Note that if a .airflowignore file is found while processing the directory, it will behave much like a .gitignore, ignoring files that match any of the regex patterns specified in the file.

Note: The patterns in .airflowignore are treated as un-anchored regexes, not shell-like glob patterns.

collect_dags_from_db(self)[source]

Collects DAGs from database.

dagbag_report(self)[source]

Prints a report around DagBag loading stats

Was this entry helpful?