Source code for airflow.example_dags.plugins.workday
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Plugin to demonstrate timetable registration and accommodate example DAGs."""
from __future__ import annotations
from datetime import timedelta
# [START howto_timetable]
from pandas.tseries.holiday import USFederalHolidayCalendar
from pendulum import UTC, Date, DateTime, Time
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
class AfterWorkdayTimetable(Timetable):
    """Timetable whose data intervals are single days that skip weekends and US federal holidays.

    Every interval produced by this class starts at a day boundary and lasts
    exactly one day. Saturdays, Sundays and dates reported by pandas'
    ``USFederalHolidayCalendar`` are skipped, both when scheduling forward and
    when inferring the interval of a manually triggered run.
    """

    def get_next_workday(self, d: DateTime, incr: int = 1) -> DateTime:
        """Return ``d`` if it is a workday, otherwise the nearest workday in direction ``incr``.

        :param d: Candidate interval start. It is compared for equality against
            the holiday timestamps the calendar returns for that instant.
        :param incr: Number of days to move per step; positive searches
            forward, negative searches backward. Must be non-zero, otherwise
            a non-workday ``d`` is never left.
        :return: ``d`` shifted by whole multiples of ``incr`` days until it is
            neither a weekend day nor a holiday.
        """
        cal = USFederalHolidayCalendar()
        step = incr * timedelta(days=1)
        next_start = d
        while True:
            # weekday() 5 and 6 are the weekend days; the holiday lookup is
            # only needed for the remaining days.
            if next_start.weekday() not in (5, 6):
                holidays = cal.holidays(start=next_start, end=next_start).to_pydatetime()
                if next_start not in holidays:
                    return next_start
            # Weekend or holiday: try the adjacent day.
            next_start = next_start + step

    # [START howto_timetable_infer_manual_data_interval]
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """Return the one-day interval of the last workday before ``run_after``'s date.

        :param run_after: Moment at which the manual run was triggered.
        :return: Interval starting at midnight UTC of the most recent workday
            strictly before ``run_after``'s calendar date, ending one day later.
        """
        start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
        # Skip backwards over weekends and holidays to find last run
        start = self.get_next_workday(start, incr=-1)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    # [END howto_timetable_infer_manual_data_interval]

    # [START howto_timetable_next_dagrun_info]
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Compute the next scheduled run, or ``None`` if nothing should be scheduled.

        :param last_automated_data_interval: Interval of the previous
            automatically scheduled run, or ``None`` for the first run.
        :param restriction: Scheduling bounds; ``earliest``, ``latest`` and
            ``catchup`` are read.
        :return: A one-day interval starting on the next workday, or ``None``
            when there is no ``earliest`` bound or the start would fall after
            ``restriction.latest``.
        """
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min).replace(
                tzinfo=UTC
            )
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                today_midnight = DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)
                next_start = max(next_start, today_midnight)
            # Align independently of catchup: with catchup=False, max() above
            # keeps a future start_date unchanged, and a non-midnight value
            # would yield an interval that overlaps the following one and
            # slips past the equality-based holiday check.
            if next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_start = DateTime.combine(next_start.date() + timedelta(days=1), Time.min).replace(
                    tzinfo=UTC
                )
        # Skip weekends and holidays
        next_start = self.get_next_workday(next_start)
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

    # [END howto_timetable_next_dagrun_info]
# [END howto_timetable]