Deferrable Operators & Triggers¶
Standard Operators and Sensors take up a full worker slot for the entire time they are running, even if they are idle. For example, if you only have 100 worker slots available to run tasks, and you have 100 DAGs waiting on a sensor that’s currently running but idle, then you cannot run anything else - even though your entire Airflow cluster is essentially idle. reschedule
mode for sensors solves some of this, by allowing sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not other criteria.
This is where Deferrable Operators can be used. When it has nothing to do but wait, an operator can suspend itself and free up the worker for other processes by deferring. When an operator defers, execution moves to the triggerer, where the trigger specified by the operator will run. The trigger can do the polling or waiting required by the operator. Then, when the trigger finishes polling or waiting, it sends a signal for the operator to resume its execution. During the deferred phase of execution, since work has been offloaded to the triggerer, the task no longer occupies a worker slot, and you have more free workload capacity. By default, tasks in a deferred state don’t occupy pool slots. If you would like them to, you can change this by editing the pool in question.
Triggers are small, asynchronous pieces of Python code designed to run in a single Python process. Because they are asynchronous, they can all co-exist efficiently in the triggerer Airflow component.
An overview of how this process works:
A task instance (running operator) reaches a point where it has to wait for other operations or conditions, and defers itself with a trigger tied to an event to resume it. This frees up the worker to run something else.
The new trigger instance is registered by Airflow, and picked up by a triggerer process.
The trigger runs until it fires, at which point its source task is re-scheduled by the scheduler.
The scheduler queues the task to resume on a worker node.
You can either use pre-written deferrable operators as a DAG author or write your own. Writing them, however, requires that they meet certain design criteria.
Using Deferrable Operators¶
If you want to use pre-written deferrable operators that come with Airflow, such as TimeSensorAsync
, then you only need to complete two steps:
Ensure your Airflow installation runs at least one
triggerer
process, as well as the normalscheduler
Use deferrable operators/sensors in your DAGs
Airflow automatically handles and implements the deferral processes for you.
If you’re upgrading existing DAGs to use deferrable operators, Airflow contains API-compatible sensor variants, like TimeSensorAsync
for TimeSensor
. Add these variants into your DAG to use deferrable operators with no other changes required.
Note that you can’t use the deferral ability from inside custom PythonOperator or TaskFlow Python functions. Deferral is only available to traditional, class-based operators.
Writing Deferrable Operators¶
When writing a deferrable operators these are the main points to consider:
Your operator must defer itself with a trigger. You can use a trigger included in core Airflow, or you can write a custom one.
Your operator will be stopped and removed from its worker while deferred, and no state persists automatically. You can persist state by instructing Airflow to resume the operator at a certain method or by passing certain kwargs.
You can defer multiple times, and you can defer before or after your operator does significant work. Or, you can defer if certain conditions are met. For example, if a system does not have an immediate answer. Deferral is entirely under your control.
Any operator can defer; no special marking on its class is needed, and it’s not limited to sensors.
If you want to add an operator or sensor that supports both deferrable and non-deferrable modes, it’s suggested to add
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)
to the__init__
method of the operator and use it to decide whether to run the operator in deferrable mode. You can configure the default value ofdeferrable
for all the operators and sensors that support switching between deferrable and non-deferrable mode throughdefault_deferrable
in theoperator
section. Here’s an example of a sensor that supports both modes.
import time
from datetime import timedelta
from typing import Any
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def __init__(
self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
) -> None:
super().__init__(**kwargs)
self.deferrable = deferrable
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(timedelta(hours=1)),
method_name="execute_complete",
)
else:
time.sleep(3600)
def execute_complete(
self,
context: Context,
event: dict[str, Any] | None = None,
) -> None:
# We have no more work to do here. Mark as complete.
return
Writing Triggers¶
A Trigger is written as a class that inherits from BaseTrigger
, and implements three methods:
__init__
: A method to receive arguments from operators instantiating it. Since 2.10.0, we’re able to start task execution directly from a pre-defined trigger. To utilize this feature, all the arguments in__init__
must be serializable.run
: An asynchronous method that runs its logic and yields one or moreTriggerEvent
instances as an asynchronous generator.serialize
: Returns the information needed to re-construct this trigger, as a tuple of the classpath, and keyword arguments to pass to__init__
.
This example shows the structure of a basic trigger, a very simplified version of Airflow’s DateTimeTrigger
:
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
class DateTimeTrigger(BaseTrigger):
def __init__(self, moment):
super().__init__()
self.moment = moment
def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
await asyncio.sleep(1)
yield TriggerEvent(self.moment)
The code example shows several things:
__init__
andserialize
are written as a pair. The trigger is instantiated once when it is submitted by the operator as part of its deferral request, then serialized and re-instantiated on any triggerer process that runs the trigger.The
run
method is declared as anasync def
, as it must be asynchronous, and usesasyncio.sleep
rather than the regulartime.sleep
(because that would block the process).When it emits its event it packs
self.moment
in there, so if this trigger is being run redundantly on multiple hosts, the event can be de-duplicated.
Triggers can be as complex or as simple as you want, provided they meet the design constraints. They can run in a highly-available fashion, and are auto-distributed among hosts running the triggerer. We encourage you to avoid any kind of persistent state in a trigger. Triggers should get everything they need from their __init__
, so they can be serialized and moved around freely.
If you are new to writing asynchronous Python, be very careful when writing your run()
method. Python’s async model means that code can block the entire process if it does not correctly await
when it does a blocking operation. Airflow attempts to detect process blocking code and warn you in the triggerer logs when it happens. You can enable extra checks by Python by setting the variable PYTHONASYNCIODEBUG=1
when you are writing your trigger to make sure you’re writing non-blocking code. Be especially careful when doing filesystem calls, because if the underlying filesystem is network-backed, it can be blocking.
There’s some design constraints to be aware of when writing your own trigger:
The
run
method must be asynchronous (using Python’s asyncio), and correctlyawait
whenever it does a blocking operation.run
mustyield
its TriggerEvents, not return them. If it returns before yielding at least one event, Airflow will consider this an error and fail any Task Instances waiting on it. If it throws an exception, Airflow will also fail any dependent task instances.You should assume that a trigger instance can run more than once. This can happen if a network partition occurs and Airflow re-launches a trigger on a separated machine. So, you must be mindful about side effects. For example you might not want to use a trigger to insert database rows.
If your trigger is designed to emit more than one event (not currently supported), then each emitted event must contain a payload that can be used to deduplicate events if the trigger is running in multiple places. If you only fire one event and don’t need to pass information back to the operator, you can just set the payload to
None
.A trigger can suddenly be removed from one triggerer service and started on a new one. For example, if subnets are changed and a network partition results or if there is a deployment. If desired, you can implement the
cleanup
method, which is always called afterrun
, whether the trigger exits cleanly or otherwise.In order for any changes to a trigger to be reflected, the triggerer needs to be restarted whenever the trigger is modified.
Note
Currently triggers are only used until their first event, because they are only used for resuming deferred tasks, and tasks resume after the first event fires. However, Airflow plans to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful.
Sensitive information in triggers¶
Since Airflow 2.9.0, triggers kwargs are serialized and encrypted before being stored in the database. This means that any sensitive information you pass to a trigger will be stored in the database in an encrypted form, and decrypted when it is read from the database.
Triggering Deferral¶
If you want to trigger deferral, at any place in your operator, you can call self.defer(trigger, method_name, kwargs, timeout)
. This raises a special exception for Airflow. The arguments are:
trigger
: An instance of a trigger that you want to defer to. It will be serialized into the database.method_name
: The method name on your operator that you want Airflow to call when it resumes.kwargs
: (Optional) Additional keyword arguments to pass to the method when it is called. Defaults to{}
.timeout
: (Optional) A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Defaults toNone
, which means no timeout.
Here’s a basic example of how a sensor might trigger deferral:
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def execute(self, context: Context) -> None:
self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
When you opt to defer, your operator will stop executing at that point and be removed from its current worker. No state will persist, such as local variables or attributes set on self
. When your operator resumes, it resumes as a new instance of it. The only way you can pass state from the old instance of the operator to the new one is with method_name
and kwargs
.
When your operator resumes, Airflow adds a context
object and an event
object to the kwargs passed to the method_name
method. This event
object contains the payload from the trigger event that resumed your operator. Depending on the trigger, this can be useful to your operator, like it’s a status code or URL to fetch results. Or, it might be unimportant information, like a datetime. Your method_name
method, however, must accept context
and event
as a keyword argument.
If your operator returns from either its first execute()
method when it’s new, or a subsequent method specified by method_name
, it will be considered complete and finish executing.
You can set method_name
to execute
if you want your operator to have one entrypoint, but it must also accept event
as an optional keyword argument.
Let’s take a deeper look into the WaitOneHourSensor
example above. This sensor is just a thin wrapper around the trigger. It defers to the trigger, and specifies a different method to come back to when the trigger fires. When it returns immediately, it marks the sensor as successful.
The self.defer
call raises the TaskDeferred
exception, so it can work anywhere inside your operator’s code, even when nested many calls deep inside execute()
. You can also raise TaskDeferred
manually, which uses the same arguments as self.defer
.
execution_timeout
on operators is determined from the total runtime, not individual executions between deferrals. This means that if execution_timeout
is set, an operator can fail while it’s deferred or while it’s running after a deferral, even if it’s only been resumed for a few seconds.
Triggering Deferral from Task Start¶
New in version 2.10.0.
If you want to defer your task directly to the triggerer without going into the worker, you can set class level attribute start_from_trigger
to True
and add a class level attribute start_trigger_args
with an StartTriggerArgs
object with the following 4 attributes to your deferrable operator:
trigger_cls
: An importable path to your trigger class.trigger_kwargs
: Keyword arguments to pass to thetrigger_cls
when it’s initialized. Note that all the arguments need to be serializable. It’s the main limitation of this feature.next_method
: The method name on your operator that you want Airflow to call when it resumes.next_kwargs
: Additional keyword arguments to pass to thenext_method
when it is called.timeout
: (Optional) A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Defaults toNone
, which means no timeout.
This is particularly useful when deferring is the only thing the execute
method does. Here’s a basic refinement of the previous example. In the previous example, we used DateTimeTrigger
which takes an argument delta
with type datetime.timedelta
which is not serializable. Thus, we need to create a new trigger with serializable arguments.
from __future__ import annotations
import datetime
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
class HourDeltaTrigger(DateTimeTrigger):
def __init__(self, hours: int):
moment = timezone.utcnow() + datetime.timedelta(hours=hours)
super().__init__(moment=moment)
In the sensor part, we’ll need to provide the path to HourDeltaTrigger
as trigger_cls
.
from __future__ import annotations
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
start_from_trigger
and trigger_kwargs
can also be modified at the instance level for more flexible configuration.
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitTwoHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = {"hours": 2}
self.start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define start_from_trigger
and trigger_kwargs
in the __init__
method. Note that you don’t need to define both of them to use this feature, but you need to use the exact same parameter name. For example, if you define an argument as t_kwargs
and assign this value to self.start_trigger_args.trigger_kwargs
, it will not have any effect. The entire __init__
method will be skipped when mapping a task whose start_from_trigger
is set to True. The scheduler will use the provided start_from_trigger
and trigger_kwargs
from partial
and expand
(fallbacks to the ones from class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won’t be resolved at this stage.
After the trigger has finished executing, the task may be sent back to the worker to execute the next_method
, or the task instance may end directly. (Refer to Exiting deferred task from Triggers) If the task is sent back to the worker, the arguments in the __init__
method will still take effect before the next_method
is executed, but they will not affect the execution of the trigger.
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitHoursSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(
self,
*args: list[Any],
trigger_kwargs: dict[str, Any] | None,
start_from_trigger: bool,
**kwargs: dict[str, Any],
) -> None:
# This whole method will be skipped during dynamic task mapping.
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = trigger_kwargs
self.start_from_trigger = start_from_trigger
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
This will be expanded into 2 tasks, with their “hours” arguments set to 1 and 2 respectively.
WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)
Exiting deferred task from Triggers¶
New in version 2.10.0.
If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute end_from_trigger
with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker.
Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the method_name
does not matter and can be None
. Otherwise, provide method_name
that should be used when resuming execution in the task.
class WaitFiveHourSensorAsync(BaseSensorOperator):
# this sensor always exits from trigger.
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
)
TaskSuccessEvent
and TaskFailureEvent
are the two events that can be used to end the task instance directly. This marks the task with the state task_instance_state
and optionally pushes xcom if applicable. Here’s an example of how to use these events:
class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "end_from_trigger": self.end_from_trigger},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})
In the above example, the trigger will end the task instance directly if end_from_trigger
is set to True
by yielding TaskSuccessEvent
. Otherwise, it will resume the task instance with the method specified in the operator.
Note
Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the end_from_trigger
attribute set to True
and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the end_from_trigger
attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.
High Availability¶
Triggers are designed to work in a high availability (HA) architecture. If you want to run a high availability setup, run multiple copies of triggerer
on multiple hosts. Much like scheduler
, they automatically co-exist with correct locking and HA.
Depending on how much work the triggers are doing, you can fit hundreds to tens of thousands of triggers on a single triggerer
host. By default, every triggerer
has a capacity of 1000 triggers that it can try to run at once. You can change the number of triggers that can run simultaneously with the --capacity
argument. If you have more triggers trying to run than you have capacity across all of your triggerer
processes, some triggers will be delayed from running until others have completed.
Airflow tries to only run triggers in one place at once, and maintains a heartbeat to all triggerers
that are currently running. If a triggerer
dies, or becomes partitioned from the network where Airflow’s database is running, Airflow automatically re-schedules triggers that were on that host to run elsewhere. Airflow waits (2.1 * triggerer.job_heartbeat_sec
) seconds for the machine to re-appear before rescheduling the triggers.
This means it’s possible, but unlikely, for triggers to run in multiple places at once. This behavior is designed into the trigger contract, however, and is expected behavior. Airflow de-duplicates events fired when a trigger is running in multiple places simultaneously, so this process is transparent to your operators.
Note that every extra triggerer
you run results in an extra persistent connection to your database.
Difference between Mode=’reschedule’ and Deferrable=True in Sensors¶
In Airflow, sensors wait for specific conditions to be met before proceeding with downstream tasks. Sensors have two options for managing idle periods: mode='reschedule'
and deferrable=True
. Because mode='reschedule'
is a parameter specific to the BaseSensorOperator in Airflow, it allows the sensor to reschedule itself if the condition is not met. 'deferrable=True'
is a convention used by some operators to indicate that the task can be retried (or deferred) later, but it is not a built-in parameter or mode in Airflow. The actual behavior of retrying the task varies depending on the specific operator implementation.
mode=’reschedule’ |
deferrable=True |
---|---|
Continuously reschedules itself until condition is met |
Pauses execution when idle, resumes when condition changes |
Resource use is higher (repeated execution) |
Resource use is lower (pauses when idle, frees up worker slots) |
Conditions expected to change over time (e.g. file creation) |
Waiting for external events or resources (e.g. API response) |
Built-in functionality for rescheduling |
Requires custom logic to defer task and handle external changes |