Source code for airflow.timetables.events

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import itertools
from typing import TYPE_CHECKING, Iterable

import pendulum

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone

if TYPE_CHECKING:
    from pendulum import DateTime

    from airflow.timetables.base import TimeRestriction


class EventsTimetable(Timetable):
    """
    Timetable that schedules DAG runs at specific listed datetimes.

    Suitable for predictable but truly irregular scheduling such as sporting events.

    :param event_dates: List of datetimes for the DAG to run at. Duplicates will be ignored.
        Must be finite and of reasonable size as it will be loaded in its entirety.
    :param restrict_to_events: Whether manual runs should use the most recent event or
        the current time
    :param presorted: if True, event_dates will be assumed to be in ascending order. Provides
        modest performance improvement for larger lists of event_dates.
    :param description: A name for the timetable to display in the UI. Default None will be
        shown as "X events" where X is the len of event_dates
    """

    def __init__(
        self,
        event_dates: Iterable[DateTime],
        restrict_to_events: bool = False,
        presorted: bool = False,
        description: str | None = None,
    ):
        self.event_dates = list(event_dates)  # Must be reversible and indexable
        if not presorted:
            # For long lists this could take a while, so only want to do it once
            self.event_dates.sort()
        self.restrict_to_events = restrict_to_events
        if description is None:
            # No explicit name given: derive a long description and a short summary
            # from the (sorted) list of events.
            if self.event_dates:
                self.description = (
                    f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
                )
            else:
                self.description = "No events"
            self._summary = f"{len(self.event_dates)} events"
        else:
            # An explicit name is used for both the summary and the description.
            self._summary = description
            self.description = description

    @property
    def summary(self) -> str:
        """Return the short label for this timetable."""
        return self._summary

    def __repr__(self):
        """Return the summary string as the representation of this timetable."""
        return self.summary

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """
        Return the run info for the next listed event that is eligible to run.

        An event is eligible when it is not before the earliest allowed time, is
        strictly after the end of ``last_automated_data_interval`` (when given) and
        is not after ``restriction.latest`` (when given).

        :param last_automated_data_interval: Data interval of the previous automated
            run, or None if there has not been one.
        :param restriction: Scheduling limits; ``earliest``, ``latest`` and ``catchup``
            are read from it.
        :return: ``DagRunInfo.exact`` for the selected event, or None when no event
            qualifies.
        """
        earliest = restriction.earliest
        if not restriction.catchup:
            # Without catchup, events that are already in the past are not scheduled:
            # move the lower bound up to the current time if it is later.
            current_time = timezone.utcnow()
            if earliest is None or current_time > earliest:
                earliest = pendulum.instance(current_time)

        for next_event in self.event_dates:
            if earliest and next_event < earliest:
                continue
            if last_automated_data_interval and next_event <= last_automated_data_interval.end:
                continue
            break
        else:
            # We need to return None if self.event_dates is empty or,
            # if not empty, when no suitable event can be found.
            return None

        if restriction.latest is not None and next_event > restriction.latest:
            return None

        return DagRunInfo.exact(next_event)

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """
        Return the data interval to use for a manually triggered run.

        :param run_after: The time the manual run is triggered for.
        :return: ``DataInterval.exact(run_after)`` when the timetable is not restricted
            to events or has no events; otherwise the exact interval of the most recent
            event at or before ``run_after``, or of the first event when ``run_after``
            precedes every event.
        """
        # If Timetable not restricted to events, run for the time specified
        if not self.restrict_to_events or not self.event_dates:
            return DataInterval.exact(run_after)

        # If restricted to events, run for the most recent past event
        # or for the first event if all events are in the future
        first_event = self.event_dates[0]
        if run_after < first_event:
            return DataInterval.exact(first_event)

        # Walk the events from latest to earliest without copying the list and stop
        # at the first one that is not after run_after.
        past_events = itertools.dropwhile(lambda when: when > run_after, reversed(self.event_dates))
        most_recent_event = next(past_events)
        return DataInterval.exact(most_recent_event)

    def serialize(self):
        """
        Return the state needed to rebuild this timetable.

        Event dates are written with ``isoformat()`` so that the text matches what
        ``fromisoformat`` in :meth:`deserialize` parses, independent of how ``str()``
        renders the datetime type in use. The description and summary are included so
        that a custom description is not lost on a round trip.
        """
        return {
            "event_dates": [x.isoformat() for x in self.event_dates],
            "restrict_to_events": self.restrict_to_events,
            "description": self.description,
            "_summary": self._summary,
        }

    @classmethod
    def deserialize(cls, data) -> Timetable:
        """
        Rebuild a timetable from the mapping produced by :meth:`serialize`.

        The ``description`` and ``_summary`` keys are optional so that data written
        without them can still be loaded; in that case the defaults derived from the
        event dates are used.
        """
        timetable = cls(
            [pendulum.DateTime.fromisoformat(x) for x in data["event_dates"]],
            data["restrict_to_events"],
            presorted=True,
            description=data.get("description"),
        )
        # Passing a description to __init__ also overwrites the summary with it, so
        # restore the stored summary afterwards.
        if "_summary" in data:
            timetable._summary = data["_summary"]
        return timetable

Was this entry helpful?