Source code for

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import itertools
from typing import Iterable

import pendulum
from pendulum import DateTime

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

class EventsTimetable(Timetable):
    """
    Timetable that schedules DAG runs at specific listed datetimes.

    Suitable for predictable but truly irregular scheduling such as sporting events.

    :param event_dates: List of datetimes for the DAG to run at. Duplicates will be ignored.
        Must be finite and of reasonable size as it will be loaded in its entirety.
        May be empty, in which case no run is ever scheduled.
    :param restrict_to_events: Whether manual runs should use the most recent event or
        the current time
    :param presorted: if True, event_dates will be assumed to be in ascending order.
        Provides modest performance improvement for larger lists of event_dates.
    :param description: A name for the timetable to display in the UI. Default None
        will be shown as "X Events" where X is the len of event_dates
    """

    def __init__(
        self,
        event_dates: Iterable[DateTime],
        restrict_to_events: bool = False,
        presorted: bool = False,
        description: str | None = None,
    ):
        self.event_dates = list(event_dates)  # Must be reversible and indexable
        if not presorted:
            # For long lists this could take a while, so only want to do it once
            self.event_dates.sort()
        self.restrict_to_events = restrict_to_events

        if description is None:
            self._summary = f"{len(self.event_dates)} Events"
            if self.event_dates:
                self.description = (
                    f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
                )
            else:
                # No first/last event to show; indexing an empty list would raise.
                self.description = self._summary
        else:
            self._summary = description
            self.description = description

    @property
    def summary(self) -> str:
        """Short label for this timetable: the custom description or "X Events"."""
        return self._summary

    def __repr__(self):
        return self.summary

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """
        Return the next event to run for, or None when nothing is left to schedule.

        The next event is the first listed datetime that is strictly after the end of
        ``last_automated_data_interval`` (if any), not before ``restriction.earliest``
        and not after ``restriction.latest``. When ``restriction.catchup`` is false,
        events that are already in the past are skipped as well.
        """
        earliest = restriction.earliest
        if not restriction.catchup:
            # Without catchup, never schedule an event that has already passed.
            now = pendulum.now("UTC")
            if earliest is None or now > earliest:
                earliest = now

        previous_end = None if last_automated_data_interval is None else last_automated_data_interval.end

        def _not_schedulable(when: DateTime) -> bool:
            # Both conditions hold for a prefix of the sorted list, so dropwhile is safe.
            if previous_end is not None and when <= previous_end:
                return True
            return earliest is not None and when < earliest

        # The None default also covers an empty event list.
        next_event = next(itertools.dropwhile(_not_schedulable, self.event_dates), None)
        if next_event is None:
            return None
        if restriction.latest is not None and next_event > restriction.latest:
            return None
        return DagRunInfo.exact(next_event)

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """
        Return the data interval to use for a manually triggered run.

        If the timetable is not restricted to events (or has no events at all), the
        run is for ``run_after`` itself. Otherwise it is for the most recent event at
        or before ``run_after``, or for the first event if all events are in the future.
        """
        # If Timetable not restricted to events, run for the time specified.
        # With no events there is nothing to snap to, so fall back to the same.
        if not self.restrict_to_events or not self.event_dates:
            return DataInterval.exact(run_after)

        first_event = self.event_dates[0]
        if run_after < first_event:
            return DataInterval.exact(first_event)

        # Walk backwards without copying the list; a match always exists here
        # because run_after >= first_event.
        most_recent_event = next(when for when in reversed(self.event_dates) if when <= run_after)
        return DataInterval.exact(most_recent_event)

    def serialize(self):
        """Return a JSON-serializable dict from which ``deserialize`` rebuilds this timetable."""
        return {
            "event_dates": [str(x) for x in self.event_dates],
            "restrict_to_events": self.restrict_to_events,
            # Stored so a custom description survives a serialize/deserialize round trip.
            "description": self.description,
            "_summary": self._summary,
        }

    @classmethod
    def deserialize(cls, data) -> Timetable:
        """
        Rebuild a timetable from the dict produced by ``serialize``.

        Payloads without ``description`` / ``_summary`` keys are accepted and
        yield the default description and summary.
        """
        timetable = cls(
            [pendulum.DateTime.fromisoformat(x) for x in data["event_dates"]],
            data["restrict_to_events"],
            presorted=True,
            description=data.get("description"),
        )
        if "_summary" in data:
            # The constructor would reuse the description as the summary.
            timetable._summary = data["_summary"]
        return timetable

Was this entry helpful?