Source code for airflow.timetables.events
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import itertools
from typing import TYPE_CHECKING, Iterable
import pendulum
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone
if TYPE_CHECKING:
    from pendulum import DateTime
    from airflow.timetables.base import TimeRestriction
[docs]class EventsTimetable(Timetable):
    """
    Timetable that schedules DAG runs at specific listed datetimes.
    Suitable for predictable but truly irregular scheduling such as sporting events.
    :param event_dates: List of datetimes for the DAG to run at. Duplicates will be ignored. Must be finite
                        and of reasonable size as it will be loaded in its entirety.
    :param restrict_to_events: Whether manual runs should use the most recent event or
        the current time
    :param presorted: if True, event_dates will be assumed to be in ascending order. Provides modest
        performance improvement for larger lists of event_dates.
    :param description: A name for the timetable to display in the UI. Default None will be shown as
                        "X Events" where X is the len of event_dates
    """
    def __init__(
        self,
        event_dates: Iterable[DateTime],
        restrict_to_events: bool = False,
        presorted: bool = False,
        description: str | None = None,
    ):
        self.event_dates = list(event_dates)  # Must be reversible and indexable
        if not presorted:
            # For long lists this could take a while, so only want to do it once
            self.event_dates.sort()
        self.restrict_to_events = restrict_to_events
        if description is None:
            if self.event_dates:
                self.description = (
                    f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
                )
            else:
                self.description = "No events"
            self._summary = f"{len(self.event_dates)} events"
        else:
            self._summary = description
            self.description = description
    @property
[docs]    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        earliest = restriction.earliest
        if not restriction.catchup:
            current_time = timezone.utcnow()
            if earliest is None or current_time > earliest:
                earliest = pendulum.instance(current_time)
        for next_event in self.event_dates:
            if earliest and next_event < earliest:
                continue
            if last_automated_data_interval and next_event <= last_automated_data_interval.end:
                continue
            break
        else:
            # We need to return None if self.event_dates is empty or,
            # if not empty, when no suitable event can be found.
            return None
        if restriction.latest is not None and next_event > restriction.latest:
            return None
        return DagRunInfo.exact(next_event)
[docs]    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # If Timetable not restricted to events, run for the time specified
        if not self.restrict_to_events or not self.event_dates:
            return DataInterval.exact(run_after)
        # If restricted to events, run for the most recent past event
        # or for the first event if all events are in the future
        if run_after < self.event_dates[0]:
            return DataInterval.exact(self.event_dates[0])
        else:
            past_events = itertools.dropwhile(lambda when: when > run_after, self.event_dates[::-1])
            most_recent_event = next(past_events)
            return DataInterval.exact(most_recent_event)
[docs]    def serialize(self):
        return {
            "event_dates": [str(x) for x in self.event_dates],
            "restrict_to_events": self.restrict_to_events,
        }
    @classmethod
[docs]    def deserialize(cls, data) -> Timetable:
        return cls(
            [pendulum.DateTime.fromisoformat(x) for x in data["event_dates"]],
            data["restrict_to_events"],
            presorted=True,
        )