Customizing DAG Scheduling with Timetables

For our example, let’s say a company wants to run a job after each weekday to process data collected during the work day. The intuitive first answer would be schedule_interval="0 0 * * 1-5" (midnight, Monday through Friday). With that schedule, however, data collected on Friday would not be processed right after Friday ends, but on the following Monday, and that run’s data interval would span from midnight Friday to midnight Monday. What we want is:

  • Schedule a run for each Monday, Tuesday, Wednesday, Thursday, and Friday. Each run’s data interval covers midnight of that day to midnight of the next day (e.g. 2021-01-01 00:00:00 to 2021-01-02 00:00:00).

  • Each run is created right after its data interval ends. The run covering Monday happens at midnight on Tuesday, and so on; the run covering Friday happens at midnight on Saturday. No runs happen at midnight on Sunday or Monday.

For simplicity, we will only deal with UTC datetimes in this example.
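To make the target schedule concrete, here is a small sketch of the runs we expect for one week. It uses the standard library’s datetime with UTC instead of pendulum, and workday_runs is a hypothetical helper for illustration, not part of Airflow:

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

UTC = timezone.utc


def workday_runs(monday: datetime) -> List[Tuple[datetime, datetime, datetime]]:
    """Return (interval_start, interval_end, run_after) for Monday..Friday of one week."""
    runs = []
    for offset in range(5):  # Monday (offset 0) through Friday (offset 4)
        start = monday + timedelta(days=offset)
        end = start + timedelta(days=1)  # each interval covers one full day
        runs.append((start, end, end))  # the run is created as soon as the interval ends
    return runs


# 2021-01-04 is a Monday.
for start, end, run_after in workday_runs(datetime(2021, 1, 4, tzinfo=UTC)):
    print(f"{start:%a %Y-%m-%d} covered by the run at {run_after:%a %Y-%m-%d %H:%M}")
```

The last run of the week happens at midnight on Saturday, and nothing is created at midnight on Sunday or Monday. A "0 0 * * 1-5" cron expression, by contrast, fires at midnight Monday through Friday.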

Note

All datetime values returned by a custom timetable MUST be “aware”, i.e. contain timezone information. Furthermore, they must use pendulum’s datetime and timezone types.
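Python defines a datetime as aware when its tzinfo is set and tzinfo.utcoffset() does not return None. A minimal sketch of that check, using a hypothetical is_aware helper, follows. Because pendulum.DateTime subclasses datetime.datetime, the same test applies to pendulum values, but passing it does not by itself satisfy the pendulum-type requirement:

```python
from datetime import datetime, timezone


def is_aware(value: datetime) -> bool:
    """Apply Python's definition of an "aware" datetime: tzinfo set and utcoffset() not None."""
    return value.tzinfo is not None and value.tzinfo.utcoffset(value) is not None


naive = datetime(2021, 1, 1)
aware = datetime(2021, 1, 1, tzinfo=timezone.utc)
print(is_aware(naive), is_aware(aware))  # False True
```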

Timetable Registration

A timetable must be a subclass of Timetable and must be registered as part of a plugin. The following skeleton is our starting point for a new timetable:

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class AfterWorkdayTimetable(Timetable):
    pass


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]

Next, we’ll start putting code into AfterWorkdayTimetable. After the implementation is finished, we should be able to use the timetable in our DAG file:

import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable


with DAG(
    dag_id="example_after_workday_timetable_dag",
    start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
    timetable=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
) as dag:
    ...

Define Scheduling Logic

When Airflow’s scheduler encounters a DAG, it calls one of two methods to determine when to schedule the DAG’s next run.

  • next_dagrun_info: The scheduler uses this to learn the timetable’s regular schedule, i.e. the “one for every workday, run at the end of it” part in our example.

  • infer_manual_data_interval: When a DAG run is manually triggered (from the web UI, for example), the scheduler uses this method to infer the data interval of that out-of-schedule run.

We’ll start with infer_manual_data_interval since it’s the easier of the two:

airflow/example_dags/plugins/workday.py

def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
    weekday = run_after.weekday()
    if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
        days_since_friday = (run_after.weekday() - 4) % 7
        delta = timedelta(days=days_since_friday)
    else:  # Otherwise the interval is yesterday.
        delta = timedelta(days=1)
    start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
    return DataInterval(start=start, end=(start + timedelta(days=1)))

The method accepts one argument run_after, a pendulum.DateTime object that indicates when the DAG is externally triggered. Since our timetable creates a data interval for each complete work day, the data interval inferred here should usually start at the midnight one day prior to run_after, but if run_after falls on a Sunday or Monday (i.e. the prior day is Saturday or Sunday), it should be pushed further back to the previous Friday. Once we know the start of the interval, the end is simply one full day after it. We then create a DataInterval object to describe this interval.
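The weekday arithmetic can be checked without Airflow. The sketch below mirrors the method’s logic with the standard library’s datetime and a fixed UTC timezone in place of pendulum; infer_interval is a hypothetical name, and it returns a plain (start, end) tuple rather than a DataInterval:

```python
from datetime import datetime, time, timedelta, timezone
from typing import Tuple

UTC = timezone.utc


def infer_interval(run_after: datetime) -> Tuple[datetime, datetime]:
    """Return (start, end) of the workday a manual run triggered at `run_after` should cover."""
    weekday = run_after.weekday()  # Monday == 0 ... Sunday == 6
    if weekday in (0, 6):  # Monday or Sunday: go back to the previous Friday.
        delta = timedelta(days=(weekday - 4) % 7)  # 3 days from Monday, 2 from Sunday
    else:  # Tuesday through Saturday: cover the previous day.
        delta = timedelta(days=1)
    start = datetime.combine((run_after - delta).date(), time.min, tzinfo=UTC)
    return start, start + timedelta(days=1)


# 2021-01-03 is a Sunday; the inferred interval is Friday 2021-01-01.
start, end = infer_interval(datetime(2021, 1, 3, 15, 30, tzinfo=UTC))
print(start.isoformat(), end.isoformat())  # 2021-01-01T00:00:00+00:00 2021-01-02T00:00:00+00:00
```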

Next is the implementation of next_dagrun_info:

airflow/example_dags/plugins/workday.py

def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: Optional[DataInterval],
    restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
    if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
        last_start = last_automated_data_interval.start
        last_start_weekday = last_start.weekday()
        if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
            delta = timedelta(days=1)
        else:  # Last run on Friday -- skip to next Monday.
            delta = timedelta(days=(7 - last_start_weekday))
        next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
    else:  # This is the first ever run on the regular schedule.
        next_start = restriction.earliest
        if next_start is None:  # No start_date. Don't schedule.
            return None
        if not restriction.catchup:
            # If the DAG has catchup=False, today is the earliest to consider.
            next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
        elif next_start.time() != Time.min:
            # If earliest does not fall on midnight, skip to the next day.
            next_day = next_start.date() + timedelta(days=1)
            next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
        next_start_weekday = next_start.weekday()
        if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
            delta = timedelta(days=(7 - next_start_weekday))
            next_start = next_start + delta
    if restriction.latest is not None and next_start > restriction.latest:
        return None  # Over the DAG's scheduled end; don't schedule.
    return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

This method accepts two keyword-only arguments. last_automated_data_interval is a DataInterval instance indicating the data interval of this DAG’s previous non-manually-triggered run, or None if the DAG is being scheduled for the first time. restriction encapsulates how the DAG and its tasks specify the schedule, and contains three attributes:

  • earliest: The earliest time the DAG may be scheduled. This is a pendulum.DateTime calculated from all the start_date arguments from the DAG and its tasks, or None if there are no start_date arguments found at all.

  • latest: Similar to earliest, this is the latest time the DAG may be scheduled, calculated from end_date arguments.

  • catchup: A boolean reflecting the DAG’s catchup argument.

Note

Both earliest and latest apply to the DAG run’s logical date (the start of the data interval), not when the run will be scheduled (usually after the end of the data interval).

If there was a previously scheduled run, we schedule the next weekday: one day after the previous run’s start if it was on Monday through Thursday, or three days after it if it was on Friday. If there was no previously scheduled run, we pick the first workday midnight at or after restriction.earliest: if restriction.earliest is not at midnight, we move to the next day’s midnight, and if that lands on a weekend, we move on to Monday. restriction.catchup also needs to be considered: if it is False, we can’t schedule a run earlier than today, even if the start_date values are in the past. Finally, if the calculated data interval starts later than restriction.latest, we must respect it and not schedule a run, which we signal by returning None.
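The same decisions can be sketched as a plain function. This is a stdlib-only approximation of the method above, not Airflow code: next_start and midnight are hypothetical helpers, the restriction fields are passed as separate arguments, and today is passed in explicitly (where the timetable calls Date.today()) so the result is deterministic:

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Optional

UTC = timezone.utc


def midnight(day: date) -> datetime:
    """Midnight UTC at the start of `day`."""
    return datetime.combine(day, time.min, tzinfo=UTC)


def next_start(
    last_start: Optional[datetime],
    earliest: Optional[datetime],
    latest: Optional[datetime],
    catchup: bool,
    today: date,
) -> Optional[datetime]:
    """Start of the next workday data interval, or None if nothing should be scheduled."""
    if last_start is not None:  # A regular run already exists.
        weekday = last_start.weekday()
        # Monday..Thursday -> next day; Friday -> following Monday.
        days = 1 if weekday < 4 else 7 - weekday
        start = midnight((last_start + timedelta(days=days)).date())
    else:  # First regular run.
        if earliest is None:  # No start_date anywhere.
            return None
        start = earliest
        if not catchup:
            start = max(start, midnight(today))  # never earlier than today
        elif start.time() != time.min:
            start = midnight(start.date() + timedelta(days=1))  # round up to the next midnight
        if start.weekday() in (5, 6):  # Saturday or Sunday -> next Monday
            start += timedelta(days=7 - start.weekday())
    if latest is not None and start > latest:  # Past the end_date.
        return None
    return start


# Walk the first four runs for start_date=2021-01-01 (a Friday) with catchup enabled.
current = None
for _ in range(4):
    current = next_start(current, midnight(date(2021, 1, 1)), None, True, date(2021, 1, 1))
    print(current.date())
```

Starting from Friday, 2021-01-01, the loop prints 2021-01-01, 2021-01-04, 2021-01-05 and 2021-01-06: the weekend is skipped because the run after a Friday interval jumps three days ahead.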

If we decide to schedule a run, we need to describe it with a DagRunInfo. This type takes two arguments, which are also exposed as attributes:

  • data_interval: A DataInterval instance describing the next run’s data interval.

  • run_after: A pendulum.DateTime instance that tells the scheduler when the DAG run can be scheduled.

A DagRunInfo can be created like this:

info = DagRunInfo(
    data_interval=DataInterval(start=start, end=end),
    run_after=run_after,
)

Since we typically want to schedule a run as soon as the data interval ends, end and run_after above are generally the same. DagRunInfo therefore provides a shortcut for this:

info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after  # Always True.

For reference, here are our plugin and DAG files in their entirety:

airflow/example_dags/plugins/workday.py

from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            elif next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


And the DAG file that uses it:

import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.operators.dummy import DummyOperator


with DAG(
    dag_id="example_workday_timetable",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    timetable=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
) as dag:
    DummyOperator(task_id="run_this")

Parameterized Timetables

Sometimes we need to pass some run-time arguments to the timetable. Continuing with our AfterWorkdayTimetable example, maybe we have DAGs running in different timezones, and we want to schedule some DAGs at 8am the next day instead of at midnight. Instead of creating a separate timetable for each purpose, we’d want to do something like:

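from datetime import timedelta

from pendulum import DateTime, Time, timezone

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable

UTC = timezone("UTC")


class SometimeAfterWorkdayTimetable(Timetable):
    def __init__(self, schedule_at: Time) -> None:
        self._schedule_at = schedule_at

    def next_dagrun_info(self, *, last_automated_data_interval, restriction):
        ...
        end = start + timedelta(days=1)
        return DagRunInfo(
            data_interval=DataInterval(start=start, end=end),
            # combine() yields a naive datetime here (schedule_at has no tzinfo), so attach UTC.
            run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
        )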
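Since run_after no longer equals the end of the data interval, the DagRunInfo.interval shortcut does not fit here, and the run time has to be built from the end date plus schedule_at. A stdlib sketch of that calculation, assuming UTC and using a hypothetical run_after_for helper with datetime.time in place of pendulum.Time:

```python
from datetime import datetime, time, timedelta, timezone

UTC = timezone.utc


def run_after_for(start: datetime, schedule_at: time) -> datetime:
    """When a run whose data interval starts at `start` may be scheduled."""
    end = start + timedelta(days=1)  # the interval still covers one full day
    # combine() takes tzinfo from `schedule_at` unless one is passed; a naive time(8)
    # would give a naive result, so pass UTC explicitly.
    return datetime.combine(end.date(), schedule_at, tzinfo=UTC)


monday = datetime(2021, 1, 4, tzinfo=UTC)
print(run_after_for(monday, time(8)))  # 2021-01-05 08:00:00+00:00
```

The data interval itself is unchanged; only run_after moves eight hours past the interval’s end.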

However, since the timetable is a part of the DAG, we need to tell Airflow how to serialize it with the context we provide in __init__. This is done by implementing two additional methods on our timetable class:

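from typing import Any, Dict

from pendulum import Time

from airflow.timetables.base import Timetable


class SometimeAfterWorkdayTimetable(Timetable):
    ...

    def serialize(self) -> Dict[str, Any]:
        # Store schedule_at as an ISO 8601 string so the value is JSON-serializable.
        return {"schedule_at": self._schedule_at.isoformat()}

    @classmethod
    def deserialize(cls, value: Dict[str, Any]) -> Timetable:
        # Rebuild the timetable from the value returned by serialize().
        return cls(Time.fromisoformat(value["schedule_at"]))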

When the DAG is being serialized, serialize is called to obtain a JSON-serializable value. That value is passed to deserialize when the serialized DAG is accessed by the scheduler to reconstruct the timetable.
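The round trip can be sketched with the standard library alone. ScheduleAtConfig below is a hypothetical stand-in that carries only the schedule_at state and the two methods, with datetime.time in place of pendulum.Time; json.dumps and json.loads play the part of storing and loading the serialized DAG:

```python
import json
from datetime import time
from typing import Any, Dict


class ScheduleAtConfig:
    """Hypothetical stand-in holding only the state a parameterized timetable must serialize."""

    def __init__(self, schedule_at: time) -> None:
        self._schedule_at = schedule_at

    def serialize(self) -> Dict[str, Any]:
        return {"schedule_at": self._schedule_at.isoformat()}

    @classmethod
    def deserialize(cls, value: Dict[str, Any]) -> "ScheduleAtConfig":
        return cls(time.fromisoformat(value["schedule_at"]))


original = ScheduleAtConfig(time(8))
payload = json.dumps(original.serialize())  # '{"schedule_at": "08:00:00"}'
restored = ScheduleAtConfig.deserialize(json.loads(payload))
print(restored._schedule_at)  # 08:00:00
```

Storing the time as an ISO 8601 string keeps the serialized form plain JSON, and time.fromisoformat (Python 3.7+) parses it back to an equal value.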

Timetable Display in UI

By default, a custom timetable is displayed by its class name in the UI (e.g. in the Schedule column of the “DAGs” table). It is possible to customize this by overriding the summary property. This is especially useful for parameterized timetables, to include the arguments provided in __init__. For our SometimeAfterWorkdayTimetable class, for example, we could have:

@property
def summary(self) -> str:
    return f"after each workday, at {self._schedule_at}"

So for a DAG declared like this:

with DAG(
    timetable=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
) as dag:
    ...

The Schedule column would say after each workday, at 08:00:00.
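With Python’s datetime.time, an f-string without a format spec falls back to str(), which includes seconds, so 8am renders as 08:00:00. A minimal sketch with datetime.time standing in for pendulum.Time, using a hypothetical SummaryDemo class that reproduces only the summary property:

```python
from datetime import time


class SummaryDemo:
    """Hypothetical stand-in that reproduces only the `summary` property."""

    def __init__(self, schedule_at: time) -> None:
        self._schedule_at = schedule_at

    @property
    def summary(self) -> str:
        # Formatting a time with no format spec falls back to str(), i.e. HH:MM:SS.
        return f"after each workday, at {self._schedule_at}"


print(SummaryDemo(time(8)).summary)  # after each workday, at 08:00:00
```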

See also

Module airflow.timetables.base

The public interface is heavily documented to explain what should be implemented by subclasses.
