# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import operator
from typing import TYPE_CHECKING, Any, Collection

from pendulum import DateTime

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable

if TYPE_CHECKING:
    from sqlalchemy import Session

    from airflow.models.dataset import DatasetEvent
    from airflow.timetables.base import TimeRestriction
    from airflow.utils.types import DagRunType


class _TrivialTimetable(Timetable):
    """Some code reuse for "trivial" timetables that have nothing complex."""

    periodic = False
    run_ordering = ("execution_date",)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        return cls()

    def __eq__(self, other: Any) -> bool:
        """As long as *other* is of the same type.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return True

    def serialize(self) -> dict[str, Any]:
        return {}

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        return DataInterval.exact(run_after)
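
# Illustrative sketch (editorial comment, not part of the Airflow source): every
# subclass below inherits ``infer_manual_data_interval``, so a manually triggered
# run gets a zero-width ("exact") data interval anchored at the trigger time,
# e.g. for ``NullTimetable`` defined below::
#
#     import pendulum
#     run_after = pendulum.datetime(2024, 1, 1, tz="UTC")
#     interval = NullTimetable().infer_manual_data_interval(run_after=run_after)
#     assert interval.start == interval.end == run_after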


class NullTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This corresponds to ``schedule=None``.
    """

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        return None
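
# Usage sketch (illustrative, not part of this module; the DAG id is made up):
# a DAG declared with ``schedule=None`` gets ``NullTimetable`` and therefore only
# runs when triggered externally (UI, REST API, or another DAG)::
#
#     from datetime import datetime
#     from airflow import DAG
#
#     with DAG(dag_id="manual_only", schedule=None, start_date=datetime(2024, 1, 1)):
#         ...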


class OnceTimetable(_TrivialTimetable):
    """Timetable that schedules the execution once as soon as possible.

    This corresponds to ``schedule="@once"``.
    """

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            return None  # Already run, no more scheduling.
        if restriction.earliest is None:  # No start date, won't run.
            return None
        # "@once" always schedules to the start_date determined by the DAG and
        # tasks, regardless of the catchup setting. This has been the case since
        # 1.10 and we're inheriting it. See AIRFLOW-1928.
        run_after = restriction.earliest
        if restriction.latest is not None and run_after > restriction.latest:
            return None
        return DagRunInfo.exact(run_after)
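
# Scheduling walk-through (editorial sketch; ``start`` below stands in for the
# DAG's resolved start_date): the scheduler calls ``next_dagrun_info`` repeatedly,
# and for ``@once`` the first call yields a single exact run at the start date
# while every later call yields ``None``::
#
#     info = OnceTimetable().next_dagrun_info(
#         last_automated_data_interval=None,
#         restriction=TimeRestriction(earliest=start, latest=None, catchup=True),
#     )
#     # info.run_after == start; a second call with the resulting data interval
#     # passed back in returns None, so the DAG never runs again automatically.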


class ContinuousTimetable(_TrivialTimetable):
    """Timetable that schedules continually, while still respecting start_date and end_date.

    This corresponds to ``schedule="@continuous"``.
    """

    description: str = "As frequently as possible, but only one run at a time."

    active_runs_limit = 1  # Continuous DAGRuns should be constrained to one run at a time

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if restriction.earliest is None:  # No start date, won't run.
            return None
        if last_automated_data_interval is not None:  # has already run once
            start = last_automated_data_interval.end
            end = DateTime.utcnow()
        else:  # first run
            start = restriction.earliest
            end = max(restriction.earliest, DateTime.utcnow())  # won't run any earlier than start_date
        if restriction.latest is not None and end > restriction.latest:
            return None
        return DagRunInfo.interval(start, end)
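
# Behavioural note (editorial sketch; ``r`` stands for a TimeRestriction built by
# the scheduler): each automated ``@continuous`` run covers the span from the end
# of the previous run up to "now", so intervals are back-to-back with no gaps;
# together with ``active_runs_limit = 1`` this keeps at most one run active and
# creates the next one as soon as the previous finishes::
#
#     first = ContinuousTimetable().next_dagrun_info(
#         last_automated_data_interval=None, restriction=r
#     )
#     # first.data_interval == DataInterval(r.earliest, max(r.earliest, now))
#     second = ContinuousTimetable().next_dagrun_info(
#         last_automated_data_interval=first.data_interval, restriction=r
#     )
#     # second.data_interval.start == first.data_interval.end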


class DatasetTriggeredTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This should not be directly used anywhere, but only set if a DAG is
    triggered by datasets.

    :meta private:
    """

    description: str = "Triggered by datasets"

    @property
    def summary(self) -> str:
        return "Dataset"

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        session: Session | None = None,
        events: Collection[DatasetEvent] | None = None,
        **extra,
    ) -> str:
        from airflow.models.dagrun import DagRun

        return DagRun.generate_run_id(run_type, logical_date)

    def data_interval_for_events(
        self,
        logical_date: DateTime,
        events: Collection[DatasetEvent],
    ) -> DataInterval:
        if not events:
            return DataInterval(logical_date, logical_date)
        start = min(
            events, key=operator.attrgetter("source_dag_run.data_interval_start")
        ).source_dag_run.data_interval_start
        end = max(
            events, key=operator.attrgetter("source_dag_run.data_interval_end")
        ).source_dag_run.data_interval_end
        return DataInterval(start, end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        return None
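
# Editorial note (an interpretation of this module only, not authoritative):
# ``DatasetTriggeredTimetable.next_dagrun_info`` always returns ``None``, so the
# scheduler never creates runs from the timetable itself; runs are created when
# the triggering dataset events arrive, and ``data_interval_for_events`` then
# spans from the earliest source run's ``data_interval_start`` to the latest
# source run's ``data_interval_end`` (collapsing to an exact interval at
# ``logical_date`` when there are no events).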