Source code for airflow.example_dags.plugins.workday
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Plugin to demonstrate timetable registration and accommodate example DAGs."""

# [START howto_timetable]
from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):
    """Timetable whose data intervals are single workdays (Monday through Friday).

    Every interval produced here is one day long and starts at midnight
    (the date is combined with ``Time.min`` and labelled as UTC). Interval
    starts that would land on a Saturday or Sunday are moved to Monday.
    """

    # [START howto_timetable_infer_data_interval]
    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
        """Return the one-day data interval for a run triggered at ``run_after``.

        :param run_after: The moment the run is triggered.
        :return: The interval covering last Friday when ``run_after`` falls on
            a Monday or Sunday, otherwise the interval covering the previous
            calendar day.
        """
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            # Friday is weekday 4; the modulo wraps Monday (0) around to 3 days back.
            days_since_friday = (weekday - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    # [END howto_timetable_infer_data_interval]

    # [START howto_timetable_next_dagrun_info]
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Compute the next scheduled run, or ``None`` if nothing should be scheduled.

        :param last_automated_data_interval: Data interval of the previous run
            on the regular schedule, or ``None`` if there has not been one.
        :param restriction: Scheduling bounds; ``earliest``, ``latest`` and
            ``catchup`` are read from it.
        :return: A one-day ``DagRunInfo`` interval starting at ``next_start``,
            or ``None`` when ``restriction.earliest`` is ``None`` or the
            computed start is later than ``restriction.latest``.
        """
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                # (7 - weekday) also lands on Monday for a Saturday or Sunday start.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                # NOTE(review): Date.today() presumably uses the local system date,
                # not the UTC date -- confirm this is intended.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            # Align to midnight in BOTH catchup modes. Previously this was an
            # ``elif`` of the catchup check, so with catchup=False a future,
            # non-midnight ``earliest`` yielded a first interval starting
            # mid-day that overlapped the following midnight-aligned one.
            if next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

    # [END howto_timetable_next_dagrun_info]


class WorkdayTimetablePlugin(AirflowPlugin):
    """Airflow plugin that lists ``AfterWorkdayTimetable`` in its ``timetables``."""

    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


# [END howto_timetable]