airflow.models
¶
Airflow models
Submodules¶
airflow.models.base
airflow.models.baseoperator
airflow.models.connection
airflow.models.crypto
airflow.models.dag
airflow.models.dagbag
airflow.models.dagcode
airflow.models.dagparam
airflow.models.dagpickle
airflow.models.dagrun
airflow.models.errors
airflow.models.log
airflow.models.param
airflow.models.pool
airflow.models.renderedtifields
airflow.models.sensorinstance
airflow.models.serialized_dag
airflow.models.skipmixin
airflow.models.slamiss
airflow.models.taskfail
airflow.models.taskinstance
airflow.models.taskmixin
airflow.models.taskreschedule
airflow.models.trigger
airflow.models.variable
airflow.models.xcom
airflow.models.xcom_arg
Package Contents¶
-
class
airflow.models.
BaseOperator
(task_id: str, owner: str = conf.get('operators', 'DEFAULT_OWNER'), email: Optional[Union[str, Iterable[str]]] = None, email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True), retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0), retry_delay: timedelta = timedelta(seconds=300), retry_exponential_backoff: bool = False, max_retry_delay: Optional[timedelta] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, depends_on_past: bool = False, wait_for_downstream: bool = False, dag=None, params: Optional[Dict] = None, default_args: Optional[Dict] = None, priority_weight: int = 1, weight_rule: str = conf.get('core', 'default_task_weight_rule', fallback=WeightRule.DOWNSTREAM), queue: str = conf.get('operators', 'default_queue'), pool: Optional[str] = None, pool_slots: int = 1, sla: Optional[timedelta] = None, execution_timeout: Optional[timedelta] = None, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, on_retry_callback: Optional[TaskStateChangeCallback] = None, pre_execute: Optional[TaskPreExecuteHook] = None, post_execute: Optional[TaskPostExecuteHook] = None, trigger_rule: str = TriggerRule.ALL_SUCCESS, resources: Optional[Dict] = None, run_as_user: Optional[str] = None, task_concurrency: Optional[int] = None, max_active_tis_per_dag: Optional[int] = None, executor_config: Optional[Dict] = None, do_xcom_push: bool = True, inlets: Optional[Any] = None, outlets: Optional[Any] = None, task_group: Optional['TaskGroup'] = None, doc: Optional[str] = None, doc_md: Optional[str] = None, doc_json: Optional[str] = None, doc_yaml: Optional[str] = None, doc_rst: Optional[str] = None, **kwargs)[source]¶ Bases:
airflow.models.base.Operator
,airflow.utils.log.logging_mixin.LoggingMixin
,airflow.models.taskmixin.TaskMixin
Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the 'execute' method.
Operators derived from this class should perform or trigger certain tasks synchronously (wait for completion). Example of operators could be an operator that runs a Pig job (PigOperator), a sensor operator that waits for a partition to land in Hive (HiveSensorOperator), or one that moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these operators (tasks) target specific operations, running specific scripts, functions or data transfers.
This class is abstract and shouldn't be instantiated. Instantiating a class derived from this one results in the creation of a task object, which ultimately becomes a node in DAG objects. Task dependencies should be set by using the set_upstream and/or set_downstream methods.
- Parameters
task_id (str) -- a unique, meaningful id for the task
owner (str) -- the owner of the task. Using a meaningful description (e.g. user/person/team/role name) to clarify ownership is recommended.
email (str or list[str]) -- the 'to' email address(es) used in email alerts. This can be a single email or multiple ones. Multiple addresses can be specified as a comma or semi-colon separated string or by passing a list of strings.
email_on_retry (bool) -- Indicates whether email alerts should be sent when a task is retried
email_on_failure (bool) -- Indicates whether email alerts should be sent when a task failed
retries (int) -- the number of retries that should be performed before failing the task
retry_delay (datetime.timedelta) -- delay between retries
retry_exponential_backoff (bool) -- allow progressive longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)
max_retry_delay (datetime.timedelta) -- maximum delay interval between retries
start_date (datetime.datetime) -- The
start_date
for the task, determines theexecution_date
for the first task instance. The best practice is to have the start_date rounded to your DAG'sschedule_interval
. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latestexecution_date
and adds theschedule_interval
to determine the nextexecution_date
. It is also very important to note that different tasks' dependencies need to line up in time. If task A depends on task B and their start_date are offset in a way that their execution_date don't line up, A's dependencies will never be met. If you are looking to delay a task, for example running a daily task at 2AM, look into theTimeSensor
andTimeDeltaSensor
. We advise against using dynamicstart_date
and recommend using fixed ones. Read the FAQ entry about start_date for more information.end_date (datetime.datetime) -- if specified, the scheduler won't go beyond this date
depends_on_past (bool) -- when set to true, task instances will run sequentially and only if the previous instance has succeeded or has been skipped. The task instance for the start_date is allowed to run.
wait_for_downstream (bool) -- when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully or be skipped before it runs. This is useful if the different instances of a task X alter the same asset, and this asset is used by tasks downstream of task X. Note that depends_on_past is forced to True wherever wait_for_downstream is used. Also note that only tasks immediately downstream of the previous task instance are waited for; the statuses of any tasks further downstream are ignored.
dag (airflow.models.DAG) -- a reference to the dag the task is attached to (if any)
priority_weight (int) -- priority weight of this task against other task. This allows the executor to trigger higher priority tasks before others when things get backed up. Set priority_weight as a higher number for more important tasks.
weight_rule (str) -- weighting method used for the effective total priority weight of the task. Options are:
{ downstream | upstream | absolute }
default isdownstream
When set todownstream
the effective weight of the task is the aggregate sum of all downstream descendants. As a result, upstream tasks will have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple dag run instances and desire to have all upstream tasks to complete for all runs before each dag can continue processing downstream tasks. When set toupstream
the effective weight is the aggregate sum of all upstream ancestors. This is the opposite where downstream tasks have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple dag run instances and prefer to have each dag complete before starting upstream tasks of other dags. When set toabsolute
, the effective weight is the exactpriority_weight
specified without additional weighting. You may want to do this when you know exactly what priority weight each task should have. Additionally, when set toabsolute
, there is bonus effect of significantly speeding up the task creation process as for very large DAGs. Options can be set as string or using the constants defined in the static classairflow.utils.WeightRule
queue (str) -- which queue to target when running this job. Not all executors implement queue management, the CeleryExecutor does support targeting specific queues.
pool (str) -- the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks
pool_slots (int) -- the number of pool slots this task should use (>= 1) Values less than 1 are not allowed.
sla (datetime.timedelta) -- time by which the job is expected to succeed. Note that this represents the
timedelta
after the period is closed. For example if you set an SLA of 1 hour, the scheduler would send an email soon after 1:00AM on the2016-01-02
if the2016-01-01
instance has not succeeded yet. The scheduler pays special attention for jobs with an SLA and sends alert emails for SLA misses. SLA misses are also recorded in the database for future reference. All tasks that share the same SLA time get bundled in a single email, sent soon after that time. SLA notification are sent once and only once for each task instance.execution_timeout (datetime.timedelta) -- max time allowed for the execution of this task instance, if it goes beyond it will raise and fail.
on_failure_callback (TaskStateChangeCallback) -- a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
on_execute_callback (TaskStateChangeCallback) -- much like the
on_failure_callback
except that it is executed right before the task is executed.on_retry_callback (TaskStateChangeCallback) -- much like the
on_failure_callback
except that it is executed when retries occur.on_success_callback (TaskStateChangeCallback) -- much like the
on_failure_callback
except that it is executed when the task succeeds.pre_execute (TaskPreExecuteHook) --
a function to be called immediately before task execution, receiving a context dictionary; raising an exception will prevent the task from being executed.
This is an experimental feature.
post_execute (TaskPostExecuteHook) --
a function to be called immediately after task execution, receiving a context dictionary and task result; raising an exception will prevent the task from succeeding.
This is an experimental feature.
trigger_rule (str) -- defines the rule by which dependencies are applied for the task to get triggered. Options are:
{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_min_one_success | none_skipped | always}
default isall_success
. Options can be set as string or using the constants defined in the static classairflow.utils.TriggerRule
resources (dict) -- A map of resource parameter names (the argument names of the Resources constructor) to their values.
run_as_user (str) -- unix username to impersonate while running the task
max_active_tis_per_dag (int) -- When set, a task will be able to limit the concurrent runs across execution_dates.
executor_config (dict) --
Additional task-level configuration parameters that are interpreted by a specific executor. Parameters are namespaced by the name of executor.
Example: to run this task in a specific docker container through the KubernetesExecutor
MyOperator(..., executor_config={ "KubernetesExecutor": {"image": "myCustomDockerImage"} } )
do_xcom_push (bool) -- if True, an XCom is pushed containing the Operator's result
task_group (airflow.utils.task_group.TaskGroup) -- The TaskGroup to which the task should belong. This is typically provided when not using a TaskGroup as a context manager.
doc (str) -- Add documentation or notes to your Task objects that is visible in Task Instance details View in the Webserver
doc_md (str) -- Add documentation (in Markdown format) or notes to your Task objects that is visible in Task Instance details View in the Webserver
doc_rst (str) -- Add documentation (in RST format) or notes to your Task objects that is visible in Task Instance details View in the Webserver
doc_json (str) -- Add documentation (in JSON format) or notes to your Task objects that is visible in Task Instance details View in the Webserver
doc_yaml (str) -- Add documentation (in YAML format) or notes to your Task objects that is visible in Task Instance details View in the Webserver
-
template_fields
:Iterable[str] = []¶
-
template_ext
:Iterable[str] = []¶
-
template_fields_renderers
:Dict[str, str]¶
-
ui_color
:str = #fff¶
-
ui_fgcolor
:str = #000¶
-
pool
:str =¶
-
shallow_copy_attrs
:Tuple[str, ...] = []¶
-
operator_extra_links
:Iterable['BaseOperatorLink'] = []¶
-
supports_lineage
= False¶
-
dag
¶ Returns the Operator's DAG if set, otherwise raises an error
-
dag_id
¶ Returns dag id if it has one or an adhoc + owner
-
deps
:Iterable[BaseTIDep]¶ Returns the set of dependencies for the operator. These differ from execution context dependencies in that they are specific to tasks and can be extended/overridden by subclasses.
-
priority_weight_total
¶ Total priority weight for the task. It might include all upstream or downstream tasks. depending on the weight rule.
WeightRule.ABSOLUTE - only own weight
WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks
WeightRule.UPSTREAM - adds priority weight of all upstream tasks
-
upstream_list
¶ @property: list of tasks directly upstream
-
upstream_task_ids
¶ @property: set of ids of tasks directly upstream
-
downstream_list
¶ @property: list of tasks directly downstream
-
downstream_task_ids
¶ @property: set of ids of tasks directly downstream
-
task_type
¶ @property: type of the task
-
roots
¶ Required by TaskMixin
-
leaves
¶ Required by TaskMixin
-
output
¶ Returns reference to XCom pushed by current operator
-
inherits_from_dummy_operator
¶ Used to determine if an Operator is inherited from DummyOperator
-
__eq__
(self, other)¶
-
__ne__
(self, other)¶
-
__hash__
(self)¶
-
__or__
(self, other)¶ Called for [This Operator] | [Operator], The inlets of other will be set to pickup the outlets from this operator. Other will be set as a downstream task of this operator.
-
__gt__
(self, other)¶ Called for [Operator] > [Outlet], so that if other is an attr annotated object it is set as an outlet of this Operator.
-
__lt__
(self, other)¶ Called for [Inlet] > [Operator] or [Operator] < [Inlet], so that if other is an attr annotated object it is set as an inlet to this operator
-
__setattr__
(self, key, value)¶
-
add_inlets
(self, inlets: Iterable[Any])¶ Sets inlets to this operator
-
add_outlets
(self, outlets: Iterable[Any])¶ Defines the outlets of this operator
-
get_inlet_defs
(self)¶ - Returns
list of inlets defined for this operator
-
get_outlet_defs
(self)¶ - Returns
list of outlets defined for this operator
-
has_dag
(self)¶ Returns True if the Operator has been assigned to a DAG.
-
prepare_for_execution
(self)¶ Lock task for execution to disable custom action in __setattr__ and returns a copy of the task
-
set_xcomargs_dependencies
(self)¶ Resolves upstream dependencies of a task. In this way passing an
XComArg
as value for a template field will result in creating upstream relation between two tasks.Example:
with DAG(...): generate_content = GenerateContentOperator(task_id="generate_content") send_email = EmailOperator(..., html_content=generate_content.output) # This is equivalent to with DAG(...): generate_content = GenerateContentOperator(task_id="generate_content") send_email = EmailOperator( ..., html_content="{{ task_instance.xcom_pull('generate_content') }}" ) generate_content >> send_email
-
operator_extra_link_dict
(self)¶ Returns dictionary of all extra links for the operator
-
global_operator_extra_link_dict
(self)¶ Returns dictionary of all global extra links
-
pre_execute
(self, context: Any)¶ This hook is triggered right before self.execute() is called.
-
execute
(self, context: Any)¶ This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.
Refer to get_template_context for more context.
-
post_execute
(self, context: Any, result: Any = None)¶ This hook is triggered right after self.execute() is called. It is passed the execution context and any results returned by the operator.
-
on_kill
(self)¶ Override this method to cleanup subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up or it will leave ghost processes behind.
-
__deepcopy__
(self, memo)¶ Hack sorting double chained task lists by task_id to avoid hitting max_depth on deepcopy operations.
-
__getstate__
(self)¶
-
__setstate__
(self, state)¶
-
render_template_fields
(self, context: Dict, jinja_env: Optional[jinja2.Environment] = None)¶ Template all attributes listed in template_fields. Note this operation is irreversible.
- Parameters
context (dict) -- Dict with values to apply on content
jinja_env (jinja2.Environment) -- Jinja environment
-
render_template
(self, content: Any, context: Dict, jinja_env: Optional[jinja2.Environment] = None, seen_oids: Optional[Set] = None)¶ Render a templated string. The content can be a collection holding multiple templated strings and will be templated recursively.
- Parameters
content (Any) -- Content to template. Only strings can be templated (may be inside collection).
context (dict) -- Dict with values to apply on templated content
jinja_env (jinja2.Environment) -- Jinja environment. Can be provided to avoid re-creating Jinja environments during recursion.
seen_oids (set) -- template fields already rendered (to avoid RecursionError on circular dependencies)
- Returns
Templated content
-
get_template_env
(self)¶ Fetch a Jinja template environment from the DAG or instantiate empty environment if no DAG.
-
prepare_template
(self)¶ Hook that is triggered after the templated fields get replaced by their content. If you need your operator to alter the content of the file before the template is rendered, it should override this method to do so.
-
resolve_template_files
(self)¶ Getting the content of files for template_field / template_ext
-
clear
(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, upstream: bool = False, downstream: bool = False, session: Session = None)¶ Clears the state of task instances associated with the task, following the parameters specified.
-
get_task_instances
(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, session: Session = None)¶ Get a set of task instance related to this task for a specific date range.
-
get_flat_relative_ids
(self, upstream: bool = False, found_descendants: Optional[Set[str]] = None)¶ Get a flat set of relatives' ids, either upstream or downstream.
-
get_flat_relatives
(self, upstream: bool = False)¶ Get a flat list of relatives, either upstream or downstream.
-
run
(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ignore_first_depends_on_past: bool = True, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, session: Session = None)¶ Run a set of task instances for a date range.
-
dry_run
(self)¶ Performs dry run for the operator - just render template fields.
-
get_direct_relative_ids
(self, upstream: bool = False)¶ Get set of the direct relative ids to the current task, upstream or downstream.
-
get_direct_relatives
(self, upstream: bool = False)¶ Get list of the direct relatives to the current task, upstream or downstream.
-
__repr__
(self)¶
-
set_downstream
(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], edge_modifier: Optional[EdgeModifier] = None)¶ Set a task or a task list to be directly downstream from the current task. Required by TaskMixin.
-
set_upstream
(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], edge_modifier: Optional[EdgeModifier] = None)¶ Set a task or a task list to be directly upstream from the current task. Required by TaskMixin.
-
static
xcom_push
(context: Any, key: str, value: Any, execution_date: Optional[datetime] = None)¶ Make an XCom available for tasks to pull.
- Parameters
context -- Execution Context Dictionary
key (str) -- A key for the XCom
value (any pickleable object) -- A value for the XCom. The value is pickled and stored in the database.
execution_date (datetime) -- if provided, the XCom will not be visible until this date. This can be used, for example, to send a message to a task on a future date without it being immediately visible.
- Type
Any
-
static
xcom_pull
(context: Any, task_ids: Optional[List[str]] = None, dag_id: Optional[str] = None, key: str = XCOM_RETURN_KEY, include_prior_dates: Optional[bool] = None)¶ Pull XComs that optionally meet certain criteria.
The default value for key limits the search to XComs that were returned by other tasks (as opposed to those that were pushed manually). To remove this filter, pass key=None (or any desired value).
If a single task_id string is provided, the result is the value of the most recent matching XCom from that task_id. If multiple task_ids are provided, a tuple of matching values is returned. None is returned whenever no matches are found.
- Parameters
context -- Execution Context Dictionary
key (str) -- A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as a constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass key=None.
task_ids (str or iterable of strings (representing task_ids)) -- Only XComs from tasks with matching ids will be pulled. Can pass None to remove the filter.
dag_id (str) -- If provided, only pulls XComs from this DAG. If None (default), the DAG of the calling task is used.
include_prior_dates (bool) -- If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.
- Type
Any
-
extra_links
(self)¶ @property: extra links for the task
-
get_extra_links
(self, dttm: datetime, link_name: str)¶ For an operator, gets the URL that the external links specified in extra_links should point to.
- Raises
ValueError -- The error message of a ValueError will be passed on through to the fronted to show up as a tooltip on the disabled link
- Parameters
dttm -- The datetime parsed execution date for the URL being searched for
link_name -- The name of the link we're looking for the URL for. Should be one of the options specified in extra_links
- Returns
A URL
-
classmethod
get_serialized_fields
(cls)¶ Stringified DAGs and operators contain exactly these fields.
-
is_smart_sensor_compatible
(self)¶ Return if this operator can use smart service. Default False.
-
defer
(self, *, trigger: BaseTrigger, method_name: str, kwargs: Optional[Dict[str, Any]] = None, timeout: Optional[timedelta] = None)¶ Marks this Operator as being "deferred" - that is, suspending its execution until the provided trigger fires an event.
This is achieved by raising a special exception (TaskDeferred) which is caught in the main _execute_task wrapper.
-
class
airflow.models.
BaseOperatorLink
[source]¶ Abstract base class that defines how we get an operator link.
-
operators
:ClassVar[List[Type[BaseOperator]]] = []¶ This property will be used by Airflow Plugins to find the Operators to which you want to assign this Operator Link
- Returns
List of Operator classes used by task for which you want to create extra link
-
name
¶ Name of the link. This will be the button name on the task UI.
- Returns
link name
-
get_link
(self, operator: BaseOperator, dttm: datetime)¶ Link to external system.
- Parameters
operator -- airflow operator
dttm -- datetime
- Returns
link to external system
-
-
class
airflow.models.
Connection
(conn_id: Optional[str] = None, conn_type: Optional[str] = None, description: Optional[str] = None, host: Optional[str] = None, login: Optional[str] = None, password: Optional[str] = None, schema: Optional[str] = None, port: Optional[int] = None, extra: Optional[Union[str, dict]] = None, uri: Optional[str] = None)[source]¶ Bases:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
Placeholder to store information about different database instances connection information. The idea here is that scripts use references to database instances (conn_id) instead of hard coding hostname, logins and passwords when using operators or hooks.
See also
For more information on how to use this class, see: Managing Connections
- Parameters
conn_id (str) -- The connection ID.
conn_type (str) -- The connection type.
description (str) -- The connection description.
host (str) -- The host.
login (str) -- The login.
password (str) -- The password.
schema (str) -- The schema.
port (int) -- The port number.
extra (str) -- Extra metadata. Non-standard data such as private/SSH keys can be saved here. JSON encoded object.
uri (str) -- URI address describing connection parameters.
-
EXTRA_KEY
= __extra__¶
-
__tablename__
= connection¶
-
id
¶
-
conn_id
¶
-
conn_type
¶
-
description
¶
-
host
¶
-
schema
¶
-
login
¶
-
port
¶
-
is_encrypted
¶
-
is_extra_encrypted
¶
-
password
¶ Password. The value is decrypted/encrypted when reading/setting the value.
-
extra
¶ Extra data. The value is decrypted/encrypted when reading/setting the value.
-
extra_dejson
¶ Returns the extra property by deserializing json.
-
on_db_load
(self)¶
-
parse_from_uri
(self, **uri)¶ This method is deprecated. Please use uri parameter in constructor.
-
get_uri
(self)¶ Return connection in URI format
-
get_password
(self)¶ Return encrypted password.
-
get_extra
(self)¶ Return encrypted extra-data.
-
get_hook
(self)¶ Return hook based on conn_type.
-
__repr__
(self)¶
-
log_info
(self)¶ This method is deprecated. You can read each field individually or use the default representation (__repr__).
-
debug_info
(self)¶ This method is deprecated. You can read each field individually or use the default representation (__repr__).
-
test_connection
(self)¶ Calls out get_hook method and executes test_connection method on that.
-
class
airflow.models.
DAG
(dag_id: str, description: Optional[str] = None, schedule_interval: ScheduleIntervalArg = ScheduleIntervalArgNotSet, timetable: Optional[Timetable] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, full_filepath: Optional[str] = None, template_searchpath: Optional[Union[str, Iterable[str]]] = None, template_undefined: Type[jinja2.StrictUndefined] = jinja2.StrictUndefined, user_defined_macros: Optional[Dict] = None, user_defined_filters: Optional[Dict] = None, default_args: Optional[Dict] = None, concurrency: Optional[int] = None, max_active_tasks: int = conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout: Optional[timedelta] = None, sla_miss_callback: Optional[Callable[['DAG', str, str, List[str], List[TaskInstance]], None]] = None, default_view: str = conf.get('webserver', 'dag_default_view').lower(), orientation: str = conf.get('webserver', 'dag_orientation'), catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback: Optional[DagStateChangeCallback] = None, on_failure_callback: Optional[DagStateChangeCallback] = None, doc_md: Optional[str] = None, params: Optional[Dict] = None, access_control: Optional[Dict] = None, is_paused_upon_creation: Optional[bool] = None, jinja_environment_kwargs: Optional[Dict] = None, render_template_as_native_obj: bool = False, tags: Optional[List[str]] = None)[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule, (say daily or hourly), the DAG needs to run each individual tasks as their dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can't run until their previous schedule (and upstream tasks) are completed.
DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.
- Parameters
dag_id (str) -- The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII)
description (str) -- The description for the DAG to e.g. be shown on the webserver
schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) -- Defines how often that DAG runs, this timedelta object gets added to your latest task instance's execution_date to figure out the next schedule
start_date (datetime.datetime) -- The timestamp from which the scheduler will attempt to backfill
end_date (datetime.datetime) -- A date beyond which your DAG won't run, leave to None for open ended scheduling
template_searchpath (str or list[str]) -- This list of folders (non relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default
template_undefined (jinja2.StrictUndefined) -- Template undefined type.
user_defined_macros (dict) -- a dictionary of macros that will be exposed in your jinja templates. For example, passing
dict(foo='bar')
to this argument allows you to{{ foo }}
in all jinja templates related to this DAG. Note that you can pass any type of object here.user_defined_filters (dict) -- a dictionary of filters that will be exposed in your jinja templates. For example, passing
dict(hello=lambda name: 'Hello %s' % name)
to this argument allows you to{{ 'world' | hello }}
in all jinja templates related to this DAG.default_args (dict) -- A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains 'depends_on_past': True here and 'depends_on_past': False in the operator's call default_args, the actual value will be False.
params (dict) -- a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.
max_active_tasks (int) -- the number of task instances allowed to run concurrently
max_active_runs (int) -- maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
dagrun_timeout (datetime.timedelta) -- specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns.
sla_miss_callback (callable) -- specify a function to call when reporting SLA timeouts. See sla_miss_callback for more information about the function signature and parameters that are passed to the callback.
default_view (str) -- Specify DAG default view (tree, graph, duration, gantt, landing_times), default tree
orientation (str) -- Specify DAG orientation in graph view (LR, TB, RL, BT), default LR
catchup (bool) -- Perform scheduler catchup (or only run latest)? Defaults to True
on_failure_callback (callable) -- A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.
on_success_callback (callable) -- Much like the
on_failure_callback
except that it is executed when the dag succeeds.access_control (dict) -- Specify optional DAG-level actions, e.g., "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
is_paused_upon_creation (bool or None) -- Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used.
jinja_environment_kwargs (dict) --
additional configuration options to be passed to Jinja
Environment
for template renderingExample: to avoid Jinja from removing a trailing newline from template strings
DAG(dag_id='my-dag', jinja_environment_kwargs={ 'keep_trailing_newline': True, # some other jinja2 Environment options here } )
render_template_as_native_obj (bool) -- If True, uses a Jinja
NativeEnvironment
to render templates as native Python types. If False, a JinjaEnvironment
is used to render templates as string values.tags (List[str]) -- List of tags to help filtering DAGs in the UI.
-
fileloc
:str¶ File path that needs to be imported to load this DAG or subdag.
This may not be an actual file on disk in the case when this DAG is loaded from a ZIP file or other DAG distribution format.
-
dag_id
¶
-
full_filepath
¶
-
concurrency
¶
-
max_active_tasks
¶
-
access_control
¶
-
description
¶
-
default_view
¶
-
pickle_id
¶
-
tasks
¶
-
task_ids
¶
-
task_group
¶
-
filepath
¶
-
relative_fileloc
¶ File location of the importable dag 'file' relative to the configured DAGs folder.
-
folder
¶ Folder location of where the DAG object is instantiated.
-
owner
¶ Return list of all owners found in DAG tasks.
- Returns
Comma separated list of owners in DAG tasks
- Return type
-
allow_future_exec_dates
¶
-
concurrency_reached
¶ This attribute is deprecated. Please use airflow.models.DAG.get_concurrency_reached method.
-
is_paused
¶ This attribute is deprecated. Please use airflow.models.DAG.get_is_paused method.
-
normalized_schedule_interval
¶
-
latest_execution_date
¶ This attribute is deprecated. Please use airflow.models.DAG.get_latest_execution_date method.
-
subdags
¶ Returns a list of the subdag objects associated to this DAG
-
roots
¶ Return nodes with no parents. These are first to execute and are called roots or root nodes.
-
leaves
¶ Return nodes with no children. These are last to execute and are called leaves or leaf nodes.
-
task
¶
-
__repr__
(self)¶
-
__eq__
(self, other)¶
-
__ne__
(self, other)¶
-
__lt__
(self, other)¶
-
__hash__
(self)¶
-
__enter__
(self)¶
-
__exit__
(self, _type, _value, _tb)¶
-
date_range
(self, start_date: datetime, num: Optional[int] = None, end_date: Optional[datetime] = timezone.utcnow())¶
-
is_fixed_time_schedule
(self)¶
-
following_schedule
(self, dttm)¶ Calculates the following schedule for this dag in UTC.
- Parameters
dttm -- utc datetime
- Returns
utc datetime
-
previous_schedule
(self, dttm)¶
-
get_next_data_interval
(self, dag_model: 'DagModel')¶ Get the data interval of the next scheduled run.
For compatibility, this method infers the data interval from the DAG's schedule if the run does not have an explicit one set, which is possible for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended as a part of the Python API.
-
get_run_data_interval
(self, run: DagRun)¶ Get the data interval of this run.
For compatibility, this method infers the data interval from the DAG's schedule if the run does not have an explicit one set, which is possible for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended as a part of the Python API.
-
infer_automated_data_interval
(self, logical_date: datetime)¶ Infer a data interval for a run against this DAG.
This method is used to bridge runs created prior to AIP-39 implementation, which do not have an explicit data interval. Therefore, this method only considers
schedule_interval
values valid prior to Airflow 2.2.DO NOT use this method is there is a known data interval.
-
next_dagrun_info
(self, last_automated_dagrun: Union[None, datetime, DataInterval], *, restricted: bool = True)¶ Get information about the next DagRun of this dag after
date_last_automated_dagrun
.This calculates what time interval the next DagRun should operate on (its execution date), and when it can be scheduled, , according to the dag's timetable, start_date, end_date, etc. This doesn't check max active run or any other "max_active_tasks" type limits, but only performs calculations based on the various date and interval fields of this dag and its tasks.
- Parameters
date_last_automated_dagrun -- The
max(execution_date)
of existing "automated" DagRuns for this dag (scheduled or backfill, but not manual).restricted -- If set to False (default is True), ignore
start_date
,end_date
, andcatchup
specified on the DAG or tasks.
- Returns
DagRunInfo of the next dagrun, or None if a dagrun is not going to be scheduled.
-
next_dagrun_after_date
(self, date_last_automated_dagrun: Optional[pendulum.DateTime])¶
-
iter_dagrun_infos_between
(self, earliest: Optional[pendulum.DateTime], latest: pendulum.DateTime, *, align: bool = True)¶ Yield DagRunInfo using this DAG's timetable between given interval.
DagRunInfo instances yielded if their
logical_date
is not earlier thanearliest
, nor later thanlatest
. The instances are ordered by theirlogical_date
from earliest to latest.If
align
isFalse
, the first run will happen immediately onearliest
, even if it does not fall on the logical timetable schedule. The default isTrue
, but subdags will ignore this value and always behave as if this is set toFalse
for backward compatibility.Example: A DAG is scheduled to run every midnight (
0 0 * * *
). Ifearliest
is2021-06-03 23:00:00
, the first DagRunInfo would be2021-06-03 23:00:00
ifalign=False
, and2021-06-04 00:00:00
ifalign=True
.
-
get_run_dates
(self, start_date, end_date=None)¶ Returns a list of dates between the interval received as parameter using this dag's schedule interval. Returned dates can be used for execution dates.
- Parameters
start_date (datetime) -- The start date of the interval.
end_date (datetime) -- The end date of the interval. Defaults to
timezone.utcnow()
.
- Returns
A list of dates within the interval following the dag's schedule.
- Return type
-
normalize_schedule
(self, dttm)¶
-
get_last_dagrun
(self, session=None, include_externally_triggered=False)¶
-
has_dag_runs
(self, session=None, include_externally_triggered=True)¶
-
param
(self, name: str, default=None)¶ Return a DagParam object for current dag.
- Parameters
name -- dag parameter name.
default -- fallback value for dag parameter.
- Returns
DagParam instance for specified name and current dag.
-
get_concurrency_reached
(self, session=None)¶ Returns a boolean indicating whether the max_active_tasks limit for this DAG has been reached
-
get_is_active
(self, session=None)¶ Returns a boolean indicating whether this DAG is active
-
get_is_paused
(self, session=None)¶ Returns a boolean indicating whether this DAG is paused
-
handle_callback
(self, dagrun, success=True, reason=None, session=None)¶ Triggers the appropriate callback depending on the value of success, namely the on_failure_callback or on_success_callback. This method gets the context of a single TaskInstance part of this DagRun and passes that to the callable along with a 'reason', primarily to differentiate DagRun failures.
- Parameters
dagrun -- DagRun object
success -- Flag to specify if failure or success callback should be called
reason -- Completion reason
session -- Database session
-
get_active_runs
(self)¶ Returns a list of dag run execution dates currently running
- Returns
List of execution dates
-
get_num_active_runs
(self, external_trigger=None, only_running=True, session=None)¶ Returns the number of active "running" dag runs
- Parameters
external_trigger (bool) -- True for externally triggered active dag runs
session --
- Returns
number greater than 0 for active dag runs
-
get_dagrun
(self, execution_date: Optional[str] = None, run_id: Optional[str] = None, session: Optional[Session] = None)¶ Returns the dag run for a given execution date or run_id if it exists, otherwise none.
- Parameters
execution_date -- The execution date of the DagRun to find.
run_id -- The run_id of the DagRun to find.
session --
- Returns
The DagRun if found, otherwise None.
-
get_dagruns_between
(self, start_date, end_date, session=None)¶ Returns the list of dag runs between start_date (inclusive) and end_date (inclusive).
- Parameters
start_date -- The starting execution date of the DagRun to find.
end_date -- The ending execution date of the DagRun to find.
session --
- Returns
The list of DagRuns found.
-
get_latest_execution_date
(self, session: Session)¶ Returns the latest date for which at least one dag run exists
-
resolve_template_files
(self)¶
-
get_template_env
(self)¶ Build a Jinja2 environment.
-
set_dependency
(self, upstream_task_id, downstream_task_id)¶ Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task()
-
get_task_instances_before
(self, base_date: datetime, num: int, *, session: Session)¶ Get
num
task instances before (including)base_date
.The returned list may contain exactly
num
task instances. It can have less if there are less thannum
scheduled DAG runs beforebase_date
, or more if there are manual task runs between the requested period, which does not count towardnum
.
-
get_task_instances
(self, start_date=None, end_date=None, state=None, session=None)¶
-
set_task_instance_state
(self, task_id: str, execution_date: datetime, state: State, upstream: Optional[bool] = False, downstream: Optional[bool] = False, future: Optional[bool] = False, past: Optional[bool] = False, commit: Optional[bool] = True, session=None)¶ Set the state of a TaskInstance to the given state, and clear its downstream tasks that are in failed or upstream_failed state.
- Parameters
task_id (str) -- Task ID of the TaskInstance
execution_date (datetime) -- execution_date of the TaskInstance
state (State) -- State to set the TaskInstance to
upstream (bool) -- Include all upstream tasks of the given task_id
downstream (bool) -- Include all downstream tasks of the given task_id
future (bool) -- Include all future TaskInstances of the given task_id
commit (bool) -- Commit changes
past (bool) -- Include all past TaskInstances of the given task_id
-
topological_sort
(self, include_subdag_tasks: bool = False)¶ Sorts tasks in topographical order, such that a task comes after any of its upstream dependencies.
Heavily inspired by: http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/
- Parameters
include_subdag_tasks -- whether to include tasks in subdags, default to False
- Returns
list of tasks in topological order
-
set_dag_runs_state
(self, state: str = State.RUNNING, session: Session = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dag_ids: List[str] = None)¶
-
clear
(self, task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run=False, session=None, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids: FrozenSet[str] = frozenset({}))¶ Clears a set of task instances associated with the current dag for a specified date range.
- Parameters
task_ids (List[str]) -- List of task ids to clear
start_date (datetime.datetime or None) -- The minimum execution_date to clear
end_date (datetime.datetime or None) -- The maximum execution_date to clear
only_failed (bool) -- Only clear failed tasks
only_running (bool) -- Only clear running tasks.
confirm_prompt (bool) -- Ask for confirmation
include_subdags (bool) -- Clear tasks in subdags and clear external tasks indicated by ExternalTaskMarker
include_parentdag (bool) -- Clear tasks in the parent dag of the subdag.
dag_run_state -- state to set DagRun to. If set to False, dagrun state will not be changed.
dry_run (bool) -- Find the tasks to clear but don't clear them.
session (sqlalchemy.orm.session.Session) -- The sqlalchemy session to use
dag_bag (airflow.models.dagbag.DagBag) -- The DagBag used to find the dags subdags (Optional)
exclude_task_ids (frozenset) -- A set of
task_id
that should not be cleared
-
classmethod
clear_dags
(cls, dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)¶
-
__deepcopy__
(self, memo)¶
-
sub_dag
(self, *args, **kwargs)¶ This method is deprecated in favor of partial_subset
-
partial_subset
(self, task_ids_or_regex: Union[str, RePatternType, Iterable[str]], include_downstream=False, include_upstream=True, include_direct_upstream=False)¶ Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed.
- Parameters
task_ids_or_regex ([str] or str or re.Pattern) -- Either a list of task_ids, or a regex to match against task ids (as a string, or compiled regex pattern).
include_downstream -- Include all downstream tasks of matched tasks, in addition to matched tasks.
include_upstream -- Include all upstream tasks of matched tasks, in addition to matched tasks.
-
pickle_info
(self)¶
-
pickle
(self, session=None)¶
-
tree_view
(self)¶ Print an ASCII tree representation of the DAG.
-
add_task
(self, task)¶ Add a task to the DAG
- Parameters
task (task) -- the task you want to add
-
add_tasks
(self, tasks)¶ Add a list of tasks to the DAG
- Parameters
tasks (list of tasks) -- a lit of tasks you want to add
-
run
(self, start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False)¶ Runs the DAG.
- Parameters
start_date (datetime.datetime) -- the start date of the range to run
end_date (datetime.datetime) -- the end date of the range to run
mark_success (bool) -- True to mark jobs as succeeded without running them
local (bool) -- True to run the tasks using the LocalExecutor
executor (airflow.executor.base_executor.BaseExecutor) -- The executor instance to run the tasks
donot_pickle (bool) -- True to avoid pickling DAG object and send to workers
ignore_task_deps (bool) -- True to skip upstream tasks
ignore_first_depends_on_past (bool) -- True to ignore depends_on_past dependencies for the first set of tasks only
pool (str) -- Resource pool to use
delay_on_limit_secs (float) -- Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached
verbose (bool) -- Make logging output more verbose
conf (dict) -- user defined dictionary passed from CLI
rerun_failed_tasks --
run_backwards --
run_at_least_once -- If true, always run the DAG at least once even if no logical run exists within the time range.
- Type
- Type
- Type
-
cli
(self)¶ Exposes a CLI specific to this DAG
-
create_dagrun
(self, state: DagRunState, execution_date: Optional[datetime] = None, run_id: Optional[str] = None, start_date: Optional[datetime] = None, external_trigger: Optional[bool] = False, conf: Optional[dict] = None, run_type: Optional[DagRunType] = None, session=None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None, data_interval: Optional[Tuple[datetime, datetime]] = None)¶ Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.
- Parameters
run_id (str) -- defines the run id for this dag run
run_type (airflow.utils.types.DagRunType) -- type of DagRun
execution_date (datetime.datetime) -- the execution date of this dag run
state (airflow.utils.state.DagRunState) -- the state of the dag run
start_date (datetime) -- the date this dag run should be evaluated
external_trigger (bool) -- whether this dag run is externally triggered
conf (dict) -- Dict containing configuration/parameters to pass to the DAG
creating_job_id (int) -- id of the job creating this DagRun
session (sqlalchemy.orm.session.Session) -- database session
dag_hash (str) -- Hash of Serialized DAG
data_interval (tuple[datetime, datetime] | None) -- Data interval of the DagRun
-
classmethod
bulk_sync_to_db
(cls, dags: Collection['DAG'], session=None)¶ This method is deprecated in favor of bulk_write_to_db
-
classmethod
bulk_write_to_db
(cls, dags: Collection['DAG'], session=None)¶ Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including calculated fields.
Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
- Parameters
dags (List[airflow.models.dag.DAG]) -- the DAG objects to save to the DB
- Returns
None
-
sync_to_db
(self, session=None)¶ Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
- Returns
None
-
get_default_view
(self)¶ This is only there for backward compatible jinja2 templates
-
static
deactivate_unknown_dags
(active_dag_ids, session=None)¶ Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM
- Parameters
active_dag_ids (list[unicode]) -- list of DAG IDs that are active
- Returns
None
-
static
deactivate_stale_dags
(expiration_date, session=None)¶ Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted.
- Parameters
expiration_date (datetime) -- set inactive DAGs that were touched before this time
- Returns
None
-
static
get_num_task_instances
(dag_id, task_ids=None, states=None, session=None)¶ Returns the number of task instances in the given DAG.
-
classmethod
get_serialized_fields
(cls)¶ Stringified DAGs and operators contain exactly these fields.
-
get_edge_info
(self, upstream_task_id: str, downstream_task_id: str)¶ Returns edge information for the given pair of tasks if present, and None if there is no information.
-
set_edge_info
(self, upstream_task_id: str, downstream_task_id: str, info: EdgeInfoType)¶ Sets the given edge information on the DAG. Note that this will overwrite, rather than merge with, existing info.
-
validate_schedule_and_params
(self)¶ Validates & raise exception if there are any Params in the DAG which neither have a default value nor have the null in schema['type'] list, but the DAG have a schedule_interval which is not None.
-
class
airflow.models.
DagModel
(concurrency=None, **kwargs)[source]¶ Bases:
airflow.models.base.Base
Table containing DAG properties
-
__tablename__
= dag¶ These items are stored in the database for state related information
-
dag_id
¶
-
root_dag_id
¶
-
is_paused_at_creation
¶
-
is_paused
¶
-
is_subdag
¶
-
is_active
¶
-
last_parsed_time
¶
-
last_pickled
¶
-
last_expired
¶
-
scheduler_lock
¶
-
pickle_id
¶
-
fileloc
¶
-
owners
¶
-
description
¶
-
default_view
¶
-
schedule_interval
¶
-
max_active_tasks
¶
-
max_active_runs
¶
-
has_task_concurrency_limits
¶
-
next_dagrun
¶
-
next_dagrun_data_interval_start
¶
-
next_dagrun_data_interval_end
¶
-
next_dagrun_create_after
¶
-
__table_args__
¶
-
parent_dag
¶
-
NUM_DAGS_PER_DAGRUN_QUERY
¶
-
next_dagrun_data_interval
¶
-
timezone
¶
-
safe_dag_id
¶
-
relative_fileloc
¶ File location of the importable dag 'file' relative to the configured DAGs folder.
-
__repr__
(self)¶
-
static
get_dagmodel
(dag_id, session=None)¶
-
classmethod
get_current
(cls, dag_id, session=None)¶
-
get_last_dagrun
(self, session=None, include_externally_triggered=False)¶
-
static
get_paused_dag_ids
(dag_ids: List[str], session: Session = None)¶ Given a list of dag_ids, get a set of Paused Dag Ids
- Parameters
dag_ids -- List of Dag ids
session -- ORM Session
- Returns
Paused Dag_ids
-
get_default_view
(self)¶ Get the Default DAG View, returns the default config value if DagModel does not have a value
-
set_is_paused
(self, is_paused: bool, including_subdags: bool = True, session=None)¶ Pause/Un-pause a DAG.
- Parameters
is_paused -- Is the DAG paused
including_subdags -- whether to include the DAG's subdags
session -- session
-
classmethod
deactivate_deleted_dags
(cls, alive_dag_filelocs: List[str], session=None)¶ Set
is_active=False
on the DAGs for which the DAG files have been removed.- Parameters
alive_dag_filelocs -- file paths of alive DAGs
session -- ORM Session
-
classmethod
dags_needing_dagruns
(cls, session: Session)¶ Return (and lock) a list of Dag objects that are due to create a new DagRun.
This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked.
-
-
class
airflow.models.
DagTag
[source]¶ Bases:
airflow.models.base.Base
A tag name per dag, to allow quick filtering in the DAG view.
-
__tablename__
= dag_tag¶
-
name
¶
-
dag_id
¶
-
__repr__
(self)¶
-
-
class
airflow.models.
DagBag
(dag_folder: Union[str, 'pathlib.Path', None] = None, include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), read_dags_from_db: bool = False, store_serialized_dags: Optional[bool] = None, load_op_links: bool = True)[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks. This makes it easier to run distinct environments for say production and development, tests, or for different teams or security profiles. What would have been system level settings are now dagbag level so that one system can run multiple, independent settings sets.
- Parameters
dag_folder (unicode) -- the folder to scan to find DAGs
include_examples (bool) -- whether to include the examples that ship with airflow or not
include_smart_sensor (bool) -- whether to include the smart sensor native DAGs that create the smart sensor operators for whole cluster
read_dags_from_db (bool) -- Read DAGs from DB if
True
is passed. IfFalse
DAGs are read from python files.load_op_links (bool) -- Should the extra operator link be loaded via plugins when de-serializing the DAG? This flag is set to False in Scheduler so that Extra Operator links are not loaded to not run User code in Scheduler.
-
DAGBAG_IMPORT_TIMEOUT
¶
-
SCHEDULER_ZOMBIE_TASK_THRESHOLD
¶
-
store_serialized_dags
¶ Whether or not to read dags from DB
-
dag_ids
¶ - Returns
a list of DAG IDs in this bag
- Return type
List[unicode]
-
size
(self)¶ - Returns
the amount of dags contained in this dagbag
-
get_dag
(self, dag_id, session: Session = None)¶ Gets the DAG out of the dictionary, and refreshes it if expired
- Parameters
dag_id (str) -- DAG Id
-
process_file
(self, filepath, only_if_updated=True, safe_mode=True)¶ Given a path to a python module or zip file, this method imports the module and look for dag objects within it.
-
bag_dag
(self, dag, root_dag)¶ Adds the DAG into the bag, recurses into sub dags.
- Raises
AirflowDagCycleException if a cycle is detected in this dag or its subdags.
- Raises
AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
-
collect_dags
(self, dag_folder: Union[str, 'pathlib.Path', None] = None, only_if_updated: bool = True, include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))¶ Given a file path or a folder, this method looks for python modules, imports them and adds them to the dagbag collection.
Note that if a
.airflowignore
file is found while processing the directory, it will behave much like a.gitignore
, ignoring files that match any of the regex patterns specified in the file.Note: The patterns in .airflowignore are treated as un-anchored regexes, not shell-like glob patterns.
-
collect_dags_from_db
(self)¶ Collects DAGs from database.
-
dagbag_report
(self)¶ Prints a report around DagBag loading stats
-
sync_to_db
(self, session: Optional[Session] = None)¶ Save attributes about list of DAG to the DB.
-
class
airflow.models.
DagPickle
(dag)[source]¶ Bases:
airflow.models.base.Base
Dags can originate from different places (user repos, main repo, ...) and also get executed in different places (different executors). This object represents a version of a DAG and becomes a source of truth for a BackfillJob execution. A pickle is a native python serialized object, and in this case gets stored in the database for the duration of the job.
The executors pick up the DagPickle id and read the dag definition from the database.
-
id
¶
-
pickle
¶
-
created_dttm
¶
-
pickle_hash
¶
-
__tablename__
= dag_pickle¶
-
-
class
airflow.models.
DagRun
(dag_id: Optional[str] = None, run_id: Optional[str] = None, queued_at: Optional[datetime] = __NO_VALUE, execution_date: Optional[datetime] = None, start_date: Optional[datetime] = None, external_trigger: Optional[bool] = None, conf: Optional[Any] = None, state: Optional[DagRunState] = None, run_type: Optional[str] = None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None, data_interval: Optional[Tuple[datetime, datetime]] = None)[source]¶ Bases:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
DagRun describes an instance of a Dag. It can be created by the scheduler (for regular runs) or by an external trigger
-
__tablename__
= dag_run¶
-
id
¶
-
dag_id
¶
-
queued_at
¶
-
execution_date
¶
-
start_date
¶
-
end_date
¶
-
run_id
¶
-
creating_job_id
¶
-
external_trigger
¶
-
run_type
¶
-
conf
¶
-
data_interval_start
¶
-
data_interval_end
¶
-
last_scheduling_decision
¶
-
dag_hash
¶
-
dag
¶
-
__table_args__
¶
-
task_instances
¶
-
DEFAULT_DAGRUNS_TO_EXAMINE
¶
-
logical_date
¶
-
state
¶
-
is_backfill
¶
-
__repr__
(self)¶
-
get_state
(self)¶
-
set_state
(self, state: DagRunState)¶
-
refresh_from_db
(self, session: Session = None)¶ Reloads the current dagrun from the database
- Parameters
session (Session) -- database session
-
classmethod
active_runs_of_dags
(cls, dag_ids=None, only_running=False, session=None)¶ Get the number of active dag runs for each dag.
-
classmethod
next_dagruns_to_examine
(cls, state: DagRunState, session: Session, max_number: Optional[int] = None)¶ Return the next DagRuns that the scheduler should attempt to schedule.
This will return zero or more DagRun rows that are row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked.
- Return type
-
static
find
(dag_id: Optional[Union[str, List[str]]] = None, run_id: Optional[str] = None, execution_date: Optional[datetime] = None, state: Optional[DagRunState] = None, external_trigger: Optional[bool] = None, no_backfills: bool = False, run_type: Optional[DagRunType] = None, session: Session = None, execution_start_date: Optional[datetime] = None, execution_end_date: Optional[datetime] = None)¶ Returns a set of dag runs for the given search criteria.
- Parameters
dag_id (str or list[str]) -- the dag_id or list of dag_id to find dag runs for
run_id (str) -- defines the run id for this dag run
run_type (airflow.utils.types.DagRunType) -- type of DagRun
execution_date (datetime.datetime or list[datetime.datetime]) -- the execution date
state (DagRunState) -- the state of the dag run
external_trigger (bool) -- whether this dag run is externally triggered
no_backfills (bool) -- return no backfills (True), return all (False). Defaults to False
session (sqlalchemy.orm.session.Session) -- database session
execution_start_date (datetime.datetime) -- dag run that was executed from this date
execution_end_date (datetime.datetime) -- dag run that was executed until this date
-
static
generate_run_id
(run_type: DagRunType, execution_date: datetime)¶ Generate Run ID based on Run Type and Execution Date
-
get_task_instances
(self, state: Optional[Iterable[TaskInstanceState]] = None, session=None)¶ Returns the task instances for this dag run
-
get_task_instance
(self, task_id: str, session: Session = None)¶ Returns the task instance specified by task_id for this dag run
- Parameters
task_id (str) -- the task id
session (Session) -- Sqlalchemy ORM Session
-
get_dag
(self)¶ Returns the Dag associated with this DagRun.
- Returns
DAG
-
get_previous_dagrun
(self, state: Optional[DagRunState] = None, session: Session = None)¶ The previous DagRun, if there is one
-
get_previous_scheduled_dagrun
(self, session: Session = None)¶ The previous, SCHEDULED DagRun, if there is one
-
update_state
(self, session: Session = None, execute_callbacks: bool = True)¶ Determines the overall state of the DagRun based on the state of its TaskInstances.
- Parameters
session (Session) -- Sqlalchemy ORM Session
execute_callbacks (bool) -- Should dag callbacks (success/failure, SLA etc) be invoked directly (default: true) or recorded as a pending request in the
callback
property
- Returns
Tuple containing tis that can be scheduled in the current loop & callback that needs to be executed
-
task_instance_scheduling_decisions
(self, session: Session = None)¶
-
verify_integrity
(self, session: Session = None)¶ Verifies the DagRun by checking for removed tasks or tasks that are not in the database yet. It will set state to removed or add the task if required.
- Parameters
session (Session) -- Sqlalchemy ORM Session
-
static
get_run
(session: Session, dag_id: str, execution_date: datetime)¶ Get a single DAG Run
- Parameters
session (Session) -- Sqlalchemy ORM Session
dag_id (unicode) -- DAG ID
execution_date (datetime) -- execution date
- Returns
DagRun corresponding to the given dag_id and execution date if one exists. None otherwise.
- Return type
-
classmethod
get_latest_runs
(cls, session=None)¶ Returns the latest DagRun for each DAG
-
schedule_tis
(self, schedulable_tis: Iterable[TI], session: Session = None)¶ Set the given task instances in to the scheduled state.
Each element of
schedulable_tis
should have it'stask
attribute already set.Any DummyOperator without callbacks is instead set straight to the success state.
All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it is the caller's responsibility to call this function only with TIs from a single dag run.
-
-
class
airflow.models.
ImportError
[source]¶ Bases:
airflow.models.base.Base
A table to store all Import Errors. The ImportErrors are recorded when parsing DAGs. This errors are displayed on the Webserver.
-
__tablename__
= import_error¶
-
id
¶
-
timestamp
¶
-
filename
¶
-
stacktrace
¶
-
-
class
airflow.models.
Log
(event, task_instance=None, owner=None, extra=None, **kwargs)[source]¶ Bases:
airflow.models.base.Base
Used to actively log events to the database
-
__tablename__
= log¶
-
id
¶
-
dttm
¶
-
dag_id
¶
-
task_id
¶
-
event
¶
-
execution_date
¶
-
owner
¶
-
extra
¶
-
__table_args__
¶
-
-
class
airflow.models.
Param
(default: Any = __NO_VALUE_SENTINEL, description: str = None, **kwargs)[source]¶ Class to hold the default value of a Param and rule set to do the validations. Without the rule set it always validates and returns the default value.
- Parameters
-
has_value
¶
-
resolve
(self, value: Optional[Any] = __NO_VALUE_SENTINEL, suppress_exception: bool = False)¶ Runs the validations and returns the Param's final value. May raise ValueError on failed validations, or TypeError if no value is passed and no value already exists.
- Parameters
value (Optional[Any]) -- The value to be updated for the Param
suppress_exception (bool) -- To raise an exception or not when the validations fails. If true and validations fails, the return value would be None.
-
dump
(self)¶ Dump the Param as a dictionary
-
class
airflow.models.
Pool
[source]¶ Bases:
airflow.models.base.Base
the class to get Pool info.
-
__tablename__
= slot_pool¶
-
id
¶
-
pool
¶
-
slots
¶
-
description
¶
-
DEFAULT_POOL_NAME
= default_pool¶
-
__repr__
(self)¶
-
static
get_pool
(pool_name, session: Session = None)¶ Get the Pool with specific pool name from the Pools.
- Parameters
pool_name -- The pool name of the Pool to get.
session -- SQLAlchemy ORM Session
- Returns
the pool object
-
static
get_default_pool
(session: Session = None)¶ Get the Pool of the default_pool from the Pools.
- Parameters
session -- SQLAlchemy ORM Session
- Returns
the pool object
-
static
slots_stats
(*, lock_rows: bool = False, session: Session = None)¶ Get Pool stats (Number of Running, Queued, Open & Total tasks)
If
lock_rows
is True, and the database engine in use supports theNOWAIT
syntax, then a non-blocking lock will be attempted -- if the lock is not available then SQLAlchemy will throw an OperationalError.- Parameters
lock_rows -- Should we attempt to obtain a row-level lock on all the Pool rows returns
session -- SQLAlchemy ORM Session
-
to_json
(self)¶ Get the Pool in a json structure
- Returns
the pool object in json format
-
occupied_slots
(self, session: Session)¶ Get the number of slots used by running/queued tasks at the moment.
- Parameters
session -- SQLAlchemy ORM Session
- Returns
the used number of slots
-
running_slots
(self, session: Session)¶ Get the number of slots used by running tasks at the moment.
- Parameters
session -- SQLAlchemy ORM Session
- Returns
the used number of slots
-
queued_slots
(self, session: Session)¶ Get the number of slots used by queued tasks at the moment.
- Parameters
session -- SQLAlchemy ORM Session
- Returns
the used number of slots
-
open_slots
(self, session: Session)¶ Get the number of slots open at the moment.
- Parameters
session -- SQLAlchemy ORM Session
- Returns
the number of slots
-
-
class
airflow.models.
RenderedTaskInstanceFields
(ti: TaskInstance, render_templates=True)[source]¶ Bases:
airflow.models.base.Base
Save Rendered Template Fields
-
__tablename__
= rendered_task_instance_fields¶
-
dag_id
¶
-
task_id
¶
-
execution_date
¶
-
rendered_fields
¶
-
k8s_pod_yaml
¶
-
__repr__
(self)¶
-
classmethod
get_templated_fields
(cls, ti: TaskInstance, session: Session = None)¶ Get templated field for a TaskInstance from the RenderedTaskInstanceFields table.
- Parameters
ti -- Task Instance
session -- SqlAlchemy Session
- Returns
Rendered Templated TI field
-
classmethod
get_k8s_pod_yaml
(cls, ti: TaskInstance, session: Session = None)¶ Get rendered Kubernetes Pod Yaml for a TaskInstance from the RenderedTaskInstanceFields table.
- Parameters
ti -- Task Instance
session -- SqlAlchemy Session
- Returns
Kubernetes Pod Yaml
-
write
(self, session: Session = None)¶ Write instance to database
- Parameters
session -- SqlAlchemy Session
-
classmethod
delete_old_records
(cls, task_id: str, dag_id: str, num_to_keep=conf.getint('core', 'max_num_rendered_ti_fields_per_task', fallback=0), session: Session = None)¶ Keep only Last X (num_to_keep) number of records for a task by deleting others
- Parameters
task_id -- Task ID
dag_id -- Dag ID
num_to_keep -- Number of Records to keep
session -- SqlAlchemy Session
-
-
class
airflow.models.
SensorInstance
(ti)[source]¶ Bases:
airflow.models.base.Base
SensorInstance support the smart sensor service. It stores the sensor task states and context that required for poking include poke context and execution context. In sensor_instance table we also save the sensor operator classpath so that inside smart sensor there is no need to import the dagbag and create task object for each sensor task.
SensorInstance include another set of columns to support the smart sensor shard on large number of sensor instance. The key idea is to generate the hash code from the poke context and use it to map to a shorter shard code which can be used as an index. Every smart sensor process takes care of tasks whose shardcode are in a certain range.
-
__tablename__
= sensor_instance¶
-
id
¶
-
task_id
¶
-
dag_id
¶
-
execution_date
¶
-
state
¶
-
start_date
¶
-
operator
¶
-
op_classpath
¶
-
hashcode
¶
-
shardcode
¶
-
poke_context
¶
-
execution_context
¶
-
created_at
¶
-
updated_at
¶
-
__table_args__
¶
-
try_number
¶ Return the try number that this task number will be when it is actually run. If the TI is currently running, this will match the column in the database, in all other cases this will be incremented.
-
static
get_classpath
(obj)¶ Get the object dotted class path. Used for getting operator classpath.
- Parameters
obj --
- Returns
The class path of input object
- Return type
-
classmethod
register
(cls, ti, poke_context, execution_context, session=None)¶ Register task instance ti for a sensor in sensor_instance table. Persist the context used for a sensor and set the sensor_instance table state to sensing.
- Parameters
- Type
ti:
- Returns
True if the ti was registered successfully.
- Return type
Boolean
-
__repr__
(self)¶
-
-
class
airflow.models.
SkipMixin
[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
A Mixin to skip Tasks Instances
-
skip
(self, dag_run: 'DagRun', execution_date: 'timezone.DateTime', tasks: 'Iterable[BaseOperator]', session: 'Session' = None)¶ Sets tasks instances to skipped from the same dag run.
If this instance has a task_id attribute, store the list of skipped task IDs to XCom so that NotPreviouslySkippedDep knows these tasks should be skipped when they are cleared.
- Parameters
dag_run -- the DagRun for which to set the tasks to skipped
execution_date -- execution_date
tasks -- tasks to skip (not task_ids)
session -- db session to use
-
skip_all_except
(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable[str]])¶ This method implements the logic for a branching operator; given a single task ID or list of task IDs to follow, this skips all other tasks immediately downstream of this operator.
branch_task_ids is stored to XCom so that NotPreviouslySkippedDep knows skipped tasks or newly added tasks should be skipped when they are cleared.
-
-
class
airflow.models.
SlaMiss
[source]¶ Bases:
airflow.models.base.Base
Model that stores a history of the SLA that have been missed. It is used to keep track of SLA failures over time and to avoid double triggering alert emails.
-
__tablename__
= sla_miss¶
-
task_id
¶
-
dag_id
¶
-
execution_date
¶
-
email_sent
¶
-
timestamp
¶
-
description
¶
-
notification_sent
¶
-
__table_args__
¶
-
__repr__
(self)¶
-
-
class
airflow.models.
TaskFail
(task, execution_date, start_date, end_date)[source]¶ Bases:
airflow.models.base.Base
TaskFail tracks the failed run durations of each task instance.
-
__tablename__
= task_fail¶
-
id
¶
-
task_id
¶
-
dag_id
¶
-
execution_date
¶
-
start_date
¶
-
end_date
¶
-
duration
¶
-
__table_args__
¶
-
-
class
airflow.models.
TaskInstance
(task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None)[source]¶ Bases:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the state they are in.
The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions.
Database transactions on this table should insure double triggers and any confusion around what task instances are or aren't ready to run even while multiple schedulers may be firing task instances.
-
__tablename__
= task_instance¶
-
task_id
¶
-
dag_id
¶
-
run_id
¶
-
start_date
¶
-
end_date
¶
-
duration
¶
-
state
¶
-
max_tries
¶
-
hostname
¶
-
unixname
¶
-
job_id
¶
-
pool
¶
-
pool_slots
¶
-
queue
¶
-
priority_weight
¶
-
operator
¶
-
queued_dttm
¶
-
queued_by_job_id
¶
-
pid
¶
-
executor_config
¶
-
external_executor_id
¶
-
trigger_id
¶
-
trigger_timeout
¶
-
next_method
¶
-
next_kwargs
¶
-
__table_args__
¶
-
dag_model
¶
-
trigger
¶
-
dag_run
¶
-
execution_date
¶
-
try_number
¶ Return the try number that this task number will be when it is actually run.
If the TaskInstance is currently running, this will match the column in the database, in all other cases this will be incremented.
-
prev_attempted_tries
¶ Based on this instance's try_number, this will calculate the number of previously attempted tries, defaulting to 0.
-
next_try_number
¶ Setting Next Try Number
-
log_url
¶ Log URL for TaskInstance
-
mark_success_url
¶ URL to mark TI success
-
key
¶ Returns a tuple that identifies the task instance uniquely
-
is_premature
¶ Returns whether a task is in UP_FOR_RETRY state and its retry interval has elapsed.
-
previous_ti
¶ This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_ti method.
-
previous_ti_success
¶ This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_ti method.
-
previous_start_date_success
¶ This attribute is deprecated. Please use airflow.models.taskinstance.TaskInstance.get_previous_start_date method.
-
init_on_load
(self)¶ Initialize the attributes that aren't stored in the DB
-
command_as_list
(self, mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)¶ Returns a command that can be executed anywhere where airflow is installed. This command is part of the message sent to executors by the orchestrator.
-
static
generate_command
(dag_id: str, task_id: str, run_id: str = None, mark_success: bool = False, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, local: bool = False, pickle_id: Optional[int] = None, file_path: Optional[str] = None, raw: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, cfg_path: Optional[str] = None)¶ Generates the shell command required to execute this task instance.
- Parameters
dag_id (str) -- DAG ID
task_id (str) -- Task ID
run_id (datetime) -- The run_id of this task's DagRun
mark_success (bool) -- Whether to mark the task as successful
ignore_all_deps (bool) -- Ignore all ignorable dependencies. Overrides the other ignore_* parameters.
ignore_depends_on_past (bool) -- Ignore depends_on_past parameter of DAGs (e.g. for Backfills)
ignore_task_deps (bool) -- Ignore task-specific dependencies such as depends_on_past and trigger rule
ignore_ti_state (bool) -- Ignore the task instance's previous failure/success
local (bool) -- Whether to run the task locally
pickle_id (Optional[int]) -- If the DAG was serialized to the DB, the ID associated with the pickled DAG
file_path (Optional[str]) -- path to the file containing the DAG definition
raw (Optional[bool]) -- raw mode (needs more details)
job_id (Optional[int]) -- job ID (needs more details)
pool (Optional[str]) -- the Airflow pool that the task should run in
cfg_path (Optional[str]) -- the Path to the configuration file
- Returns
shell command that can be used to run the task instance
- Return type
-
current_state
(self, session=None)¶ Get the very latest state from the database, if a session is passed, we use and looking up the state becomes part of the session, otherwise a new session is used.
- Parameters
session (Session) -- SQLAlchemy ORM Session
-
error
(self, session=None)¶ Forces the task instance's state to FAILED in the database.
- Parameters
session (Session) -- SQLAlchemy ORM Session
-
refresh_from_db
(self, session=None, lock_for_update=False)¶ Refreshes the task instance from the database based on the primary key
- Parameters
session (Session) -- SQLAlchemy ORM Session
lock_for_update (bool) -- if True, indicates that the database should lock the TaskInstance (issuing a FOR UPDATE clause) until the session is committed.
-
refresh_from_task
(self, task, pool_override=None)¶ Copy common attributes from the given task.
- Parameters
task (airflow.models.BaseOperator) -- The task object to copy from
pool_override (str) -- Use the pool_override instead of task's pool
-
clear_xcom_data
(self, session=None)¶ Clears all XCom data from the database for the task instance
- Parameters
session (Session) -- SQLAlchemy ORM Session
-
set_state
(self, state: str, session=None)¶ Set TaskInstance state.
- Parameters
state (str) -- State to set for the TI
session (Session) -- SQLAlchemy ORM Session
-
are_dependents_done
(self, session=None)¶ Checks whether the immediate dependents of this task instance have succeeded or have been skipped. This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next schedule of a task until the dependents are done. For instance, if the task DROPs and recreates a table.
- Parameters
session (Session) -- SQLAlchemy ORM Session
-
get_previous_dagrun
(self, state: Optional[str] = None, session: Optional[Session] = None)¶ The DagRun that ran before this task instance's DagRun.
- Parameters
state -- If passed, it only take into account instances of a specific state.
session -- SQLAlchemy ORM Session.
-
get_previous_ti
(self, state: Optional[str] = None, session: Session = None)¶ The task instance for the task that ran before this task instance.
- Parameters
state -- If passed, it only take into account instances of a specific state.
session -- SQLAlchemy ORM Session
-
get_previous_execution_date
(self, state: Optional[str] = None, session: Session = None)¶ The execution date from property previous_ti_success.
- Parameters
state -- If passed, it only take into account instances of a specific state.
session -- SQLAlchemy ORM Session
-
get_previous_start_date
(self, state: Optional[str] = None, session: Session = None)¶ The start date from property previous_ti_success.
- Parameters
state -- If passed, it only take into account instances of a specific state.
session -- SQLAlchemy ORM Session
-
are_dependencies_met
(self, dep_context=None, session=None, verbose=False)¶ Returns whether or not all the conditions are met for this task instance to be run given the context for the dependencies (e.g. a task instance being force run from the UI will ignore some dependencies).
- Parameters
dep_context (DepContext) -- The execution context that determines the dependencies that should be evaluated.
session (sqlalchemy.orm.session.Session) -- database session
verbose (bool) -- whether log details on failed dependencies on info or debug log level
-
get_failed_dep_statuses
(self, dep_context=None, session=None)¶ Get failed Dependencies
-
__repr__
(self)¶
-
next_retry_datetime
(self)¶ Get datetime of the next retry if the task instance fails. For exponential backoff, retry_delay is used as base and will be converted to seconds.
-
ready_for_retry
(self)¶ Checks on whether the task instance is in the right state and timeframe to be retried.
-
get_dagrun
(self, session: Session = None)¶ Returns the DagRun for this TaskInstance
- Parameters
session -- SQLAlchemy ORM Session
- Returns
DagRun
-
check_and_change_state_before_execution
(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, external_executor_id: Optional[str] = None, session=None)¶ Checks dependencies and then sets state to RUNNING if they are met. Returns True if and only if state is set to RUNNING, which implies that task should be executed, in preparation for _run_raw_task
- Parameters
verbose (bool) -- whether to turn on more verbose logging
ignore_all_deps (bool) -- Ignore all of the non-critical dependencies, just runs
ignore_depends_on_past (bool) -- Ignore depends_on_past DAG attribute
ignore_task_deps (bool) -- Don't check the dependencies of this TaskInstance's task
ignore_ti_state (bool) -- Disregards previous task instance state
mark_success (bool) -- Don't run the task, mark its state as success
test_mode (bool) -- Doesn't record success or failure in the DB
job_id (str) -- Job (BackfillJob / LocalTaskJob / SchedulerJob) ID
pool (str) -- specifies the pool to use to run the task instance
external_executor_id (str) -- The identifier of the celery executor
session (Session) -- SQLAlchemy ORM Session
- Returns
whether the state was changed to running or not
- Return type
-
clear_next_method_args
(self)¶
-
run
(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None)¶ Run TaskInstance
-
dry_run
(self)¶ Only Renders Templates for the TI
-
handle_failure
(self, error: Union[str, Exception], test_mode: Optional[bool] = None, force_fail: bool = False, error_file: Optional[str] = None, session=None)¶ Handle Failure for the TaskInstance
-
handle_failure_with_callback
(self, error: Union[str, Exception], test_mode: Optional[bool] = None, force_fail: bool = False, session=None)¶
-
is_eligible_to_retry
(self)¶ Is task instance is eligible for retry
-
get_template_context
(self, session: Session = None, ignore_param_exceptions: bool = True)¶ Return TI Context
-
get_rendered_template_fields
(self, session=None)¶ Fetch rendered template fields from DB
-
get_rendered_k8s_spec
(self, session=None)¶ Fetch rendered template fields from DB
-
overwrite_params_with_dag_run_conf
(self, params, dag_run)¶ Overwrite Task Params with DagRun.conf
-
render_templates
(self, context: Optional[Context] = None)¶ Render templates in the operator fields.
-
render_k8s_pod_yaml
(self)¶ Render k8s pod yaml
-
get_email_subject_content
(self, exception)¶ Get the email subject content for exceptions.
-
email_alert
(self, exception)¶ Send alert email with exception information.
-
set_duration
(self)¶ Set TI duration
-
xcom_push
(self, key: str, value: Any, execution_date: Optional[datetime] = None, session: Session = None)¶ Make an XCom available for tasks to pull.
- Parameters
key (str) -- A key for the XCom
value (any picklable object) -- A value for the XCom. The value is pickled and stored in the database.
execution_date (datetime) -- if provided, the XCom will not be visible until this date. This can be used, for example, to send a message to a task on a future date without it being immediately visible.
session (Session) -- Sqlalchemy ORM Session
-
xcom_pull
(self, task_ids: Optional[Union[str, Iterable[str]]] = None, dag_id: Optional[str] = None, key: str = XCOM_RETURN_KEY, include_prior_dates: bool = False, session: Session = None)¶ Pull XComs that optionally meet certain criteria.
The default value for key limits the search to XComs that were returned by other tasks (as opposed to those that were pushed manually). To remove this filter, pass key=None (or any desired value).
If a single task_id string is provided, the result is the value of the most recent matching XCom from that task_id. If multiple task_ids are provided, a tuple of matching values is returned. None is returned whenever no matches are found.
- Parameters
key (str) -- A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as a constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass key=None.
task_ids (str or iterable of strings (representing task_ids)) -- Only XComs from tasks with matching ids will be pulled. Can pass None to remove the filter.
dag_id (str) -- If provided, only pulls XComs from this DAG. If None (default), the DAG of the calling task is used.
include_prior_dates (bool) -- If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.
session (Session) -- Sqlalchemy ORM Session
-
get_num_running_task_instances
(self, session)¶ Return Number of running TIs from the DB
-
init_run_context
(self, raw=False)¶ Sets the log context.
-
static
filter_for_tis
(tis: Iterable[Union['TaskInstance', TaskInstanceKey]])¶ Returns SQLAlchemy filter to query selected task instances
-
-
airflow.models.
clear_task_instances
(tis, session, activate_dag_runs=None, dag=None, dag_run_state: Union[DagRunState, Literal[False]] = DagRunState.QUEUED)[source]¶ -
Clears a set of task instances, but makes sure the running ones
-
get killed.
- Parameters
tis -- a list of task instances
session -- current session
dag_run_state -- state to set DagRun to. If set to False, dagrun state will not be changed.
dag -- DAG object
activate_dag_runs -- Deprecated parameter, do not pass
-
class
airflow.models.
TaskReschedule
(task, run_id, try_number, start_date, end_date, reschedule_date)[source]¶ Bases:
airflow.models.base.Base
TaskReschedule tracks rescheduled task instances.
-
__tablename__
= task_reschedule¶
-
id
¶
-
task_id
¶
-
dag_id
¶
-
run_id
¶
-
try_number
¶
-
start_date
¶
-
end_date
¶
-
duration
¶
-
reschedule_date
¶
-
__table_args__
¶
-
dag_run
¶
-
execution_date
¶
-
static
query_for_task_instance
(task_instance, descending=False, session=None, try_number=None)¶ Returns query for task reschedules for a given the task instance.
- Parameters
session (sqlalchemy.orm.session.Session) -- the database session object
task_instance (airflow.models.TaskInstance) -- the task instance to find task reschedules for
descending (bool) -- If True then records are returned in descending order
try_number (int) -- Look for TaskReschedule of the given try_number. Default is None which looks for the same try_number of the given task_instance.
-
static
find_for_task_instance
(task_instance, session=None, try_number=None)¶ Returns all task reschedules for the task instance and try number, in ascending order.
- Parameters
session (sqlalchemy.orm.session.Session) -- the database session object
task_instance (airflow.models.TaskInstance) -- the task instance to find task reschedules for
try_number (int) -- Look for TaskReschedule of the given try_number. Default is None which looks for the same try_number of the given task_instance.
-
-
class
airflow.models.
Trigger
(classpath: str, kwargs: Dict[str, Any], created_date: Optional[datetime.datetime] = None)[source]¶ Bases:
airflow.models.base.Base
Triggers are a workload that run in an asynchronous event loop shared with other Triggers, and fire off events that will unpause deferred Tasks, start linked DAGs, etc.
They are persisted into the database and then re-hydrated into a "triggerer" process, where many are run at once. We model it so that there is a many-to-one relationship between Task and Trigger, for future deduplication logic to use.
Rows will be evicted from the database when the triggerer detects no active Tasks/DAGs using them. Events are not stored in the database; when an Event is fired, the triggerer will directly push its data to the appropriate Task/DAG.
-
__tablename__
= trigger¶
-
id
¶
-
classpath
¶
-
kwargs
¶
-
created_date
¶
-
triggerer_id
¶
-
classmethod
from_object
(cls, trigger: BaseTrigger)¶ Alternative constructor that creates a trigger row based directly off of a Trigger object.
-
classmethod
bulk_fetch
(cls, ids: List[int], session=None)¶ Fetches all of the Triggers by ID and returns a dict mapping ID -> Trigger instance
-
classmethod
clean_unused
(cls, session=None)¶ Deletes all triggers that have no tasks/DAGs dependent on them (triggers have a one-to-many relationship to both)
-
classmethod
submit_event
(cls, trigger_id, event, session=None)¶ Takes an event from an instance of itself, and triggers all dependent tasks to resume.
-
classmethod
submit_failure
(cls, trigger_id, session=None)¶ Called when a trigger has failed unexpectedly, and we need to mark everything that depended on it as failed. Notably, we have to actually run the failure code from a worker as it may have linked callbacks, so hilariously we have to re-schedule the task instances to a worker just so they can then fail.
We use a special __fail__ value for next_method to achieve this that the runtime code understands as immediate-fail, and pack the error into next_kwargs.
TODO: Once we have shifted callback (and email) handling to run on workers as first-class concepts, we can run the failure code here in-process, but we can't do that right now.
-
classmethod
ids_for_triggerer
(cls, triggerer_id, session=None)¶ Retrieves a list of triggerer_ids.
-
classmethod
assign_unassigned
(cls, triggerer_id, capacity, session=None)¶ Takes a triggerer_id and the capacity for that triggerer and assigns unassigned triggers until that capacity is reached, or there are no more unassigned triggers.
-
-
class
airflow.models.
Variable
(key=None, val=None, description=None)[source]¶ Bases:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow.
-
__tablename__
= variable¶
-
id
¶
-
key
¶
-
description
¶
-
is_encrypted
¶
-
val
¶ Get Airflow Variable from Metadata DB and decode it using the Fernet Key
-
on_db_load
(self)¶
-
__repr__
(self)¶
-
get_val
(self)¶ Get Airflow Variable from Metadata DB and decode it using the Fernet Key
-
set_val
(self, value)¶ Encode the specified value with Fernet Key and store it in Variables Table.
-
classmethod
setdefault
(cls, key, default, description=None, deserialize_json=False)¶ Like a Python builtin dict object, setdefault returns the current value for a key, and if it isn't there, stores the default value and returns it.
- Parameters
key (str) -- Dict key for this Variable
default (Mixed) -- Default value to set and return if the variable isn't already in the DB
deserialize_json -- Store this as a JSON encoded value in the DB and un-encode it when retrieving a value
- Returns
Mixed
-
classmethod
get
(cls, key: str, default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False)¶ Gets a value for an Airflow Variable Key
- Parameters
key -- Variable Key
default_var -- Default value of the Variable if the Variable doesn't exists
deserialize_json -- Deserialize the value to a Python dict
-
classmethod
set
(cls, key: str, value: Any, description: str = None, serialize_json: bool = False, session: Session = None)¶ Sets a value for an Airflow Variable with a given Key. This operation will overwrite an existing variable.
- Parameters
key -- Variable Key
value -- Value to set for the Variable
description -- Value to set for the Variable
serialize_json -- Serialize the value to a JSON string
session -- SQL Alchemy Sessions
-
classmethod
update
(cls, key: str, value: Any, serialize_json: bool = False, session: Session = None)¶ Updates a given Airflow Variable with the Provided value
- Parameters
key -- Variable Key
value -- Value to set for the Variable
serialize_json -- Serialize the value to a JSON string
session -- SQL Alchemy Session
-
classmethod
delete
(cls, key: str, session: Session = None)¶ Delete an Airflow Variable for a given key
- Parameters
key -- Variable Key
session -- SQL Alchemy Sessions
-
rotate_fernet_key
(self)¶ Rotate Fernet Key
-
check_for_write_conflict
(key: str)¶ Logs a warning if a variable exists outside of the metastore.
If we try to write a variable to the metastore while the same key exists in an environment variable or custom secrets backend, then subsequent reads will not read the set value.
- Parameters
key -- Variable Key
-