## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.fromdatetimeimportdatetimefromtypingimportTYPE_CHECKING,Any,Iterable,List,NamedTuple,Optional,Tuple,Unionfromsqlalchemyimport(Boolean,Column,DateTime,Index,Integer,PickleType,String,UniqueConstraint,and_,func,or_,)fromsqlalchemy.excimportIntegrityErrorfromsqlalchemy.ext.declarativeimportdeclared_attrfromsqlalchemy.ormimportbackref,relationship,synonymfromsqlalchemy.orm.sessionimportSessionfromairflowimportsettingsfromairflow.configurationimportconfasairflow_conffromairflow.exceptionsimportAirflowException,TaskNotFoundfromairflow.models.baseimportID_LEN,Basefromairflow.models.taskinstanceimportTaskInstanceasTIfromairflow.statsimportStatsfromairflow.ti_deps.dep_contextimportDepContextfromairflow.ti_deps.dependencies_statesimportSCHEDULEABLE_STATESfromairflow.utilsimportcallback_requests,timezonefromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.sessionimportprovide_sessionfromairflow.utils.sqlalchemyimportUtcDateTime,nulls_first,skip_locked,with_row_locksfromairflow.utils.stateimportStatefromairflow.utils.typesimportDagRunTypeifTYPE_CHECKING:fromairflow.models.dagimportDAG
class TISchedulingDecision(NamedTuple):
    """
    Type of return for DagRun.task_instance_scheduling_decisions

    The fields mirror the keyword arguments that
    ``DagRun.task_instance_scheduling_decisions`` passes when it builds this
    tuple, and the attributes ``DagRun.update_state`` reads back from it.
    """

    # Every task instance that was loaded for the dag run.
    tis: List["TI"]
    # Task instances whose dependencies are met and that can be scheduled now.
    schedulable_tis: List["TI"]
    # True when a task instance that was not ready changed state during the check.
    changed_tis: bool
    # Task instances whose state is in ``State.unfinished``.
    unfinished_tasks: List["TI"]
    # Task instances whose state is in ``State.finished``.
    finished_tasks: List["TI"]
class DagRun(Base, LoggingMixin):
    """
    A single run (instance) of a DAG.

    A DagRun is created either by the scheduler (for regular runs) or by an
    external trigger.
    """
# ``session`` defaults to None but is used unconditionally below, so wrap the
# method with ``provide_session`` like the other session-defaulted methods in
# this file (presumably it supplies a session when the caller passes none).
@provide_session
def refresh_from_db(self, session: Session = None):
    """
    Reloads the current dagrun from the database

    The row is matched on ``dag_id``, ``run_id`` and ``execution_date`` (both
    sides of the date comparison are cast to ``DateTime``).  Only ``id`` and
    ``state`` are copied back onto this object.  The lookup uses ``.one()``,
    so it raises unless exactly one row matches.

    :param session: database session
    :type session: Session
    """
    exec_date = func.cast(self.execution_date, DateTime)

    fresh_run = (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == self.dag_id,
            func.cast(DagRun.execution_date, DateTime) == exec_date,
            DagRun.run_id == self.run_id,
        )
        .one()
    )

    self.id = fresh_run.id
    self.state = fresh_run.state
@classmethod
def next_dagruns_to_examine(
    cls,
    session: Session,
    max_number: Optional[int] = None,
):
    """
    Return the next DagRuns that the scheduler should attempt to schedule.

    This will return zero or more DagRun rows that are row-level-locked with a
    "SELECT ... FOR UPDATE" query, you should ensure that any scheduling
    decisions are made in a single transaction -- as soon as the transaction is
    committed it will be unlocked.

    Only RUNNING, non-backfill runs of DAGs that are active and not paused are
    considered.  Runs are ordered by ``last_scheduling_decision`` (NULLs
    first), then by ``execution_date``.

    :param session: database session
    :type session: Session
    :param max_number: maximum number of rows to return; when None,
        ``cls.DEFAULT_DAGRUNS_TO_EXAMINE`` is used
    :type max_number: int
    :rtype: list[airflow.models.DagRun]
    """
    from airflow.models.dag import DagModel

    if max_number is None:
        max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE

    # TODO: Bake this query, it is run _A lot_
    query = (
        session.query(cls)
        .filter(cls.state == State.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB)
        .join(
            DagModel,
            DagModel.dag_id == cls.dag_id,
        )
        .filter(
            DagModel.is_paused.is_(False),
            DagModel.is_active.is_(True),
        )
        .order_by(
            nulls_first(cls.last_scheduling_decision, session=session),
            cls.execution_date,
        )
    )

    if not settings.ALLOW_FUTURE_EXEC_DATES:
        # Use ``cls`` here as well so the whole query refers to one entity.
        query = query.filter(cls.execution_date <= func.now())

    return with_row_locks(
        query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
    )


@staticmethod
@provide_session
def find(
    dag_id: Optional[Union[str, List[str]]] = None,
    run_id: Optional[str] = None,
    execution_date: Optional[Union[datetime, List[datetime]]] = None,
    state: Optional[str] = None,
    external_trigger: Optional[bool] = None,
    no_backfills: bool = False,
    run_type: Optional[DagRunType] = None,
    session: Session = None,
    execution_start_date: Optional[datetime] = None,
    execution_end_date: Optional[datetime] = None,
) -> List["DagRun"]:
    """
    Returns a list of dag runs for the given search criteria, ordered by
    execution date.

    Every criterion is optional; a criterion that is left at its default is
    not applied to the query.

    :param dag_id: the dag_id or list of dag_id to find dag runs for
    :type dag_id: str or list[str]
    :param run_id: defines the run id for this dag run
    :type run_id: str
    :param run_type: type of DagRun
    :type run_type: airflow.utils.types.DagRunType
    :param execution_date: the execution date, or a list of execution dates
    :type execution_date: datetime.datetime or list[datetime.datetime]
    :param state: the state of the dag run
    :type state: str
    :param external_trigger: whether this dag run is externally triggered
    :type external_trigger: bool
    :param no_backfills: return no backfills (True), return all (False).
        Defaults to False
    :type no_backfills: bool
    :param session: database session
    :type session: sqlalchemy.orm.session.Session
    :param execution_start_date: dag run that was executed from this date
    :type execution_start_date: datetime.datetime
    :param execution_end_date: dag run that was executed until this date
    :type execution_end_date: datetime.datetime
    """
    DR = DagRun

    qry = session.query(DR)

    # Normalise a single dag_id to a one-element list so ``IN`` can be used.
    dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
    if dag_ids:
        qry = qry.filter(DR.dag_id.in_(dag_ids))

    if run_id:
        qry = qry.filter(DR.run_id == run_id)

    if execution_date:
        if isinstance(execution_date, list):
            qry = qry.filter(DR.execution_date.in_(execution_date))
        else:
            qry = qry.filter(DR.execution_date == execution_date)

    # Range filter: both bounds -> BETWEEN, otherwise a one-sided comparison.
    if execution_start_date and execution_end_date:
        qry = qry.filter(DR.execution_date.between(execution_start_date, execution_end_date))
    elif execution_start_date:
        qry = qry.filter(DR.execution_date >= execution_start_date)
    elif execution_end_date:
        qry = qry.filter(DR.execution_date <= execution_end_date)

    if state:
        qry = qry.filter(DR.state == state)

    # ``False`` is a meaningful value here, so compare against None explicitly.
    if external_trigger is not None:
        qry = qry.filter(DR.external_trigger == external_trigger)

    if run_type:
        qry = qry.filter(DR.run_type == run_type)

    if no_backfills:
        qry = qry.filter(DR.run_type != DagRunType.BACKFILL_JOB)

    return qry.order_by(DR.execution_date).all()
@staticmethod
def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str:
    """
    Generate Run ID based on Run Type and Execution Date

    The result has the form ``<run_type>__<execution_date in ISO format>``.
    """
    iso_date = execution_date.isoformat()
    return f"{run_type}__{iso_date}"
@provide_session
def get_task_instances(self, state=None, session=None) -> Iterable[TI]:
    """
    Returns the task instances for this dag run

    :param state: optional state filter: a single state string, or a
        collection of states that may contain ``None``
    :param session: database session
    """
    query = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.execution_date == self.execution_date,
    )

    if state:
        if isinstance(state, str):
            query = query.filter(TI.state == state)
        elif None not in state:
            query = query.filter(TI.state.in_(state))
        elif all(s is None for s in state):
            # Only None was requested: match rows whose state is NULL.
            query = query.filter(TI.state.is_(None))
        else:
            # this is required to deal with NULL values: the non-None states
            # go through IN, and NULL is matched with a separate IS NULL.
            concrete_states = [s for s in state if s]
            query = query.filter(or_(TI.state.in_(concrete_states), TI.state.is_(None)))

    # For a partial DAG, restrict the result to the task ids the DAG holds.
    if self.dag and self.dag.partial:
        query = query.filter(TI.task_id.in_(self.dag.task_ids))

    return query.all()
@provide_session
def get_task_instance(self, task_id: str, session: Session = None) -> Optional[TI]:
    """
    Returns the task instance specified by task_id for this dag run

    :param task_id: the task id
    :type task_id: str
    :param session: Sqlalchemy ORM Session
    :type session: Session
    :return: the first matching task instance, or None when there is none
    """
    query = session.query(TI)
    criteria = (
        TI.dag_id == self.dag_id,
        TI.execution_date == self.execution_date,
        TI.task_id == task_id,
    )
    return query.filter(*criteria).first()
def get_dag(self) -> "DAG":
    """
    Returns the Dag associated with this DagRun.

    :return: DAG
    :raises AirflowException: if ``self.dag`` is not set (falsy)
    """
    if self.dag:
        return self.dag

    raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
@provide_session
def get_previous_dagrun(self, state: Optional[str] = None, session: Session = None) -> Optional['DagRun']:
    """
    The previous DagRun, if there is one

    "Previous" means the run of the same DAG with the greatest execution date
    that is strictly earlier than this run's execution date.

    :param state: when not None, only runs in this state are considered
    :param session: database session
    """
    criteria = [
        DagRun.dag_id == self.dag_id,
        DagRun.execution_date < self.execution_date,
    ]
    if state is not None:
        criteria.append(DagRun.state == state)

    newest_first = session.query(DagRun).filter(*criteria).order_by(DagRun.execution_date.desc())
    return newest_first.first()
@provide_session
def get_previous_scheduled_dagrun(self, session: Session = None) -> Optional['DagRun']:
    """
    The previous, SCHEDULED DagRun, if there is one

    The run is looked up at exactly the execution date returned by
    ``dag.previous_schedule`` for this run's execution date.
    """
    dag = self.get_dag()

    return (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == self.dag_id,
            DagRun.execution_date == dag.previous_schedule(self.execution_date),
        )
        .first()
    )


@provide_session
def update_state(
    self, session: Session = None, execute_callbacks: bool = True
) -> Tuple[List[TI], Optional[callback_requests.DagCallbackRequest]]:
    """
    Determines the overall state of the DagRun based on the state
    of its TaskInstances.

    :param session: Sqlalchemy ORM Session
    :type session: Session
    :param execute_callbacks: Should dag callbacks (success/failure, SLA etc) be invoked
        directly (default: true) or recorded as a pending request in the ``callback`` property
    :type execute_callbacks: bool
    :return: Tuple containing tis that can be scheduled in the current loop & `callback` that
        needs to be executed
    """

    def build_callback_request(is_failure: bool, msg: str):
        # Deferred callback handed back to the caller instead of being run inline.
        return callback_requests.DagCallbackRequest(
            full_filepath=dag.fileloc,
            dag_id=self.dag_id,
            execution_date=self.execution_date,
            is_failure_callback=is_failure,
            msg=msg,
        )

    # Stays None unless callbacks are deferred and the DAG has a matching callback.
    callback: Optional[callback_requests.DagCallbackRequest] = None

    self.last_scheduling_decision = timezone.utcnow()

    with Stats.timer(f"dagrun.dependency-check.{self.dag_id}"):
        dag = self.get_dag()
        decision = self.task_instance_scheduling_decisions(session)

        tis = decision.tis
        schedulable_tis = decision.schedulable_tis
        changed_tis = decision.changed_tis
        finished_tasks = decision.finished_tasks
        unfinished_tasks = decision.unfinished_tasks

        none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
        none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)

        # The deadlock check below only applies under this condition, so the
        # (potentially expensive) runnable check is only computed then.
        deadlock_possible = bool(unfinished_tasks) and none_depends_on_past and none_task_concurrency
        if deadlock_possible:
            are_runnable_tasks = (
                schedulable_tis
                or self._are_premature_tis(unfinished_tasks, finished_tasks, session)
                or changed_tis
            )

    leaf_task_ids = {t.task_id for t in dag.leaves}
    leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
    all_finished = not unfinished_tasks

    # nothing left to run and at least one leaf failed: the run failed
    if all_finished and any(leaf.state in State.failed_states for leaf in leaf_tis):
        self.log.error('Marking run %s failed', self)
        self.set_state(State.FAILED)
        if execute_callbacks:
            dag.handle_callback(self, success=False, reason='task_failure', session=session)
        elif dag.has_on_failure_callback:
            callback = build_callback_request(True, 'task_failure')

    # nothing left to run and every leaf succeeded: the run succeeded
    elif all_finished and all(leaf.state in State.success_states for leaf in leaf_tis):
        self.log.info('Marking run %s successful', self)
        self.set_state(State.SUCCESS)
        if execute_callbacks:
            dag.handle_callback(self, success=True, reason='success', session=session)
        elif dag.has_on_success_callback:
            callback = build_callback_request(False, 'success')

    # if *all tasks* are deadlocked, the run failed
    elif deadlock_possible and not are_runnable_tasks:
        self.log.error('Deadlock; marking run %s failed', self)
        self.set_state(State.FAILED)
        if execute_callbacks:
            dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session)
        elif dag.has_on_failure_callback:
            callback = build_callback_request(True, 'all_tasks_deadlocked')

    # otherwise the dag run is still running
    else:
        self.set_state(State.RUNNING)

    # Both helpers return immediately while the run is still RUNNING.
    self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
    self._emit_duration_stats_for_finished_state()

    session.merge(self)

    return schedulable_tis, callback
@provide_session
def task_instance_scheduling_decisions(self, session: Session = None) -> TISchedulingDecision:
    """
    Load this run's task instances and work out which of them can be scheduled.

    Task instances whose task is no longer found in the DAG are marked
    ``State.REMOVED`` and flushed.  The rest are split into finished and
    unfinished; the unfinished ones in a scheduleable state are passed to
    ``_get_ready_tis``.

    :param session: database session
    :return: a ``TISchedulingDecision`` bundling the results
    """
    wanted_states = State.task_states + (State.SHUTDOWN,)
    tis = list(self.get_task_instances(session=session, state=wanted_states))
    self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))

    for ti in tis:
        try:
            ti.task = self.get_dag().get_task(ti.task_id)
        except TaskNotFound:
            self.log.warning(
                "Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, ti.dag_id
            )
            ti.state = State.REMOVED
            session.flush()

    unfinished_tasks = [ti for ti in tis if ti.state in State.unfinished]
    finished_tasks = [ti for ti in tis if ti.state in State.finished]

    # Defaults used when there is nothing unfinished to look at.
    schedulable_tis: List[TI] = []
    changed_tis = False

    if unfinished_tasks:
        scheduleable_tasks = [ti for ti in unfinished_tasks if ti.state in SCHEDULEABLE_STATES]
        self.log.debug("number of scheduleable tasks for %s: %s task(s)", self, len(scheduleable_tasks))
        schedulable_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)

    return TISchedulingDecision(
        tis=tis,
        schedulable_tis=schedulable_tis,
        changed_tis=changed_tis,
        unfinished_tasks=unfinished_tasks,
        finished_tasks=finished_tasks,
    )
def _get_ready_tis(
    self,
    scheduleable_tasks: List[TI],
    finished_tasks: List[TI],
    session: Session,
) -> Tuple[List[TI], bool]:
    """
    Split the candidates into those whose dependencies are met and the rest.

    :param scheduleable_tasks: task instances to check
    :param finished_tasks: finished task instances, passed on to the dep context
    :param session: database session
    :return: ``(ready_tis, changed_tis)``; ``changed_tis`` is True when a task
        instance that was *not* ready has a different state in the database
        than it had before its dependency check
    """
    ready_tis: List[TI] = []
    if not scheduleable_tasks:
        return ready_tis, False

    # State of each not-ready TI as seen before its dependency check, by key.
    states_before = {}

    # Check dependencies
    for candidate in scheduleable_tasks:
        # Snapshot first: the check runs with flag_upstream_failed=True and
        # may presumably alter the TI state itself.
        state_before = candidate.state
        deps_met = candidate.are_dependencies_met(
            dep_context=DepContext(flag_upstream_failed=True, finished_tasks=finished_tasks),
            session=session,
        )
        if deps_met:
            ready_tis.append(candidate)
        else:
            states_before[candidate.key] = state_before

    # Check if any ti changed state
    changed_tis = False
    tis_filter = TI.filter_for_tis(states_before.keys())
    if tis_filter is not None:
        fresh_tis = session.query(TI).filter(tis_filter).all()
        changed_tis = any(ti.state != states_before[ti.key] for ti in fresh_tis)

    return ready_tis, changed_tis
def _are_premature_tis(
    self,
    unfinished_tasks: List[TI],
    finished_tasks: List[TI],
    session: Session,
) -> bool:
    """
    Tell whether any unfinished task instance would have its dependencies met
    if retry and reschedule waiting periods were ignored.

    :param unfinished_tasks: task instances to check
    :param finished_tasks: finished task instances, passed on to the dep context
    :param session: database session
    """
    # there might be runnable tasks that are up for retry and for some reason(retry delay, etc) are
    # not ready yet so we set the flags to count them in
    return any(
        ut.are_dependencies_met(
            dep_context=DepContext(
                flag_upstream_failed=True,
                ignore_in_retry_period=True,
                ignore_in_reschedule_period=True,
                finished_tasks=finished_tasks,
            ),
            session=session,
        )
        for ut in unfinished_tasks
    )
def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
    """
    This is a helper method to emit the true scheduling delay stats, which is defined as
    the time when the first task in DAG starts minus the expected DAG run datetime.
    This method will be used in the update_state method when the state of the DagRun
    is updated to a completed status (either success or failure). The method will find the first
    started task within the DAG and calculate the expected DagRun start time (based on
    dag.execution_date & dag.schedule_interval), and minus these two values to get the delay.
    The emitted data may contain outliers (e.g. when the first task was cleared, so
    the second task's start_date will be used), but we can get rid of the outliers
    on the stats side through the dashboards tooling built.
    Note, the stat will only be emitted if the DagRun is a scheduler triggered one
    (i.e. external_trigger is False).

    :param finished_tis: finished task instances of this run
    """
    if self.state == State.RUNNING:
        return
    if self.external_trigger:
        return
    if not finished_tis:
        return

    # Metrics are best effort: any failure is logged and never propagated.
    try:
        dag = self.get_dag()

        if not dag.schedule_interval or dag.schedule_interval == "@once":
            # We can't emit this metric if there is no following schedule to calculate from!
            return

        # Earliest start among the finished TIs that have a start_date at all.
        first_start_date = min(
            (ti.start_date for ti in finished_tis if ti.start_date),
            default=None,
        )
        if first_start_date is None:
            # No finished TI has a start_date, so there is nothing to measure;
            # skip quietly instead of failing on an empty list.
            return

        # dag.following_schedule calculates the expected start datetime for a scheduled dagrun
        # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
        # and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison
        true_delay = first_start_date - dag.following_schedule(self.execution_date)
        if true_delay.total_seconds() > 0:
            Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
    except Exception as e:
        self.log.warning(f'Failed to record first_task_scheduling_delay metric:\n{e}')
def _emit_duration_stats_for_finished_state(self):
    """
    Emit the run duration (``end_date - start_date``) as a timing stat.

    Nothing is emitted while the run is still RUNNING.  If either date is
    missing, a warning is logged instead.  Only the SUCCESS and FAILED
    states have a stat.
    """
    if self.state == State.RUNNING:
        return

    if self.start_date is None:
        self.log.warning('Failed to record duration of %s: start_date is not set.', self)
        return

    if self.end_date is None:
        self.log.warning('Failed to record duration of %s: end_date is not set.', self)
        return

    elapsed = self.end_date - self.start_date

    if self.state == State.SUCCESS:
        Stats.timing(f'dagrun.duration.success.{self.dag_id}', elapsed)
    elif self.state == State.FAILED:
        Stats.timing(f'dagrun.duration.failed.{self.dag_id}', elapsed)
@provide_session
def verify_integrity(self, session: Session = None):
    """
    Verifies the DagRun by checking for removed tasks or tasks that are not in the
    database yet. It will set state to removed or add the task if required.

    :param session: Sqlalchemy ORM Session
    :type session: Session
    """
    from airflow.settings import task_instance_mutation_hook

    dag = self.get_dag()
    tis = self.get_task_instances(session=session)

    # check for removed or restored tasks
    task_ids = set()
    for ti in tis:
        task_instance_mutation_hook(ti)
        task_ids.add(ti.task_id)

        try:
            task = dag.get_task(ti.task_id)
        except AirflowException:
            task = None
            # A ti that is already REMOVED is left alone.  Otherwise it is only
            # marked removed when the run is not RUNNING and the DAG is not partial.
            if ti.state != State.REMOVED and self.state != State.RUNNING and not dag.partial:
                self.log.warning("Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, dag)
                Stats.incr(f"task_removed_from_dag.{dag.dag_id}", 1, 1)
                ti.state = State.REMOVED

        # The task exists in the DAG again but the ti is still flagged REMOVED.
        if task is not None and ti.state == State.REMOVED:
            self.log.info("Restoring task '%s' which was previously removed from DAG '%s'", ti, dag)
            Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
            ti.state = State.NONE

        session.merge(ti)

    # check for missing tasks
    for task in dag.task_dict.values():
        # Skip tasks that start after this run's execution date, except for backfills.
        if task.start_date > self.execution_date and not self.is_backfill:
            continue
        if task.task_id in task_ids:
            continue

        Stats.incr(f"task_instance_created-{task.task_type}", 1, 1)
        new_ti = TI(task, self.execution_date)
        task_instance_mutation_hook(new_ti)
        session.add(new_ti)

    try:
        session.flush()
    except IntegrityError as err:
        self.log.info(str(err))
        self.log.info(
            'Hit IntegrityError while creating the TIs for ' f'{dag.dag_id} - {self.execution_date}.'
        )
        self.log.info('Doing session rollback.')
        # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
        session.rollback()
@staticmethod
def get_run(session: Session, dag_id: str, execution_date: datetime) -> Optional['DagRun']:
    """
    Get a single DAG Run

    Only runs that were not externally triggered are considered.

    :param session: Sqlalchemy ORM Session
    :type session: Session
    :param dag_id: DAG ID
    :type dag_id: str
    :param execution_date: execution date
    :type execution_date: datetime
    :return: DagRun corresponding to the given dag_id and execution date
        if one exists. None otherwise.
    :rtype: airflow.models.DagRun
    """
    return (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == dag_id,
            DagRun.external_trigger == False,  # noqa pylint: disable=singleton-comparison
            DagRun.execution_date == execution_date,
        )
        # ``first()`` gives None when nothing matches, per the documented contract.
        .first()
    )
# The method takes ``cls`` and a None-default ``session`` that it uses
# unconditionally, so it needs both decorators, as the other class-level
# query helpers in this file have.
@classmethod
@provide_session
def get_latest_runs(cls, session=None) -> List['DagRun']:
    """
    Returns the latest DagRun for each DAG

    "Latest" is the run with the maximum execution date per ``dag_id``.

    :param session: database session
    """
    # Per-DAG maximum execution date ...
    subquery = (
        session.query(cls.dag_id, func.max(cls.execution_date).label('execution_date'))
        .group_by(cls.dag_id)
        .subquery()
    )
    # ... joined back to fetch the full rows at that date.
    return (
        session.query(cls)
        .join(
            subquery,
            and_(cls.dag_id == subquery.c.dag_id, cls.execution_date == subquery.c.execution_date),
        )
        .all()
    )


@provide_session
def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -> int:
    """
    Set the given task instances into the scheduled state.

    Each element of ``schedulable_tis`` should have its ``task`` attribute already set.

    Any DummyOperator without callbacks is instead set straight to the success state.

    All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
    is the caller's responsibility to call this function only with TIs from a single dag run.

    :param schedulable_tis: task instances to move forward
    :param session: database session
    :return: total number of rows reported as matched by the two bulk updates
    """
    # Get list of TI IDs that do not need to be executed, these are
    # tasks using DummyOperator and without on_execute_callback / on_success_callback
    dummy_ti_ids = []
    schedulable_ti_ids = []
    for ti in schedulable_tis:
        if (
            ti.task.inherits_from_dummy_operator
            and not ti.task.on_execute_callback
            and not ti.task.on_success_callback
        ):
            dummy_ti_ids.append(ti.task_id)
        else:
            schedulable_ti_ids.append(ti.task_id)

    count = 0

    if schedulable_ti_ids:
        count += (
            session.query(TI)
            .filter(
                TI.dag_id == self.dag_id,
                TI.execution_date == self.execution_date,
                TI.task_id.in_(schedulable_ti_ids),
            )
            .update({TI.state: State.SCHEDULED}, synchronize_session=False)
        )

    # Tasks using DummyOperator should not be executed, mark them as success
    if dummy_ti_ids:
        # One timestamp for both columns, so they agree with the zero duration.
        now = timezone.utcnow()
        count += (
            session.query(TI)
            .filter(
                TI.dag_id == self.dag_id,
                TI.execution_date == self.execution_date,
                TI.task_id.in_(dummy_ti_ids),
            )
            .update(
                {
                    TI.state: State.SUCCESS,
                    TI.start_date: now,
                    TI.end_date: now,
                    TI.duration: 0,
                },
                synchronize_session=False,
            )
        )

    return count