# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetimefromtracebackimportformat_exceptionfromtypingimportAny,IterablefromsqlalchemyimportColumn,Integer,String,func,or_fromsqlalchemy.ormimportrelationshipfromairflow.models.baseimportBasefromairflow.models.taskinstanceimportTaskInstancefromairflow.triggers.baseimportBaseTriggerfromairflow.utilsimporttimezonefromairflow.utils.retriesimportrun_with_db_retriesfromairflow.utils.sessionimportprovide_sessionfromairflow.utils.sqlalchemyimportExtendedJSON,UtcDateTime,with_row_locksfromairflow.utils.stateimportState
class Trigger(Base):
    """
    Triggers are a workload that run in an asynchronous event loop shared with
    other Triggers, and fire off events that will unpause deferred Tasks,
    start linked DAGs, etc.

    They are persisted into the database and then re-hydrated into a
    "triggerer" process, where many are run at once. We model it so that
    there is a many-to-one relationship between Task and Trigger, for future
    deduplication logic to use.

    Rows will be evicted from the database when the triggerer detects no
    active Tasks/DAGs using them. Events are not stored in the database;
    when an Event is fired, the triggerer will directly push its data to the
    appropriate Task/DAG.
    """

    # NOTE(review): the table/column definitions (__tablename__, id, classpath,
    # kwargs, created_date, triggerer_id) referenced by the methods below are
    # not visible in this extraction — confirm against the original module.

    @classmethod
    def from_object(cls, trigger: BaseTrigger):
        """
        Alternative constructor that builds a trigger row directly off of a
        Trigger object, using its serialized (classpath, kwargs) form.
        """
        classpath, kwargs = trigger.serialize()
        return cls(classpath=classpath, kwargs=kwargs)

    @classmethod
    @provide_session
    def bulk_fetch(cls, ids: Iterable[int], session=None) -> dict[int, Trigger]:
        """
        Fetch all of the Triggers by ID and return a dict mapping
        ID -> Trigger instance.
        """
        matching = session.query(cls).filter(cls.id.in_(ids)).all()
        return {trigger.id: trigger for trigger in matching}

    @classmethod
    @provide_session
    def clean_unused(cls, session=None):
        """
        Delete all triggers that have no tasks/DAGs dependent on them
        (triggers have a one-to-many relationship to both).
        """
        # First, detach triggers from every task instance that is no longer
        # DEFERRED, so those triggers become eligible for deletion below.
        for attempt in run_with_db_retries():
            with attempt:
                session.query(TaskInstance).filter(
                    TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
                ).update({TaskInstance.trigger_id: None})
        # Collect the ids of triggers with zero dependent task instances...
        unused_ids = [
            trigger_id
            for (trigger_id,) in (
                session.query(cls.id)
                .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True)
                .group_by(cls.id)
                .having(func.count(TaskInstance.trigger_id) == 0)
            )
        ]
        # ...and delete them (can't be folded into one query due to MySQL).
        session.query(cls).filter(cls.id.in_(unused_ids)).delete(synchronize_session=False)

    @classmethod
    @provide_session
    def submit_event(cls, trigger_id, event, session=None):
        """
        Take an event fired by this trigger and resume every task instance
        that was deferred on it.
        """
        deferred_tis = session.query(TaskInstance).filter(
            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
        )
        for task_instance in deferred_tis:
            # Merge the event's payload into the kwargs the task resumes with.
            next_kwargs = task_instance.next_kwargs or {}
            next_kwargs["event"] = event.payload
            task_instance.next_kwargs = next_kwargs
            # Detach the trigger from the task instance...
            task_instance.trigger_id = None
            # ...and mark it as scheduled so it gets re-queued.
            task_instance.state = State.SCHEDULED

    @classmethod
    @provide_session
    def submit_failure(cls, trigger_id, exc=None, session=None):
        """
        Called when a trigger has failed unexpectedly, and we need to mark
        everything that depended on it as failed.

        Notably, we have to actually run the failure code from a worker as it
        may have linked callbacks, so hilariously we have to re-schedule the
        task instances to a worker just so they can then fail. We use a
        special __fail__ value for next_method to achieve this that the
        runtime code understands as immediate-fail, and pack the error into
        next_kwargs.

        TODO: Once we have shifted callback (and email) handling to run on
        workers as first-class concepts, we can run the failure code here
        in-process, but we can't do that right now.
        """
        # Render the traceback once; it is identical for every task instance.
        traceback = format_exception(type(exc), exc, exc.__traceback__) if exc else None
        deferred_tis = session.query(TaskInstance).filter(
            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
        )
        for task_instance in deferred_tis:
            # Route the task through the immediate-fail path with the error attached.
            task_instance.next_method = "__fail__"
            task_instance.next_kwargs = {"error": "Trigger failure", "traceback": traceback}
            # Detach the trigger from the task instance...
            task_instance.trigger_id = None
            # ...and mark it as scheduled so it gets re-queued.
            task_instance.state = State.SCHEDULED

    @classmethod
    @provide_session
    def ids_for_triggerer(cls, triggerer_id, session=None):
        """Retrieve the ids of all triggers assigned to the given triggerer."""
        rows = session.query(cls.id).filter(cls.triggerer_id == triggerer_id)
        return [trigger_id for (trigger_id,) in rows]

    @classmethod
    @provide_session
    def assign_unassigned(cls, triggerer_id, capacity, session=None):
        """
        Take a triggerer_id and the capacity for that triggerer and assign
        unassigned triggers until that capacity is reached, or there are no
        more unassigned triggers.
        """
        from airflow.jobs.base_job import BaseJob  # To avoid circular import

        assigned = session.query(func.count(cls.id)).filter(cls.triggerer_id == triggerer_id).scalar()
        capacity -= assigned
        if capacity <= 0:
            return
        # A triggerer job counts as alive when it has no end_date and has
        # heartbeat within the last 30 seconds — TODO confirm this window
        # against the triggerer health-check configuration.
        alive_triggerer_ids = [
            row[0]
            for row in session.query(BaseJob.id).filter(
                BaseJob.end_date.is_(None),
                BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
                BaseJob.job_type == "TriggererJob",
            )
        ]
        # Triggers whose triggerer is unset or dead are up for grabs; lock up
        # to `capacity` of them (skipping rows locked by other triggerers).
        candidate_rows = with_row_locks(
            session.query(cls.id)
            .filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
            .limit(capacity),
            session,
            skip_locked=True,
        ).all()
        if candidate_rows:
            session.query(cls).filter(cls.id.in_([row.id for row in candidate_rows])).update(
                {cls.triggerer_id: triggerer_id},
                synchronize_session=False,
            )
        session.commit()