## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportcopyimportfunctoolsimportitertoolsimportloggingimportosimportpathlibimportpickleimportsysimporttimeimporttracebackimportwarningsimportweakreffromcollectionsimportabc,defaultdict,dequefromcontextlibimportExitStackfromdatetimeimportdatetime,timedeltafrominspectimportsignaturefromtypingimport(TYPE_CHECKING,Any,Callable,Collection,Container,Generator,Iterable,Iterator,List,Pattern,Sequence,Union,cast,overload,)fromurllib.parseimporturlsplitimportjinja2importpendulumimportre2importsqlalchemy_jsonfieldfromdateutil.relativedeltaimportrelativedeltafrompackagingimportversionaspackaging_versionfromsqlalchemyimport(Boolean,Column,ForeignKey,Index,Integer,String,Text,and_,case,func,not_,or_,select,update,)fromsqlalchemy.ext.associationproxyimportassociation_proxyfromsqlalchemy.ext.hybridimporthybrid_propertyfromsqlalchemy.ormimportbackref,joinedload,load_only,relationshipfromsqlalchemy.sqlimportSelect,expressionimportairflow.templatesfromairflowimportsettings,utilsfromairflow.api_internal.internal_api_callimportinternal_api_callfromairflow.configurationimportconfasairflow_conf,secrets_backend_listfromairflow.datasetsimportBaseDataset,Dataset,DatasetAlias,DatasetAllfromairflow.dat
asets.managerimportdataset_managerfromairflow.exceptionsimport(AirflowDagInconsistent,AirflowException,DuplicateTaskIdFound,FailStopDagInvalidTriggerRule,ParamValidationError,RemovedInAirflow3Warning,TaskDeferred,TaskNotFound,UnknownExecutorException,)fromairflow.executors.executor_loaderimportExecutorLoaderfromairflow.jobs.jobimportrun_jobfromairflow.models.abstractoperatorimportAbstractOperator,TaskStateChangeCallbackfromairflow.models.baseimportBase,StringIDfromairflow.models.baseoperatorimportBaseOperatorfromairflow.models.dagcodeimportDagCodefromairflow.models.dagpickleimportDagPicklefromairflow.models.dagrunimportRUN_ID_REGEX,DagRunfromairflow.models.datasetimport(DatasetAliasModel,DatasetDagRunQueue,DatasetModel,)fromairflow.models.paramimportDagParam,ParamsDictfromairflow.models.taskinstanceimport(Context,TaskInstance,TaskInstanceKey,clear_task_instances,)fromairflow.models.tasklogimportLogTemplatefromairflow.providers.fabimport__version__asFAB_VERSIONfromairflow.secrets.local_filesystemimportLocalFilesystemBackendfromairflow.securityimportpermissionsfromairflow.settingsimportjsonfromairflow.statsimportStatsfromairflow.timetables.baseimportDagRunInfo,DataInterval,TimeRestriction,Timetablefromairflow.timetables.intervalimportCronDataIntervalTimetable,DeltaDataIntervalTimetablefromairflow.timetables.simpleimport(ContinuousTimetable,DatasetTriggeredTimetable,NullTimetable,OnceTimetable,)fromairflow.timetables.triggerimportCronTriggerTimetablefromairflow.utilsimporttimezonefromairflow.utils.dag_cycle_testerimportcheck_cyclefromairflow.utils.datesimportcron_presets,date_rangeasutils_date_rangefromairflow.utils.decoratorsimportfixup_decorator_warning_stackfromairflow.utils.helpersimportat_most_one,exactly_one,validate_instance_args,validate_keyfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.sessionimportNEW_SESSION,provide_sessionfromairflow.utils.sqlalchemyimport(Interval,UtcDateTime,lock_rows,tuple_in_condition,with_row_locks,)fromairflow.
utils.stateimportDagRunState,State,TaskInstanceStatefromairflow.utils.trigger_ruleimportTriggerRulefromairflow.utils.typesimportNOTSET,ArgNotSet,DagRunType,EdgeInfoTypeifTYPE_CHECKING:fromtypesimportModuleTypefrompendulum.tz.timezoneimportFixedTimezone,Timezonefromsqlalchemy.orm.queryimportQueryfromsqlalchemy.orm.sessionimportSessionfromairflow.decoratorsimportTaskDecoratorCollectionfromairflow.models.dagbagimportDagBagfromairflow.models.operatorimportOperatorfromairflow.models.slamissimportSlaMissfromairflow.serialization.pydantic.dagimportDagModelPydanticfromairflow.serialization.pydantic.dag_runimportDagRunPydanticfromairflow.typing_compatimportLiteralfromairflow.utils.task_groupimportTaskGroup
# FIXME: Ideally this should be Union[Literal[NOTSET], ScheduleInterval],
# but Mypy cannot handle that right now. Track PEP 661 for progress.
# See also: https://discuss.python.org/t/9126/7
class InconsistentDataInterval(AirflowException):
    """
    Signal that a model carries a half-populated data interval.

    The two data interval fields of a model are expected to agree: either both
    are ``None`` (runs scheduled prior to AIP-39) or both are ``datetime``
    (runs scheduled after AIP-39 is implemented). This exception is raised when
    exactly one of the two fields is ``None``.

    :param instance: the model object holding the inconsistent fields.
    :param start_field_name: attribute name of the interval start on *instance*.
    :param end_field_name: attribute name of the interval end on *instance*.
    """

    _template = (
        "Inconsistent {cls}: {start[0]}={start[1]!r}, {end[0]}={end[1]!r}, "
        "they must be either both None or both datetime"
    )

    def __init__(self, instance: Any, start_field_name: str, end_field_name: str) -> None:
        # Snapshot the offending values as (field name, value) pairs.
        start_value = getattr(instance, start_field_name)
        end_value = getattr(instance, end_field_name)
        self._class_name = type(instance).__name__
        self._start_field = (start_field_name, start_value)
        self._end_field = (end_field_name, end_value)
def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
    """
    Build the ``Timetable`` that corresponds to a ``schedule_interval`` value.

    :param interval: ``NOTSET``, ``None``, one of the presets ``"@once"`` or
        ``"@continuous"``, a ``timedelta`` / ``relativedelta``, or any other
        string (handed to one of the cron timetables).
    :param timezone: timezone forwarded to the cron timetables.
    :return: the timetable selected for *interval*.
    :raises ValueError: if *interval* matches none of the supported kinds.
    """
    if interval is NOTSET:
        return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
    if interval is None:
        return NullTimetable()

    # Presets are checked by equality, in the same order as before.
    for preset, timetable_cls in (("@once", OnceTimetable), ("@continuous", ContinuousTimetable)):
        if interval == preset:
            return timetable_cls()

    if isinstance(interval, (timedelta, relativedelta)):
        return DeltaDataIntervalTimetable(interval)

    if not isinstance(interval, str):
        raise ValueError(f"{interval!r} is not a valid schedule_interval.")

    # Any remaining string goes to a cron timetable; the configuration option
    # decides which of the two cron timetable classes is used.
    wants_data_intervals = airflow_conf.getboolean("scheduler", "create_cron_data_intervals")
    if wants_data_intervals:
        return CronDataIntervalTimetable(interval, timezone)
    return CronTriggerTimetable(interval, timezone=timezone)
def get_last_dagrun(dag_id, session, include_externally_triggered=False):
    """
    Fetch the newest DagRun of a DAG, or ``None`` if the DAG has no matching run.

    "Newest" means the highest ``execution_date``; the run may be of any type
    (e.g. scheduled or backfilled).

    :param dag_id: id of the DAG whose runs are searched.
    :param session: session used to execute the query.
    :param include_externally_triggered: when false (the default), runs whose
        ``external_trigger`` flag is set are excluded.
    """
    filters = [DagRun.dag_id == dag_id]
    if not include_externally_triggered:
        filters.append(DagRun.external_trigger == expression.false())

    newest_first = select(DagRun).where(*filters).order_by(DagRun.execution_date.desc())
    return session.scalar(newest_first.limit(1))
def get_dataset_triggered_next_run_info(
    dag_ids: list[str], *, session: Session
) -> dict[str, dict[str, int | str]]:
    """
    Collect dataset-readiness counters for the given DAG ids.

    For every DAG in *dag_ids* that has dataset schedule references, the result
    holds how many referenced datasets there are (``total``), how many of them
    have a queue record targeting the DAG (``ready``), and the dataset ``uri``
    when exactly one dataset is referenced (empty string otherwise). This is the
    data behind summaries such as "1 of 2 datasets updated".

    :param dag_ids: DAG ids to look up.
    :param session: session used to execute the query.
    :return: mapping of ``dag_id`` to ``{"uri": ..., "ready": ..., "total": ...}``.
    """
    from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel

    ref = DagScheduleDatasetReference
    queue_matches_ref = and_(
        DDRQ.dataset_id == ref.dataset_id,
        DDRQ.target_dag_id == ref.dag_id,
    )
    stmt = (
        select(
            ref.dag_id,
            # This is a dirty hack to workaround group by requiring an aggregate,
            # since grouping by dataset is not what we want to do here...but it works
            case((func.count() == 1, func.max(DatasetModel.uri)), else_="").label("uri"),
            func.count().label("total"),
            func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)).label("ready"),
        )
        .join(DDRQ, queue_matches_ref, isouter=True)
        .join(DatasetModel, DatasetModel.id == ref.dataset_id)
        .group_by(ref.dag_id)
        .where(ref.dag_id.in_(dag_ids))
    )

    info_by_dag = {}
    for row in session.execute(stmt).all():
        info_by_dag[row.dag_id] = {"uri": row.uri, "ready": row.ready, "total": row.total}
    return info_by_dag
def _triggerer_is_healthy():
    """
    Report whether the most recent triggerer job is alive.

    Returns the (falsy) result of ``most_recent_job()`` itself when there is no
    such job, otherwise the result of ``job.is_alive()``.
    """
    from airflow.jobs.triggerer_job_runner import TriggererJobRunner

    job = TriggererJobRunner.most_recent_job()
    return job and job.is_alive()


@provide_session
def _create_orm_dagrun(
    dag,
    dag_id,
    run_id,
    logical_date,
    start_date,
    external_trigger,
    conf,
    state,
    run_type,
    dag_hash,
    creating_job_id,
    data_interval,
    session,
):
    """
    Create a ``DagRun`` ORM object, add it to the session and verify its integrity.

    The run is built from the given field values (``logical_date`` is stored as
    ``execution_date``), added to *session* and flushed, then linked to *dag*
    before ``verify_integrity`` is called on it.

    :return: the newly created ``DagRun``.
    """
    run = DagRun(
        dag_id=dag_id,
        run_id=run_id,
        execution_date=logical_date,
        start_date=start_date,
        external_trigger=external_trigger,
        conf=conf,
        state=state,
        run_type=run_type,
        dag_hash=dag_hash,
        creating_job_id=creating_job_id,
        data_interval=data_interval,
    )
    # Load defaults into the following two fields to ensure result can be serialized detached
    # NOTE(review): int() raises TypeError if the max-id query yields None, i.e. this
    # presumably assumes at least one LogTemplate row exists -- confirm.
    run.log_template_id = int(session.scalar(select(func.max(LogTemplate.__table__.c.id))))
    run.consumed_dataset_events = []
    session.add(run)
    session.flush()
    # The dag is attached only after the flush, right before integrity verification.
    run.dag = dag
    # create the associated task instances
    # state is None at the moment of creation
    run.verify_integrity(session=session)
    return run


# TODO: The following mapping is used to validate that the arguments passed to the DAG are of the correct
# type. This is a temporary solution until we find a more sophisticated method for argument validation.
# One potential method is to use `get_type_hints` from the typing module. However, this is not fully
# compatible with future annotations for Python versions below 3.10. Once we require a minimum Python
# version that supports `get_type_hints` effectively or find a better approach, we can replace this
# manual type-checking method.
[docs]classDAG(LoggingMixin):""" A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule, (say daily or hourly), the DAG needs to run each individual tasks as their dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can't run until their previous schedule (and upstream tasks) are completed. DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG. Note that if you plan to use time zones all the dates provided should be pendulum dates. See :ref:`timezone_aware_dags`. .. versionadded:: 2.4 The *schedule* argument to specify either time-based scheduling logic (timetable), or dataset-driven triggers. .. deprecated:: 2.4 The arguments *schedule_interval* and *timetable*. Their functionalities are merged into the new *schedule* argument. :param dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) :param description: The description for the DAG to e.g. be shown on the webserver :param schedule: Defines the rules according to which DAG runs are scheduled. Can accept cron string, timedelta object, Timetable, or list of Dataset objects. If this is not provided, the DAG will be set to the default schedule ``timedelta(days=1)``. See also :doc:`/howto/timetable`. :param start_date: The timestamp from which the scheduler will attempt to backfill :param end_date: A date beyond which your DAG won't run, leave to None for open-ended scheduling :param template_searchpath: This list of folders (non-relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default :param template_undefined: Template undefined type. :param user_defined_macros: a dictionary of macros that will be exposed in your jinja templates. 
For example, passing ``dict(foo='bar')`` to this argument allows you to ``{{ foo }}`` in all jinja templates related to this DAG. Note that you can pass any type of object here. :param user_defined_filters: a dictionary of filters that will be exposed in your jinja templates. For example, passing ``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows you to ``{{ 'world' | hello }}`` in all jinja templates related to this DAG. :param default_args: A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains `'depends_on_past': True` here and `'depends_on_past': False` in the operator's call `default_args`, the actual value will be `False`. :param params: a dictionary of DAG level parameters that are made accessible in templates, namespaced under `params`. These params can be overridden at the task level. :param max_active_tasks: the number of task instances allowed to run concurrently :param max_active_runs: maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs :param max_consecutive_failed_dag_runs: (experimental) maximum number of consecutive failed DAG runs, beyond this the scheduler will disable the DAG :param dagrun_timeout: specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. :param sla_miss_callback: specify a function or list of functions to call when reporting SLA timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for more information about the function signature and parameters that are passed to the callback. 
:param default_view: Specify DAG default view (grid, graph, duration, gantt, landing_times), default grid :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True :param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function. :param on_success_callback: Much like the ``on_failure_callback`` except that it is executed when the dag succeeds. :param access_control: Specify optional DAG-level actions, e.g., "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}" or it can specify the resource name if there is a DAGs Run resource, e.g., "{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': {'can_read', 'can_edit', 'can_delete'}}" :param is_paused_upon_creation: Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used. :param jinja_environment_kwargs: additional configuration options to be passed to Jinja ``Environment`` for template rendering **Example**: to avoid Jinja from removing a trailing newline from template strings :: DAG( dag_id="my-dag", jinja_environment_kwargs={ "keep_trailing_newline": True, # some other jinja2 Environment options here }, ) **See**: `Jinja Environment documentation <https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_ :param render_template_as_native_obj: If True, uses a Jinja ``NativeEnvironment`` to render templates as native Python types. If False, a Jinja ``Environment`` is used to render templates as string values. :param tags: List of tags to help filtering DAGs in the UI. :param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI. 
Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. e.g: {"dag_owner": "https://airflow.apache.org/"} :param auto_register: Automatically register this DAG when it is used in a ``with`` block :param fail_stop: Fails currently running tasks when task in DAG fails. **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success"). An exception will be thrown if any task in a fail stop dag has a non default trigger rule. :param dag_display_name: The display name of the DAG which appears on the UI. """_comps={"dag_id","task_ids","parent_dag","start_date","end_date","schedule_interval","fileloc","template_searchpath","last_loaded",}__serialized_fields:frozenset[str]|None=None
""" File path that needs to be imported to load this DAG or subdag. This may not be an actual file on disk in the case when this DAG is loaded from a ZIP file or other DAG distribution format. """
[docs]parent_dag:DAG|None=None# Gets set when DAGs are loaded
# NOTE: When updating arguments here, please also keep arguments in @dag()
# below in sync. (Search for 'def dag(' in this file.)
def __init__(
    self,
    dag_id: str,
    description: str | None = None,
    schedule: ScheduleArg = NOTSET,
    schedule_interval: ScheduleIntervalArg = NOTSET,
    timetable: Timetable | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    full_filepath: str | None = None,
    template_searchpath: str | Iterable[str] | None = None,
    template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
    user_defined_macros: dict | None = None,
    user_defined_filters: dict | None = None,
    default_args: dict | None = None,
    concurrency: int | None = None,
    max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
    max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
    max_consecutive_failed_dag_runs: int = airflow_conf.getint(
        "core", "max_consecutive_failed_dag_runs_per_dag"
    ),
    dagrun_timeout: timedelta | None = None,
    sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
    default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
    orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"),
    catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"),
    on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
    on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
    doc_md: str | None = None,
    params: abc.MutableMapping | None = None,
    access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
    is_paused_upon_creation: bool | None = None,
    jinja_environment_kwargs: dict | None = None,
    render_template_as_native_obj: bool = False,
    tags: list[str] | None = None,
    owner_links: dict[str, str] | None = None,
    auto_register: bool = True,
    fail_stop: bool = False,
    dag_display_name: str | None = None,
):
    """
    Initialise the DAG: validate arguments, resolve timezone and schedule, store settings.

    The work happens in this order: tag length check; copy ``default_args`` and merge
    ``default_args["params"]`` into ``params``; deprecation warnings for ``full_filepath``
    and ``concurrency``; record the caller's file as ``fileloc``; derive the timezone from
    ``start_date`` (argument first, then ``default_args``); convert start/end dates to UTC;
    resolve ``schedule`` / ``schedule_interval`` / ``timetable`` into ``self.timetable``;
    validate the numeric limits, ``default_view`` and ``orientation``; store the remaining
    options; create the root task group; and run the final validations.

    :raises AirflowException: for an over-long tag, a negative
        ``max_consecutive_failed_dag_runs``, ``max_active_runs`` above the timetable's
        ``active_runs_limit``, an unsupported ``default_view`` / ``orientation``, or
        invalid owner links.
    :raises TypeError: if a truthy ``default_args`` is not a dict.
    :raises ValueError: if a scheduling argument is given without any start date, if more
        than one scheduling argument is given, or if a collection passed as ``schedule``
        contains something other than datasets / dataset aliases.
    """
    from airflow.utils.task_group import TaskGroup

    if tags and any(len(tag) > TAG_MAX_LEN for tag in tags):
        raise AirflowException(f"tag cannot be longer than {TAG_MAX_LEN} characters")

    self.owner_links = owner_links or {}
    self.user_defined_macros = user_defined_macros
    self.user_defined_filters = user_defined_filters
    if default_args and not isinstance(default_args, dict):
        raise TypeError("default_args must be a dict")
    # Deep copy so later edits to self.default_args never touch the caller's dict.
    self.default_args = copy.deepcopy(default_args or {})
    params = params or {}

    # merging potentially conflicting default_args['params'] into params
    # NOTE: when the caller passed a non-empty ``params`` mapping, ``update`` below
    # modifies that same object in place.
    if "params" in self.default_args:
        params.update(self.default_args["params"])
        del self.default_args["params"]

    # check self.params and convert them into ParamsDict
    self.params = ParamsDict(params)

    if full_filepath:
        warnings.warn(
            "Passing full_filepath to DAG() is deprecated and has no effect",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )

    validate_key(dag_id)

    self._dag_id = dag_id
    self._dag_display_property_value = dag_display_name

    if concurrency:
        # TODO: Remove in Airflow 3.0
        warnings.warn(
            "The 'concurrency' parameter is deprecated. Please use 'max_active_tasks'.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        # A truthy deprecated ``concurrency`` overrides ``max_active_tasks``.
        max_active_tasks = concurrency
    self._max_active_tasks = max_active_tasks
    self._pickle_id: int | None = None

    self._description = description
    # set file location to caller source path
    back = sys._getframe().f_back
    self.fileloc = back.f_code.co_filename if back else ""
    self.task_dict: dict[str, Operator] = {}

    # set timezone from start_date
    tz = None
    if start_date and start_date.tzinfo:
        # ``start_date.tzinfo`` is truthy inside this branch, so ``tzinfo`` is always None here.
        tzinfo = None if start_date.tzinfo else settings.TIMEZONE
        tz = pendulum.instance(start_date, tz=tzinfo).timezone
    elif date := self.default_args.get("start_date"):
        if not isinstance(date, datetime):
            # Non-datetime values are parsed and written back to default_args.
            date = timezone.parse(date)
            self.default_args["start_date"] = date
            start_date = date

        tzinfo = None if date.tzinfo else settings.TIMEZONE
        tz = pendulum.instance(date, tz=tzinfo).timezone
    self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE

    # Apply the timezone we settled on to end_date if it wasn't supplied
    if isinstance(_end_date := self.default_args.get("end_date"), str):
        self.default_args["end_date"] = timezone.parse(_end_date, timezone=self.timezone)

    self.start_date = timezone.convert_to_utc(start_date)
    self.end_date = timezone.convert_to_utc(end_date)

    # also convert tasks
    if "start_date" in self.default_args:
        self.default_args["start_date"] = timezone.convert_to_utc(self.default_args["start_date"])
    if "end_date" in self.default_args:
        self.default_args["end_date"] = timezone.convert_to_utc(self.default_args["end_date"])

    # sort out DAG's scheduling behavior
    scheduling_args = [schedule_interval, timetable, schedule]

    has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
    has_empty_start_date = not ("start_date" in self.default_args or self.start_date)

    if has_scheduling_args and has_empty_start_date:
        raise ValueError("DAG is missing the start_date parameter")

    if not at_most_one(*scheduling_args):
        raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.")
    if schedule_interval is not NOTSET:
        warnings.warn(
            "Param `schedule_interval` is deprecated and will be removed in a future release. "
            "Please use `schedule` instead. ",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
    if timetable is not None:
        warnings.warn(
            "Param `timetable` is deprecated and will be removed in a future release. "
            "Please use `schedule` instead. ",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )

    # Fold the deprecated arguments into ``schedule``; ``timetable`` wins over ``schedule_interval``.
    if timetable is not None:
        schedule = timetable
    elif schedule_interval is not NOTSET:
        schedule = schedule_interval

    # Kept for compatibility. Do not use in new code.
    self.schedule_interval: ScheduleInterval

    if isinstance(schedule, Timetable):
        self.timetable = schedule
        self.schedule_interval = schedule.summary
    elif isinstance(schedule, BaseDataset):
        self.timetable = DatasetTriggeredTimetable(schedule)
        self.schedule_interval = self.timetable.summary
    elif isinstance(schedule, Collection) and not isinstance(schedule, str):
        if not all(isinstance(x, (Dataset, DatasetAlias)) for x in schedule):
            raise ValueError("All elements in 'schedule' should be datasets or dataset aliases")
        self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule))
        self.schedule_interval = self.timetable.summary
    elif isinstance(schedule, ArgNotSet):
        warnings.warn(
            "Creating a DAG with an implicit schedule is deprecated, and will stop working "
            "in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        self.timetable = create_timetable(schedule, self.timezone)
        self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL
    else:
        self.timetable = create_timetable(schedule, self.timezone)
        self.schedule_interval = schedule

    if isinstance(template_searchpath, str):
        template_searchpath = [template_searchpath]
    self.template_searchpath = template_searchpath
    self.template_undefined = template_undefined
    self.last_loaded: datetime = timezone.utcnow()
    self.safe_dag_id = dag_id.replace(".", "__dot__")
    self.max_active_runs = max_active_runs
    self.max_consecutive_failed_dag_runs = max_consecutive_failed_dag_runs
    # An explicit 0 falls back to the configured value.
    if self.max_consecutive_failed_dag_runs == 0:
        self.max_consecutive_failed_dag_runs = airflow_conf.getint(
            "core", "max_consecutive_failed_dag_runs_per_dag"
        )
    if self.max_consecutive_failed_dag_runs < 0:
        raise AirflowException(
            f"Invalid max_consecutive_failed_dag_runs: {self.max_consecutive_failed_dag_runs}."
            f"Requires max_consecutive_failed_dag_runs >= 0"
        )
    if self.timetable.active_runs_limit is not None:
        if self.timetable.active_runs_limit < self.max_active_runs:
            raise AirflowException(
                f"Invalid max_active_runs: {type(self.timetable)} "
                f"requires max_active_runs <= {self.timetable.active_runs_limit}"
            )
    self.dagrun_timeout = dagrun_timeout
    self.sla_miss_callback = sla_miss_callback
    if default_view in DEFAULT_VIEW_PRESETS:
        self._default_view: str = default_view
    elif default_view == "tree":
        warnings.warn(
            "`default_view` of 'tree' has been renamed to 'grid' -- please update your DAG",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        self._default_view = "grid"
    else:
        raise AirflowException(
            f"Invalid values of dag.default_view: only support "
            f"{DEFAULT_VIEW_PRESETS}, but get {default_view}"
        )
    if orientation in ORIENTATION_PRESETS:
        self.orientation = orientation
    else:
        raise AirflowException(
            f"Invalid values of dag.orientation: only support "
            f"{ORIENTATION_PRESETS}, but get {orientation}"
        )
    self.catchup: bool = catchup

    self.partial: bool = False

    self.on_success_callback = on_success_callback
    self.on_failure_callback = on_failure_callback

    # Keeps track of any extra edge metadata (sparse; will not contain all
    # edges, so do not iterate over it for that). Outer key is upstream
    # task ID, inner key is downstream task ID.
    self.edge_info: dict[str, dict[str, EdgeInfoType]] = {}

    # To keep it in parity with Serialized DAGs
    # and identify if DAG has on_*_callback without actually storing them in Serialized JSON
    self.has_on_success_callback: bool = self.on_success_callback is not None
    self.has_on_failure_callback: bool = self.on_failure_callback is not None

    self._access_control = DAG._upgrade_outdated_dag_access_control(access_control)
    self.is_paused_upon_creation = is_paused_upon_creation
    self.auto_register = auto_register

    self.fail_stop: bool = fail_stop

    self.jinja_environment_kwargs = jinja_environment_kwargs
    self.render_template_as_native_obj = render_template_as_native_obj

    self.doc_md = self.get_doc_md(doc_md)

    self.tags = tags or []
    self._task_group = TaskGroup.create_root(self)
    self.validate_schedule_and_params()
    wrong_links = dict(self.iter_invalid_owner_links())
    if wrong_links:
        raise AirflowException(
            "Wrong link format was used for the owner. Use a valid link \n"
            f"Bad formatted links are: {wrong_links}"
        )

    # this will only be set at serialization time
    # it's only use is for determining the relative
    # fileloc based only on the serialize dag
    self._processor_dags_folder = None

    validate_instance_args(self, DAG_ARGS_EXPECTED_TYPES)
def _check_schedule_interval_matches_timetable(self) -> bool:
    """
    Tell whether ``schedule_interval`` and ``timetable`` still describe the same schedule.

    This is part of the validation done before the DAG is bagged. It guards against
    either attribute being changed after the DAG was created, e.g.

    .. code-block:: python

        dag1 = DAG("d1", timetable=MyTimetable())
        dag1.schedule_interval = "@once"

        dag2 = DAG("d2", schedule="@once")
        dag2.timetable = MyTimetable()

    The check passes when ``schedule_interval`` equals the timetable's ``summary``, or
    when a timetable freshly created from ``schedule_interval`` has the same summary.
    It is not bullet-proof, especially if a custom timetable does not provide a useful
    ``summary``, but it is the best that can be done.

    :return: ``True`` if the two agree; ``False`` if they differ or if no timetable
        can be created from ``schedule_interval``.
    """
    current_summary = self.timetable.summary
    if self.schedule_interval == current_summary:
        return True

    try:
        rebuilt = create_timetable(self.schedule_interval, self.timezone)
    except ValueError:
        # schedule_interval is not something a timetable can be built from.
        return False
    else:
        return rebuilt.summary == self.timetable.summary
def validate(self):
    """
    Check that the DAG is set up coherently.

    The DAG bag calls this before bagging the DAG. The schedule consistency check
    runs first; afterwards the executor field, schedule/params, timetable and
    setup/teardown validations run in that order.

    :raises AirflowDagInconsistent: if ``schedule_interval`` and ``timetable`` disagree.
    """
    schedule_is_consistent = self._check_schedule_interval_matches_timetable()
    if not schedule_is_consistent:
        raise AirflowDagInconsistent(
            f"inconsistent schedule: timetable {self.timetable.summary!r} "
            f"does not match schedule_interval {self.schedule_interval!r}",
        )

    self.validate_executor_field()
    self.validate_schedule_and_params()
    self.timetable.validate()
    self.validate_setup_teardown()
def validate_executor_field(self):
    """
    Check that every executor named on a task can be looked up.

    Tasks with a falsy ``executor`` are skipped.

    :raises UnknownExecutorException: if the lookup fails for a task's executor; the
        message names the executor and the task.
    """
    for task in self.tasks:
        if not task.executor:
            continue
        try:
            ExecutorLoader.lookup_executor_name_by_str(task.executor)
        except UnknownExecutorException:
            # Re-raise with a message that points at the offending task.
            raise UnknownExecutorException(
                f"The specified executor {task.executor} for task {task.task_id} is not "
                "configured. Review the core.executors Airflow configuration to add it or "
                "update the executor configuration for this task."
            )
def validate_setup_teardown(self):
    """
    Check the trigger rules around setup tasks and the fail-stop rule for every task.

    For each setup task, every downstream task that is not a teardown must use
    ``TriggerRule.ALL_SUCCESS``. Each task's trigger rule is additionally passed to
    ``FailStopDagInvalidTriggerRule.check``.

    :raises ValueError: if a non-teardown task downstream of a setup task uses another
        trigger rule.

    :meta private:
    """
    for task in self.tasks:
        downstream = task.downstream_list if task.is_setup else ()
        # todo: we can relax this to allow out-of-scope tasks to have other trigger rules
        # this is required to ensure consistent behavior of dag
        # when clearing an indirect setup
        if any(
            not down_task.is_teardown and down_task.trigger_rule != TriggerRule.ALL_SUCCESS
            for down_task in downstream
        ):
            raise ValueError("Setup tasks must be followed with trigger rule ALL_SUCCESS.")
        FailStopDagInvalidTriggerRule.check(dag=self, trigger_rule=task.trigger_rule)
def __eq__(self, other):
    """
    Compare two objects of exactly the same type on the attributes listed in ``_comps``.

    :return: ``False`` when the types differ; otherwise whether every compared
        attribute (missing ones count as ``None``) is equal on both sides.
    """
    if type(self) is not type(other):
        return False

    # Use getattr() instead of __dict__ as __dict__ doesn't return
    # correct values for properties.
    for attr_name in self._comps:
        if not (getattr(self, attr_name, None) == getattr(other, attr_name, None)):
            return False
    return True
def __hash__(self):
    """
    Hash the object's type together with the attributes listed in ``_comps``.

    ``task_ids`` is replaced by a tuple of the keys of ``task_dict``. Any value that
    turns out to be unhashable contributes its ``repr()`` instead.
    """

    def _hashable(value):
        # Fall back to the textual form for values hash() rejects.
        try:
            hash(value)
        except TypeError:
            return repr(value)
        return value

    parts = [type(self)]
    for attr_name in self._comps:
        if attr_name == "task_ids":
            # task_ids returns a list and lists can't be hashed
            raw = tuple(self.task_dict)
        else:
            raw = getattr(self, attr_name, None)
        parts.append(_hashable(raw))
    return hash(tuple(parts))
# /Context Manager ----------------------------------------------


@staticmethod
def _upgrade_outdated_dag_access_control(access_control=None):
    """
    Return a copy of ``access_control`` with deprecated DAG-level actions replaced.

    For example, in DAG access_control {'role1': {'can_dag_read'}} 'can_dag_read'
    becomes 'can_read', and in {'role2': {'can_dag_read', 'can_dag_edit'}}
    'can_dag_edit' becomes 'can_edit'. A deprecation warning is emitted for every
    replaced action.

    With FAB provider >= 1.3.0 the result maps each role to a
    ``{resource name: actions}`` mapping; an old-style set/list of actions is stored
    under ``permissions.RESOURCE_DAG``. With older FAB versions only the old-style
    format is accepted and the result maps each role to a set of actions.

    The ``access_control`` argument itself is never modified: a dict of resources
    supplied for a role is shallow-copied before its DAG actions are rewritten.

    :param access_control: mapping of role name to actions or to a resource/actions
        mapping, or ``None``.
    :return: the upgraded mapping, or ``None`` if ``access_control`` is ``None``.
    :raises AirflowException: if the new resource-based format is used with a FAB
        provider older than 1.3.0.
    """
    if access_control is None:
        return None

    new_dag_perm_mapping = {
        permissions.DEPRECATED_ACTION_CAN_DAG_READ: permissions.ACTION_CAN_READ,
        permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: permissions.ACTION_CAN_EDIT,
    }

    def update_old_perm(permission: str):
        new_perm = new_dag_perm_mapping.get(permission, permission)
        if new_perm != permission:
            warnings.warn(
                f"The '{permission}' permission is deprecated. Please use '{new_perm}'.",
                RemovedInAirflow3Warning,
                stacklevel=3,
            )
        return new_perm

    # The installed FAB version does not change between roles, so compare it once.
    supports_resource_format = packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0")

    updated_access_control = {}
    for role, perms in access_control.items():
        if supports_resource_format:
            if isinstance(perms, (set, list)):
                # Support for old-style access_control where only the actions are specified
                role_perms = {permissions.RESOURCE_DAG: set(perms)}
            elif isinstance(perms, dict):
                # Work on a shallow copy so the caller's dict keeps its original actions.
                role_perms = copy.copy(perms)
            else:
                role_perms = perms
            if permissions.RESOURCE_DAG in role_perms:
                role_perms[permissions.RESOURCE_DAG] = {
                    update_old_perm(perm) for perm in role_perms[permissions.RESOURCE_DAG]
                }
            updated_access_control[role] = role_perms
        elif isinstance(perms, dict):
            # Not allow new access control format with old FAB versions
            raise AirflowException(
                "Please upgrade the FAB provider to a version >= 1.3.0 to allow "
                "use the Dag Level Access Control new format."
            )
        else:
            updated_access_control[role] = {update_old_perm(perm) for perm in perms}

    return updated_access_control
def date_range(
    self,
    start_date: pendulum.DateTime,
    num: int | None = None,
    end_date: datetime | None = None,
) -> list[datetime]:
    """
    Return a list of dates for this DAG's schedule (deprecated).

    A deprecation warning is always emitted. When *num* is given, the list comes from
    the date-range utility using ``normalized_schedule_interval`` as the step (with
    that utility's own deprecation warning silenced). Otherwise the logical dates
    produced by ``iter_dagrun_infos_between(..., align=False)`` between *start_date*
    and *end_date* are returned.

    :param start_date: first date of the range.
    :param num: number of entries to produce; selects the utility-based path.
    :param end_date: last date of the range when *num* is ``None``; the current UTC
        time is used if it is ``None``.
    """
    message = "`DAG.date_range()` is deprecated."

    if num is None:
        message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead."
        warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
        coerced_end_date = timezone.utcnow() if end_date is None else end_date
        infos = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False)
        return [info.logical_date for info in infos]

    warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
    with warnings.catch_warnings():
        # The helper is deprecated as well; the warning above already covers it.
        warnings.simplefilter("ignore", RemovedInAirflow3Warning)
        return utils_date_range(start_date=start_date, num=num, delta=self.normalized_schedule_interval)
def is_fixed_time_schedule(self):
    """
    Tell whether the schedule fires at a fixed time of day, e.g. 3 AM every day (deprecated).

    The next two cron trigger times are "peeked"; if both have the same minute and
    hour, the schedule is considered fixed and the DST fix is *not* needed. This
    assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).

    Timetables that are not cron based are reported as fixed. This is old logic that
    should not be used anywhere.
    """
    warnings.warn(
        "`DAG.is_fixed_time_schedule()` is deprecated.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )

    from airflow.timetables._cron import CronMixin

    if not isinstance(self.timetable, CronMixin):
        return True

    from croniter import croniter

    trigger_times = croniter(self.timetable._expression)
    first = trigger_times.get_next(datetime)
    second = trigger_times.get_next(datetime)
    return (second.minute, second.hour) == (first.minute, first.hour)
def following_schedule(self, dttm):
    """
    Compute the schedule point following *dttm* for this DAG, in UTC (deprecated).

    Use ``DAG.next_dagrun_info(restricted=False)`` instead.

    :param dttm: utc datetime
    :return: utc datetime, or ``None`` when there is no next run info
    """
    warnings.warn(
        "`DAG.following_schedule()` is deprecated. Use `DAG.next_dagrun_info(restricted=False)` instead.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    inferred_interval = self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
    upcoming = self.next_dagrun_info(inferred_interval, restricted=False)
    return None if upcoming is None else upcoming.data_interval.start
def previous_schedule(self, dttm):
    """
    Return the schedule point preceding ``dttm`` (deprecated).

    :param dttm: Reference datetime; coerced with ``timezone.coerce_datetime``.
    :return: Result of the timetable's ``_get_prev``, or ``None`` when the
        timetable is not a ``_DataIntervalTimetable``.
    """
    from airflow.timetables.interval import _DataIntervalTimetable

    warnings.warn(
        "`DAG.previous_schedule()` is deprecated.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    if isinstance(self.timetable, _DataIntervalTimetable):
        return self.timetable._get_prev(timezone.coerce_datetime(dttm))
    return None
def get_next_data_interval(self, dag_model: DagModel) -> DataInterval | None:
    """
    Get the data interval of the next scheduled run.

    For compatibility, the interval is inferred from the DAG's schedule when
    the model carries no explicit one, which is possible for runs created
    prior to AIP-39.

    This function is private to Airflow core and should not be depended on as a
    part of the Python API.

    :meta private:
    """
    if self.dag_id != dag_model.dag_id:
        raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
    if dag_model.next_dagrun is None:
        # Next run not scheduled.
        return None
    explicit_interval = dag_model.next_dagrun_data_interval
    if explicit_interval is None:
        # Compatibility: A run was scheduled without an explicit data interval.
        # This means the run was scheduled before AIP-39 implementation. Try to
        # infer from the logical date.
        return self.infer_automated_data_interval(dag_model.next_dagrun)
    return explicit_interval


def get_run_data_interval(self, run: DagRun | DagRunPydantic) -> DataInterval:
    """
    Get the data interval of this run.

    For compatibility, the interval is inferred from the DAG's schedule when
    the run has no explicit one set, which is possible for runs created prior
    to AIP-39.

    This function is private to Airflow core and should not be depended on as a
    part of the Python API.

    :meta private:
    """
    belongs_elsewhere = run.dag_id is not None and run.dag_id != self.dag_id
    if belongs_elsewhere:
        raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {run.dag_id}")
    explicit_interval = _get_model_data_interval(run, "data_interval_start", "data_interval_end")
    if explicit_interval is None:
        # Compatibility: runs created before AIP-39 implementation don't have an
        # explicit data interval. Try to infer from the logical date.
        return self.infer_automated_data_interval(run.execution_date)
    return explicit_interval


def infer_automated_data_interval(self, logical_date: datetime) -> DataInterval:
    """
    Infer a data interval for a run against this DAG.

    This method is used to bridge runs created prior to AIP-39 implementation,
    which do not have an explicit data interval. Therefore, this method only
    considers ``schedule_interval`` values valid prior to Airflow 2.2.

    DO NOT call this method if there is a known data interval.

    :meta private:
    """
    kind = type(self.timetable)
    if issubclass(kind, (NullTimetable, OnceTimetable, DatasetTriggeredTimetable)):
        return DataInterval.exact(timezone.coerce_datetime(logical_date))

    interval_start = timezone.coerce_datetime(logical_date)
    if issubclass(kind, CronDataIntervalTimetable):
        interval_end = cast(CronDataIntervalTimetable, self.timetable)._get_next(interval_start)
        return DataInterval(interval_start, interval_end)
    if issubclass(kind, DeltaDataIntervalTimetable):
        interval_end = cast(DeltaDataIntervalTimetable, self.timetable)._get_next(interval_start)
        return DataInterval(interval_start, interval_end)

    # Contributors: When the exception below is raised, you might want to
    # add an 'elif' block here to handle custom timetables. Stop! The bug
    # you're looking for is instead at when the DAG run (represented by
    # logical_date) was created. See GH-31969 for an example:
    # * Wrong fix: GH-32074 (modifies this function).
    # * Correct fix: GH-32118 (modifies the DAG run creation code).
    raise ValueError(f"Not a valid timetable: {self.timetable!r}")
def next_dagrun_info(
    self,
    last_automated_dagrun: None | datetime | DataInterval,
    *,
    restricted: bool = True,
) -> DagRunInfo | None:
    """
    Get information about the next DagRun of this dag after ``date_last_automated_dagrun``.

    This calculates what time interval the next DagRun should operate on
    (its execution date) and when it can be scheduled, according to the
    dag's timetable, start_date, end_date, etc. This doesn't check max
    active run or any other "max_active_tasks" type limits, but only
    performs calculations based on the various date and interval fields of
    this dag and its tasks.

    :param last_automated_dagrun: The ``max(execution_date)`` of
        existing "automated" DagRuns for this dag (scheduled or backfill,
        but not manual).
    :param restricted: If set to *False* (default is *True*), ignore
        ``start_date``, ``end_date``, and ``catchup`` specified on the DAG
        or tasks.
    :return: DagRunInfo of the next dagrun, or None if a dagrun is not
        going to be scheduled (also when the timetable raises; the error is
        logged).
    """
    # Never schedule a subdag. It will be scheduled by its parent dag.
    if self.is_subdag:
        return None

    if not isinstance(last_automated_dagrun, datetime):
        previous_interval = last_automated_dagrun
    else:
        warnings.warn(
            "Passing a datetime to DAG.next_dagrun_info is deprecated. Use a DataInterval instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        previous_interval = self.infer_automated_data_interval(
            timezone.coerce_datetime(last_automated_dagrun)
        )

    limits = (
        self._time_restriction
        if restricted
        else TimeRestriction(earliest=None, latest=None, catchup=True)
    )
    try:
        return self.timetable.next_dagrun_info(
            last_automated_data_interval=previous_interval,
            restriction=limits,
        )
    except Exception:
        self.log.exception(
            "Failed to fetch run info after data interval %s for DAG %r",
            previous_interval,
            self.dag_id,
        )
        return None
def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None):
    """
    Return ``run_after`` of the next run following the given date (deprecated).

    :param date_last_automated_dagrun: Logical date of the last automated run,
        or ``None`` when there is none.
    :return: ``run_after`` of the next ``DagRunInfo``, or ``None`` when
        ``next_dagrun_info`` returns nothing.
    """
    warnings.warn(
        "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    previous_interval = (
        None
        if date_last_automated_dagrun is None
        else self.infer_automated_data_interval(date_last_automated_dagrun)
    )
    upcoming = self.next_dagrun_info(previous_interval)
    return None if upcoming is None else upcoming.run_after
@functools.cached_property
def _time_restriction(self) -> TimeRestriction:
    """
    Build the ``TimeRestriction`` derived from DAG and task start/end dates.

    ``earliest`` is the minimum of all truthy task start dates and the DAG
    start date. ``latest`` is the DAG end date unless every task has a truthy
    end date, in which case it is the maximum of those and the DAG end date.
    """
    starts = [task.start_date for task in self.tasks if task.start_date]
    if self.start_date is not None:
        starts.append(self.start_date)
    earliest = timezone.coerce_datetime(min(starts)) if starts else None

    latest = self.end_date
    ends = [task.end_date for task in self.tasks if task.end_date]
    # Only when no task has a null end_date can the task end dates bound the DAG.
    all_tasks_bounded = len(ends) == len(self.tasks)
    if all_tasks_bounded:
        if self.end_date is not None:
            ends.append(self.end_date)
        if ends:
            latest = timezone.coerce_datetime(max(ends))
    return TimeRestriction(earliest, latest, self.catchup)
def iter_dagrun_infos_between(
    self,
    earliest: pendulum.DateTime | None,
    latest: pendulum.DateTime,
    *,
    align: bool = True,
) -> Iterable[DagRunInfo]:
    """
    Yield DagRunInfo using this DAG's timetable between given interval.

    DagRunInfo instances yielded if their ``logical_date`` is not earlier
    than ``earliest``, nor later than ``latest``. The instances are ordered
    by their ``logical_date`` from earliest to latest.

    If ``align`` is ``False``, the first run will happen immediately on
    ``earliest``, even if it does not fall on the logical timetable schedule.
    The default is ``True``, but subdags will ignore this value and always
    behave as if this is set to ``False`` for backward compatibility.

    Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
    ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
    ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
    if ``align=True``.

    :raises ValueError: If ``earliest`` is ``None`` and the DAG's time
        restriction provides no earliest date either.
    """
    if earliest is None:
        earliest = self._time_restriction.earliest
    if earliest is None:
        raise ValueError("earliest was None and we had no value in time_restriction to fallback on")
    earliest = timezone.coerce_datetime(earliest)
    latest = timezone.coerce_datetime(latest)

    window = TimeRestriction(earliest, latest, catchup=True)

    # HACK: Sub-DAGs are currently scheduled differently. For example, say
    # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
    # DAG should be first scheduled to run on midnight 2021-06-04, but a
    # sub-DAG should be first scheduled to run RIGHT NOW. We can change
    # this, but since sub-DAGs are going away in 3.0 anyway, let's keep
    # compatibility for now and remove this entirely later.
    if self.is_subdag:
        align = False

    try:
        current = self.timetable.next_dagrun_info(
            last_automated_data_interval=None,
            restriction=window,
        )
    except Exception:
        self.log.exception(
            "Failed to fetch run info after data interval %s for DAG %r",
            None,
            self.dag_id,
        )
        current = None

    if current is None:
        # No runs to be scheduled between the user-supplied timeframe. But
        # if align=False, "invent" a data interval for the timeframe itself.
        if not align:
            yield DagRunInfo.interval(earliest, latest)
        return

    # If align=False and earliest does not fall on the timetable's logical
    # schedule, "invent" a data interval for it.
    if not align and current.logical_date != earliest:
        yield DagRunInfo.interval(earliest, current.data_interval.start)

    # Generate naturally according to schedule.
    while current is not None:
        yield current
        try:
            current = self.timetable.next_dagrun_info(
                last_automated_data_interval=current.data_interval,
                restriction=window,
            )
        except Exception:
            self.log.exception(
                "Failed to fetch run info after data interval %s for DAG %r",
                current.data_interval if current else "<NONE>",
                self.dag_id,
            )
            break
def get_run_dates(self, start_date, end_date=None) -> list:
    """
    Return a list of dates between the interval received as parameter using this dag's schedule interval.

    Returned dates can be used for execution dates. This method is deprecated.

    :param start_date: The start date of the interval.
    :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``.
    :return: A list of dates within the interval following the dag's schedule.
    """
    warnings.warn(
        "`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    lower = timezone.coerce_datetime(start_date)
    upper = pendulum.now(timezone.utc) if end_date is None else timezone.coerce_datetime(end_date)
    run_infos = self.iter_dagrun_infos_between(lower, upper)
    return [run_info.logical_date for run_info in run_infos]
def normalize_schedule(self, dttm):
    """
    Snap ``dttm`` onto the schedule (deprecated).

    :param dttm: Datetime to normalize.
    :return: ``dttm`` itself when there is no following schedule or when the
        schedule point preceding the following one equals ``dttm``; otherwise
        the following schedule point.
    """
    warnings.warn(
        "`DAG.normalize_schedule()` is deprecated.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # The helpers used below are deprecated too; hide their warnings so only
    # the one above reaches the caller.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", RemovedInAirflow3Warning)
        upcoming = self.following_schedule(dttm)
    if not upcoming:
        # in case of @once
        return dttm

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", RemovedInAirflow3Warning)
        before_upcoming = self.previous_schedule(upcoming)
    return upcoming if before_upcoming != dttm else dttm
@property
def full_filepath(self) -> str:
    """
    Full file path to the DAG; deprecated alias of ``fileloc``.

    :meta private:
    """
    notice = "DAG.full_filepath is deprecated in favour of fileloc"
    warnings.warn(notice, RemovedInAirflow3Warning, stacklevel=2)
    return self.fileloc


@full_filepath.setter
def full_filepath(self, value) -> None:
    # Deprecated write access: warn, then store the value on ``fileloc``.
    notice = "DAG.full_filepath is deprecated in favour of fileloc"
    warnings.warn(notice, RemovedInAirflow3Warning, stacklevel=2)
    self.fileloc = value


@property
def concurrency(self) -> int:
    """Deprecated read access to ``_max_active_tasks``."""
    # TODO: Remove in Airflow 3.0
    notice = "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'."
    warnings.warn(notice, RemovedInAirflow3Warning, stacklevel=2)
    return self._max_active_tasks
def param(self, name: str, default: Any = NOTSET) -> DagParam:
    """
    Return a DagParam object for current dag.

    :param name: dag parameter name.
    :param default: fallback value for dag parameter.
    :return: DagParam instance for specified name and current dag.
    """
    dag_param = DagParam(current_dag=self, name=name, default=default)
    return dag_param
@property
def filepath(self) -> str:
    """
    Relative file path to the DAG; deprecated, use ``relative_fileloc``.

    :meta private:
    """
    warnings.warn(
        "filepath is deprecated, use relative_fileloc instead",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    relative = self.relative_fileloc
    return str(relative)


@property
def relative_fileloc(self) -> pathlib.Path:
    """File location of the importable dag 'file' relative to the configured DAGs folder."""
    absolute = pathlib.Path(self.fileloc)
    try:
        relative = absolute.relative_to(self._processor_dags_folder or settings.DAGS_FOLDER)
    except ValueError:
        # Not relative to DAGS_FOLDER.
        return absolute
    # A result of "." means fileloc is the DAGs folder itself; keep the full path.
    if relative != pathlib.Path("."):
        return relative
    return absolute
@property
def folder(self) -> str:
    """Folder location of where the DAG object is instantiated."""
    directory, _file_name = os.path.split(self.fileloc)
    return directory
@property
def owner(self) -> str:
    """
    Return the distinct owners found in DAG tasks as one string.

    Owners are de-duplicated and sorted so the result is stable across calls
    and interpreter runs (joining a bare ``set`` gives an arbitrary order).

    :return: Comma separated list of owners in DAG tasks
    """
    distinct_owners = {task.owner for task in self.tasks}
    return ", ".join(sorted(distinct_owners))
@provide_session
def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
    """
    Return a boolean indicating whether the max_active_tasks limit for this DAG has been reached.

    ``provide_session`` supplies a session when the caller passes none, as the
    ``concurrency_reached`` property does; without it the ``NEW_SESSION``
    placeholder default would be used as if it were a real session.

    :param session: Database session
    :return: ``True`` when the number of RUNNING task instances of this DAG is
        at least ``max_active_tasks``.
    """
    TI = TaskInstance
    running_count = session.scalar(
        select(func.count(TI.task_id)).where(
            TI.dag_id == self.dag_id,
            TI.state == TaskInstanceState.RUNNING,
        )
    )
    return running_count >= self.max_active_tasks
@property
def concurrency_reached(self):
    """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated."""
    notice = "This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method."
    warnings.warn(notice, RemovedInAirflow3Warning, stacklevel=2)
    reached = self.get_concurrency_reached()
    return reached
@provide_session
def get_is_active(self, session=NEW_SESSION) -> bool | None:
    """
    Return a boolean indicating whether this DAG is active.

    :param session: Database session
    :return: The ``DagModel.is_active`` value stored for this ``dag_id``;
        ``None`` when the query yields no row.
    """
    return session.scalar(select(DagModel.is_active).where(DagModel.dag_id == self.dag_id))
@provide_session
def get_is_paused(self, session=NEW_SESSION) -> bool | None:
    """
    Return a boolean indicating whether this DAG is paused.

    :param session: Database session
    :return: The ``DagModel.is_paused`` value stored for this ``dag_id``;
        ``None`` when the query yields no row.
    """
    return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id == self.dag_id))
@property
def is_paused(self):
    """Use `airflow.models.DAG.get_is_paused`, this attribute is deprecated."""
    notice = "This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method."
    warnings.warn(notice, RemovedInAirflow3Warning, stacklevel=2)
    paused = self.get_is_paused()
    return paused
@property
def normalized_schedule_interval(self) -> ScheduleInterval:
    """
    Return ``schedule_interval`` with presets resolved (deprecated).

    A string found in ``cron_presets`` is replaced by its preset value,
    ``"@once"`` becomes ``None``, and anything else is returned unchanged.
    """
    warnings.warn(
        "DAG.normalized_schedule_interval() is deprecated.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
        resolved: ScheduleInterval = cron_presets.get(self.schedule_interval)
        return resolved
    if self.schedule_interval == "@once":
        return None
    return self.schedule_interval
@staticmethod@internal_api_call@provide_session
def fetch_callback(
    dag: DAG,
    dag_run_id: str,
    success: bool = True,
    reason: str | None = None,
    *,
    session: Session = NEW_SESSION,
) -> tuple[list[TaskStateChangeCallback], Context] | None:
    """
    Fetch the appropriate callbacks depending on the value of success.

    This method gets the context of a single TaskInstance part of this DagRun and returns it along
    the list of callbacks.

    :param dag: DAG object
    :param dag_run_id: The DAG run ID
    :param success: Flag to specify if failure or success callback should be called
    :param reason: Completion reason
    :param session: Database session
    :return: ``(callbacks, context)``, or ``None`` when the DAG has no callback
        of the requested kind.
    """
    selected = dag.on_success_callback if success else dag.on_failure_callback
    if not selected:
        return None

    dagrun = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=dag_run_id, session=session)
    callback_list = selected if isinstance(selected, list) else [selected]
    candidates = dagrun.get_task_instances(session=session)
    # tis from a dagrun may not be a part of dag.partial_subset,
    # since dag.partial_subset is a subset of the dag.
    # This ensures that we will only use the accessible TI
    # context for the callback.
    if dag.partial:
        candidates = [cand for cand in candidates if cand.state != State.NONE]
    # filter out removed tasks
    candidates = [cand for cand in candidates if cand.state != TaskInstanceState.REMOVED]
    # The last remaining TaskInstance supplies the template context.
    chosen = candidates[-1]
    chosen.task = dag.get_task(chosen.task_id)
    context = chosen.get_template_context(session=session)
    context["reason"] = reason
    return callback_list, context
@provide_session
def handle_callback(self, dagrun: DagRun, success=True, reason=None, session=NEW_SESSION):
    """
    Triggers on_failure_callback or on_success_callback as appropriate.

    This method gets the context of a single TaskInstance part of this DagRun
    and passes that to the callable along with a 'reason', primarily to
    differentiate DagRun failures.

    .. note: The logs end up in
        ``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log``

    :param dagrun: DagRun object
    :param success: Flag to specify if failure or success callback should be called
    :param reason: Completion reason
    :param session: Database session
    """
    fetched = DAG.fetch_callback(
        dag=self, dag_run_id=dagrun.run_id, success=success, reason=reason, session=session
    )
    # fetch_callback returns None when there is nothing to run.
    callbacks, context = fetched or (None, None)
    DAG.execute_callback(callbacks, context, self.dag_id)
@classmethod
def execute_callback(cls, callbacks: list[Callable] | None, context: Context | None, dag_id: str):
    """
    Triggers the callbacks with the given context.

    Nothing happens when either ``callbacks`` or ``context`` is falsy. An
    exception raised by a callback is logged and counted, and the remaining
    callbacks still run.

    :param callbacks: List of callbacks to call
    :param context: Context to pass to all callbacks
    :param dag_id: The dag_id of the DAG to find.
    """
    if not (callbacks and context):
        return
    for hook in callbacks:
        cls.logger().info("Executing dag callback function: %s", hook)
        try:
            hook(context)
        except Exception:
            cls.logger().exception("failed to invoke dag state update callback")
            Stats.incr("dag.callback_exceptions", tags={"dag_id": dag_id})
def get_active_runs(self):
    """
    Return a list of dag run execution dates currently running.

    :return: List of execution dates
    """
    running = DagRun.find(dag_id=self.dag_id, state=DagRunState.RUNNING)
    return [dag_run.execution_date for dag_run in running]
@provide_session
def get_num_active_runs(self, external_trigger=None, only_running=True, session=NEW_SESSION):
    """
    Return the number of active "running" dag runs.

    :param external_trigger: True for externally triggered active dag runs
    :param only_running: When true count RUNNING runs only; otherwise count
        RUNNING and QUEUED runs
    :param session:
    :return: number greater than 0 for active dag runs
    """
    if only_running:
        state_filter = DagRun.state == DagRunState.RUNNING
    else:
        state_filter = DagRun.state.in_({DagRunState.RUNNING, DagRunState.QUEUED})
    stmt = select(func.count()).where(DagRun.dag_id == self.dag_id).where(state_filter)

    if external_trigger is not None:
        wanted = expression.true() if external_trigger else expression.false()
        stmt = stmt.where(DagRun.external_trigger == wanted)

    return session.scalar(stmt)
@staticmethod@internal_api_call@provide_session
def fetch_dagrun(
    dag_id: str,
    execution_date: datetime | None = None,
    run_id: str | None = None,
    session: Session = NEW_SESSION,
) -> DagRun | DagRunPydantic:
    """
    Return the dag run for a given execution date or run_id if it exists, otherwise none.

    :param dag_id: The dag_id of the DAG to find.
    :param execution_date: The execution date of the DagRun to find.
    :param run_id: The run_id of the DagRun to find.
    :param session:
    :return: The DagRun if found, otherwise None.
    :raises TypeError: If neither ``execution_date`` nor ``run_id`` is given.
    """
    if not execution_date and not run_id:
        raise TypeError("You must provide either the execution_date or the run_id")

    stmt = select(DagRun)
    # Both filters are applied when both identifiers are supplied.
    if execution_date:
        stmt = stmt.where(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
    if run_id:
        stmt = stmt.where(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
    return session.scalar(stmt)
@provide_session
def get_dagruns_between(self, start_date, end_date, session=NEW_SESSION):
    """
    Return the list of dag runs between start_date (inclusive) and end_date (inclusive).

    ``provide_session`` supplies a session when the caller passes none;
    without it the ``NEW_SESSION`` placeholder default would be used as if it
    were a real session.

    :param start_date: The starting execution date of the DagRun to find.
    :param end_date: The ending execution date of the DagRun to find.
    :param session:
    :return: The list of DagRuns found.
    """
    dagruns = session.scalars(
        select(DagRun).where(
            DagRun.dag_id == self.dag_id,
            DagRun.execution_date >= start_date,
            DagRun.execution_date <= end_date,
        )
    ).all()
    return dagruns
@provide_session
def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum.DateTime | None:
    """Return the latest date for which at least one dag run exists."""
    newest = select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id)
    return session.scalar(newest)
@property
def latest_execution_date(self):
    """Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated."""
    notice = "This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`."
    warnings.warn(notice, RemovedInAirflow3Warning, stacklevel=2)
    latest = self.get_latest_execution_date()
    return latest
@property
def subdags(self):
    """Return a list of the subdag objects associated to this DAG."""
    # Check SubDag for class but don't check class directly
    from airflow.operators.subdag import SubDagOperator

    collected = []
    for task in self.tasks:
        looks_like_subdag = (
            isinstance(task, SubDagOperator)
            # TODO remove in Airflow 2.0
            or type(task).__name__ == "SubDagOperator"
            or task.task_type == "SubDagOperator"
        )
        if not looks_like_subdag:
            continue
        # Add the direct subdag first, then everything nested below it.
        collected.append(task.subdag)
        collected.extend(task.subdag.subdags)
    return collected
def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environment:
    """
    Build a Jinja2 environment.

    :param force_sandboxed: Use the sandboxed environment even when
        ``render_template_as_native_obj`` is set.
    """
    # Collect directories to search for template files
    template_dirs = [self.folder]
    if self.template_searchpath:
        template_dirs.extend(self.template_searchpath)

    # Default values (for backward compatibility)
    options = {
        "loader": jinja2.FileSystemLoader(template_dirs),
        "undefined": self.template_undefined,
        "extensions": ["jinja2.ext.do"],
        "cache_size": 0,
    }
    if self.jinja_environment_kwargs:
        options.update(self.jinja_environment_kwargs)

    want_native = self.render_template_as_native_obj and not force_sandboxed
    env: jinja2.Environment
    if want_native:
        env = airflow.templates.NativeEnvironment(**options)
    else:
        env = airflow.templates.SandboxedEnvironment(**options)

    # Add any user defined items. Safe to edit globals as long as no templates are rendered yet.
    # http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
    if self.user_defined_macros:
        env.globals.update(self.user_defined_macros)
    if self.user_defined_filters:
        env.filters.update(self.user_defined_filters)

    return env
def set_dependency(self, upstream_task_id, downstream_task_id):
    """Set dependency between two tasks that already have been added to the DAG using add_task()."""
    upstream_task = self.get_task(upstream_task_id)
    downstream_task = self.get_task(downstream_task_id)
    upstream_task.set_downstream(downstream_task)
@provide_session
def get_task_instances_before(
    self,
    base_date: datetime,
    num: int,
    *,
    session: Session = NEW_SESSION,
) -> list[TaskInstance]:
    """
    Get ``num`` task instances before (including) ``base_date``.

    The returned list may contain exactly ``num`` task instances
    corresponding to any DagRunType. It can have less if there are
    less than ``num`` scheduled DAG runs before ``base_date``.
    """
    recent_rows: list[Any] = session.execute(
        select(DagRun.execution_date)
        .where(
            DagRun.dag_id == self.dag_id,
            DagRun.execution_date <= base_date,
        )
        .order_by(DagRun.execution_date.desc())
        .limit(num)
    ).all()

    if recent_rows:
        # Rows are ordered newest first, so the last one is the oldest date.
        oldest: datetime | None = recent_rows[-1]._mapping.get("execution_date")
        return self.get_task_instances(start_date=oldest, end_date=base_date, session=session)
    return self.get_task_instances(start_date=base_date, end_date=base_date, session=session)
# Overload: without ``as_pk_tuple`` the result is typed as an iterable of TaskInstance.
@overload
def _get_task_instances(
    self,
    *,
    task_ids: Collection[str | tuple[str, int]] | None,
    start_date: datetime | None,
    end_date: datetime | None,
    run_id: str | None,
    state: TaskInstanceState | Sequence[TaskInstanceState],
    include_subdags: bool,
    include_parentdag: bool,
    include_dependent_dags: bool,
    exclude_task_ids: Collection[str | tuple[str, int]] | None,
    session: Session,
    dag_bag: DagBag | None = ...,
) -> Iterable[TaskInstance]: ...  # pragma: no cover


# Overload: with ``as_pk_tuple=True`` the result is a set of TaskInstanceKey.
@overload
def _get_task_instances(
    self,
    *,
    task_ids: Collection[str | tuple[str, int]] | None,
    as_pk_tuple: Literal[True],
    start_date: datetime | None,
    end_date: datetime | None,
    run_id: str | None,
    state: TaskInstanceState | Sequence[TaskInstanceState],
    include_subdags: bool,
    include_parentdag: bool,
    include_dependent_dags: bool,
    exclude_task_ids: Collection[str | tuple[str, int]] | None,
    session: Session,
    dag_bag: DagBag | None = ...,
    recursion_depth: int = ...,
    max_recursion_depth: int = ...,
    visited_external_tis: set[TaskInstanceKey] = ...,
) -> set[TaskInstanceKey]: ...  # pragma: no cover


def _get_task_instances(
    self,
    *,
    task_ids: Collection[str | tuple[str, int]] | None,
    as_pk_tuple: Literal[True, None] = None,
    start_date: datetime | None,
    end_date: datetime | None,
    run_id: str | None,
    state: TaskInstanceState | Sequence[TaskInstanceState],
    include_subdags: bool,
    include_parentdag: bool,
    include_dependent_dags: bool,
    exclude_task_ids: Collection[str | tuple[str, int]] | None,
    session: Session,
    dag_bag: DagBag | None = None,
    recursion_depth: int = 0,
    max_recursion_depth: int | None = None,
    visited_external_tis: set[TaskInstanceKey] | None = None,
) -> Iterable[TaskInstance] | set[TaskInstanceKey]:
    """
    Select task instances of this DAG, optionally following parent and dependent DAGs.

    :param task_ids: Restrict to these task ids / ``(task_id, map_index)`` pairs;
        ``None`` applies no such restriction.
    :param as_pk_tuple: When truthy, return a set of ``TaskInstanceKey`` instead
        of a select statement.
    :param start_date: Lower bound on ``DagRun.execution_date`` (skipped when falsy).
    :param end_date: Upper bound on ``DagRun.execution_date``; defaults to
        ``timezone.utcnow()`` unless ``allow_future_exec_dates`` is set.
    :param run_id: Restrict to this run id (skipped when falsy).
    :param state: One state or a sequence of states; a sequence may contain
        ``None`` to match NULL states.
    :param include_subdags: Also match task instances of ``self.subdags``.
    :param include_parentdag: Also collect matching keys from ``parent_dag``.
    :param include_dependent_dags: Follow ``ExternalTaskMarker`` tasks into
        other DAGs, recursively.
    :param exclude_task_ids: Task ids / ``(task_id, map_index)`` pairs to drop.
    :param session: Database session.
    :param dag_bag: DagBag used to look up external DAGs; created on demand.
    :param recursion_depth: Current depth of ``ExternalTaskMarker`` recursion.
    :param max_recursion_depth: Depth limit; taken from the first marker visited
        when ``None``.
    :param visited_external_tis: Keys of markers already followed, shared
        across recursive calls.
    :return: A set of keys when ``as_pk_tuple`` is truthy, otherwise a select
        statement over ``TaskInstance``.
    :raises AirflowException: When the recursion limit is exceeded or an
        external DAG cannot be found.
    """
    TI = TaskInstance

    # If we are looking at subdags/dependent dags we want to avoid UNION calls
    # in SQL (it doesn't play nice with fields that have no equality operator,
    # like JSON types), we instead build our result set separately.
    #
    # This will be empty if we are only looking at one dag, in which case
    # we can return the filtered TI query object directly.
    result: set[TaskInstanceKey] = set()

    # Do we want full objects, or just the primary columns?
    if as_pk_tuple:
        tis = select(TI.dag_id, TI.task_id, TI.run_id, TI.map_index)
    else:
        tis = select(TaskInstance)
    tis = tis.join(TaskInstance.dag_run)

    if include_subdags:
        # Crafting the right filter for dag_id and task_ids combo
        conditions = []
        for dag in [*self.subdags, self]:
            conditions.append(
                (TaskInstance.dag_id == dag.dag_id) & TaskInstance.task_id.in_(dag.task_ids)
            )
        tis = tis.where(or_(*conditions))
    elif self.partial:
        tis = tis.where(TaskInstance.dag_id == self.dag_id, TaskInstance.task_id.in_(self.task_ids))
    else:
        tis = tis.where(TaskInstance.dag_id == self.dag_id)
    if run_id:
        tis = tis.where(TaskInstance.run_id == run_id)
    if start_date:
        tis = tis.where(DagRun.execution_date >= start_date)
    if task_ids is not None:
        tis = tis.where(TaskInstance.ti_selector_condition(task_ids))

    # This allows allow_trigger_in_future config to take affect, rather than mandating exec_date <= UTC
    if end_date or not self.allow_future_exec_dates:
        end_date = end_date or timezone.utcnow()
        tis = tis.where(DagRun.execution_date <= end_date)

    if state:
        if isinstance(state, (str, TaskInstanceState)):
            tis = tis.where(TaskInstance.state == state)
        elif len(state) == 1:
            tis = tis.where(TaskInstance.state == state[0])
        else:
            # this is required to deal with NULL values
            if None in state:
                if all(x is None for x in state):
                    tis = tis.where(TaskInstance.state.is_(None))
                else:
                    not_none_state = [s for s in state if s]
                    tis = tis.where(
                        or_(TaskInstance.state.in_(not_none_state), TaskInstance.state.is_(None))
                    )
            else:
                tis = tis.where(TaskInstance.state.in_(state))

    # Next, get any of them from our parent DAG (if there is one)
    if include_parentdag and self.parent_dag is not None:
        if visited_external_tis is None:
            visited_external_tis = set()

        # The regex is built from the second dot-separated part of this dag_id.
        p_dag = self.parent_dag.partial_subset(
            task_ids_or_regex=r"^{}$".format(self.dag_id.split(".")[1]),
            include_upstream=False,
            include_downstream=True,
        )
        result.update(
            p_dag._get_task_instances(
                task_ids=task_ids,
                start_date=start_date,
                end_date=end_date,
                run_id=None,
                state=state,
                include_subdags=include_subdags,
                include_parentdag=False,
                include_dependent_dags=include_dependent_dags,
                as_pk_tuple=True,
                exclude_task_ids=exclude_task_ids,
                session=session,
                dag_bag=dag_bag,
                recursion_depth=recursion_depth,
                max_recursion_depth=max_recursion_depth,
                visited_external_tis=visited_external_tis,
            )
        )

    if include_dependent_dags:
        # Recursively find external tasks indicated by ExternalTaskMarker
        from airflow.sensors.external_task import ExternalTaskMarker

        query = tis
        if as_pk_tuple:
            # ``tis`` selects key columns only here; rebuild a query over full
            # TI rows for the keys it matches.
            all_tis = session.execute(query).all()
            condition = TI.filter_for_tis(TaskInstanceKey(*cols) for cols in all_tis)
            if condition is not None:
                query = select(TI).where(condition)

        if visited_external_tis is None:
            visited_external_tis = set()

        external_tasks = session.scalars(query.where(TI.operator == ExternalTaskMarker.__name__))

        for ti in external_tasks:
            ti_key = ti.key.primary
            if ti_key in visited_external_tis:
                continue

            visited_external_tis.add(ti_key)

            task: ExternalTaskMarker = cast(ExternalTaskMarker, copy.copy(self.get_task(ti.task_id)))
            ti.task = task

            if max_recursion_depth is None:
                # Maximum recursion depth allowed is the recursion_depth of the first
                # ExternalTaskMarker in the tasks to be visited.
                max_recursion_depth = task.recursion_depth

            if recursion_depth + 1 > max_recursion_depth:
                # Prevent cycles or accidents.
                # NOTE(review): as written there is no separator between the marker
                # class name and the task id in this message - confirm against upstream.
                raise AirflowException(
                    f"Maximum recursion depth {max_recursion_depth} reached for "
                    f"{ExternalTaskMarker.__name__}{ti.task_id}. "
                    f"Attempted to clear too many tasks or there may be a cyclic dependency."
                )
            ti.render_templates()
            external_tis = session.scalars(
                select(TI)
                .join(TI.dag_run)
                .where(
                    TI.dag_id == task.external_dag_id,
                    TI.task_id == task.external_task_id,
                    DagRun.execution_date == pendulum.parse(task.execution_date),
                )
            )

            for tii in external_tis:
                if not dag_bag:
                    from airflow.models.dagbag import DagBag

                    dag_bag = DagBag(read_dags_from_db=True)
                external_dag = dag_bag.get_dag(tii.dag_id, session=session)
                if not external_dag:
                    raise AirflowException(f"Could not find dag {tii.dag_id}")
                downstream = external_dag.partial_subset(
                    task_ids_or_regex=[tii.task_id],
                    include_upstream=False,
                    include_downstream=True,
                )
                result.update(
                    downstream._get_task_instances(
                        task_ids=None,
                        run_id=tii.run_id,
                        start_date=None,
                        end_date=None,
                        state=state,
                        include_subdags=include_subdags,
                        include_dependent_dags=include_dependent_dags,
                        include_parentdag=False,
                        as_pk_tuple=True,
                        exclude_task_ids=exclude_task_ids,
                        dag_bag=dag_bag,
                        session=session,
                        recursion_depth=recursion_depth + 1,
                        max_recursion_depth=max_recursion_depth,
                        visited_external_tis=visited_external_tis,
                    )
                )

    if result or as_pk_tuple:
        # Only execute the `ti` query if we have also collected some other results (i.e. subdags etc.)
        if as_pk_tuple:
            tis_query = session.execute(tis).all()
            result.update(TaskInstanceKey(**cols._mapping) for cols in tis_query)
        else:
            result.update(ti.key for ti in session.scalars(tis))

        if exclude_task_ids is not None:
            # Exclusions may be plain task ids or (task_id, map_index) pairs.
            result = {
                task
                for task in result
                if task.task_id not in exclude_task_ids
                and (task.task_id, task.map_index) not in exclude_task_ids
            }

    if as_pk_tuple:
        return result
    if result:
        # We've been asked for objects, lets combine it all back in to a result set
        ti_filters = TI.filter_for_tis(result)
        if ti_filters is not None:
            tis = select(TI).where(ti_filters)
    elif exclude_task_ids is None:
        pass  # Disable filter if not set.
    elif isinstance(next(iter(exclude_task_ids), None), str):
        # The first element decides whether exclusions are ids or (id, map_index) pairs.
        tis = tis.where(TI.task_id.notin_(exclude_task_ids))
    else:
        tis = tis.where(not_(tuple_in_condition((TI.task_id, TI.map_index), exclude_task_ids)))

    return tis


@provide_session
def set_task_instance_state(
    self,
    *,
    task_id: str,
    map_indexes: Collection[int] | None = None,
    execution_date: datetime | None = None,
    run_id: str | None = None,
    state: TaskInstanceState,
    upstream: bool = False,
    downstream: bool = False,
    future: bool = False,
    past: bool = False,
    commit: bool = True,
    session=NEW_SESSION,
) -> list[TaskInstance]:
    """
    Set the state of a TaskInstance and clear downstream tasks in failed or upstream_failed state.

    :param task_id: Task ID of the TaskInstance
    :param map_indexes: Only set TaskInstance if its map_index matches.
        If None (default), all mapped TaskInstances of the task are set.
    :param execution_date: Execution date of the TaskInstance
    :param run_id: The run_id of the TaskInstance
    :param state: State to set the TaskInstance to
    :param upstream: Include all upstream tasks of the given task_id
    :param downstream: Include all downstream tasks of the given task_id
    :param future: Include all future TaskInstances of the given task_id
    :param commit: Commit changes
    :param past: Include all past TaskInstances of the given task_id
    :param session: Database session
    :return: The task instances returned by ``set_state``.
    :raises ValueError: Unless exactly one of ``execution_date`` and ``run_id``
        is provided.
    """
    from airflow.api.common.mark_tasks import set_state

    if not exactly_one(execution_date, run_id):
        raise ValueError("Exactly one of execution_date or run_id must be provided")

    task = self.get_task(task_id)
    task.dag = self

    tasks_to_set_state: list[Operator | tuple[Operator, int]]
    if map_indexes is None:
        tasks_to_set_state = [task]
    else:
        tasks_to_set_state = [(task, map_index) for map_index in map_indexes]

    altered = set_state(
        tasks=tasks_to_set_state,
        execution_date=execution_date,
        run_id=run_id,
        upstream=upstream,
        downstream=downstream,
        future=future,
        past=past,
        state=state,
        commit=commit,
        session=session,
    )

    # Without commit nothing is cleared; just report what set_state returned.
    if not commit:
        return altered

    # Clear downstream tasks that are in failed/upstream_failed state to resume them.
    # Flush the session so that the tasks marked success are reflected in the db.
    session.flush()
    subdag = self.partial_subset(
        task_ids_or_regex={task_id},
        include_downstream=True,
        include_upstream=False,
    )

    if execution_date is None:
        dag_run = session.scalars(
            select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
        ).one()  # Raises an error if not found
        resolve_execution_date = dag_run.execution_date
    else:
        resolve_execution_date = execution_date

    # ``future`` removes the upper bound and ``past`` the lower bound of the cleared range.
    end_date = resolve_execution_date if not future else None
    start_date = resolve_execution_date if not past else None

    subdag.clear(
        start_date=start_date,
        end_date=end_date,
        include_subdags=True,
        include_parentdag=True,
        only_failed=True,
        session=session,
        # Exclude the task itself from being cleared
        exclude_task_ids=frozenset({task_id}),
    )

    return altered
@provide_session
def set_task_group_state(
    self,
    *,
    group_id: str,
    execution_date: datetime | None = None,
    run_id: str | None = None,
    state: TaskInstanceState,
    upstream: bool = False,
    downstream: bool = False,
    future: bool = False,
    past: bool = False,
    commit: bool = True,
    session: Session = NEW_SESSION,
) -> list[TaskInstance]:
    """
    Set TaskGroup to the given state and clear downstream tasks in failed or upstream_failed state.

    :param group_id: The group_id of the TaskGroup
    :param execution_date: Execution date of the TaskInstance
    :param run_id: The run_id of the TaskInstance
    :param state: State to set the TaskInstance to
    :param upstream: Include all upstream tasks of the given task_id
    :param downstream: Include all downstream tasks of the given task_id
    :param future: Include all future TaskInstances of the given task_id
    :param commit: Commit changes
    :param past: Include all past TaskInstances of the given task_id
    :param session: new session
    :return: The task instances returned by ``set_state``.
    :raises ValueError: Unless exactly one of ``execution_date`` and ``run_id``
        is provided, or when no task group with ``group_id`` exists.
    """
    from airflow.api.common.mark_tasks import set_state

    if not exactly_one(execution_date, run_id):
        raise ValueError("Exactly one of execution_date or run_id must be provided")

    tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = []
    task_ids: list[str] = []

    if execution_date is None:
        dag_run = session.scalars(
            select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
        ).one()  # Raises an error if not found
        resolve_execution_date = dag_run.execution_date
    else:
        resolve_execution_date = execution_date

    # ``future`` removes the upper bound and ``past`` the lower bound of the affected range.
    end_date = resolve_execution_date if not future else None
    start_date = resolve_execution_date if not past else None

    task_group_dict = self.task_group.get_task_group_dict()
    task_group = task_group_dict.get(group_id)
    if task_group is None:
        # f-string so the offending group id actually appears in the message.
        raise ValueError(f"TaskGroup {group_id} could not be found")
    tasks_to_set_state = [task for task in task_group.iter_tasks() if isinstance(task, BaseOperator)]
    task_ids = [task.task_id for task in task_group.iter_tasks()]
    dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id)
    if start_date is None and end_date is None:
        # NOTE(review): with both bounds None this compares execution_date to
        # None - confirm this is the intended filter for future=True and past=True.
        dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
    else:
        if start_date is not None:
            dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date)
        if end_date is not None:
            dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)

    with lock_rows(dag_runs_query, session):
        altered = set_state(
            tasks=tasks_to_set_state,
            execution_date=execution_date,
            run_id=run_id,
            upstream=upstream,
            downstream=downstream,
            future=future,
            past=past,
            state=state,
            commit=commit,
            session=session,
        )
        # Without commit nothing is cleared; just report what set_state returned.
        if not commit:
            return altered

        # Clear downstream tasks that are in failed/upstream_failed state to resume them.
        # Flush the session so that the tasks marked success are reflected in the db.
        session.flush()

    task_subset = self.partial_subset(
        task_ids_or_regex=task_ids,
        include_downstream=True,
        include_upstream=False,
    )

    task_subset.clear(
        start_date=start_date,
        end_date=end_date,
        include_subdags=True,
        include_parentdag=True,
        only_failed=True,
        session=session,
        # Exclude the task from the current group from being cleared
        exclude_task_ids=frozenset(task_ids),
    )

    return altered
@property
def roots(self) -> list[Operator]:
    """
    Return the root nodes of the DAG.

    A root is a task whose ``upstream_list`` is empty, i.e. a node with no parents; these
    are the first tasks to execute.
    """
    root_nodes = []
    for candidate in self.tasks:
        if candidate.upstream_list:
            continue
        root_nodes.append(candidate)
    return root_nodes
@property
def leaves(self) -> list[Operator]:
    """
    Return the leaf nodes of the DAG.

    A leaf is a task whose ``downstream_list`` is empty, i.e. a node with no children; these
    are the last tasks to execute.
    """
    leaf_nodes = []
    for candidate in self.tasks:
        if candidate.downstream_list:
            continue
        leaf_nodes.append(candidate)
    return leaf_nodes
[docs]deftopological_sort(self,include_subdag_tasks:bool=False):""" Sorts tasks in topographical order, such that a task comes after any of its upstream dependencies. Deprecated in place of ``task_group.topological_sort`` """fromairflow.utils.task_groupimportTaskGroupdefnested_topo(group):fornodeingroup.topological_sort(_include_subdag_tasks=include_subdag_tasks):ifisinstance(node,TaskGroup):yield fromnested_topo(node)else:yieldnodereturntuple(nested_topo(self.task_group))
@provide_session
def set_dag_runs_state(
    self,
    state: DagRunState = DagRunState.RUNNING,
    session: Session = NEW_SESSION,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    dag_ids: list[str] | None = None,
) -> None:
    """
    Bulk-update the state of DagRuns (deprecated).

    :param state: state written to every matching DagRun
    :param session: the sqlalchemy session to use
    :param start_date: if truthy, only runs with ``execution_date >= start_date`` are updated
    :param end_date: if truthy, only runs with ``execution_date <= end_date`` are updated
    :param dag_ids: dag ids whose runs are updated; falls back to this DAG's id when empty
    """
    warnings.warn(
        "This method is deprecated and will be removed in a future version.",
        RemovedInAirflow3Warning,
        stacklevel=3,
    )
    target_dag_ids = dag_ids or [self.dag_id]
    stmt = update(DagRun).where(DagRun.dag_id.in_(target_dag_ids))
    if start_date:
        stmt = stmt.where(DagRun.execution_date >= start_date)
    if end_date:
        stmt = stmt.where(DagRun.execution_date <= end_date)
    stmt = stmt.values(state=state).execution_options(synchronize_session="fetch")
    session.execute(stmt)
@provide_session
def clear(
    self,
    task_ids: Collection[str | tuple[str, int]] | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    only_failed: bool = False,
    only_running: bool = False,
    confirm_prompt: bool = False,
    include_subdags: bool = True,
    include_parentdag: bool = True,
    dag_run_state: DagRunState = DagRunState.QUEUED,
    dry_run: bool = False,
    session: Session = NEW_SESSION,
    get_tis: bool = False,
    recursion_depth: int = 0,
    max_recursion_depth: int | None = None,
    dag_bag: DagBag | None = None,
    exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
) -> int | Iterable[TaskInstance]:
    """
    Clear a set of task instances associated with the current dag for a specified date range.

    :param task_ids: List of task ids or (``task_id``, ``map_index``) tuples to clear
    :param start_date: The minimum execution_date to clear
    :param end_date: The maximum execution_date to clear
    :param only_failed: Only clear failed (and upstream-failed) tasks
    :param only_running: Only clear running tasks.
    :param confirm_prompt: Ask for confirmation
    :param include_subdags: Clear tasks in subdags and clear external tasks
        indicated by ExternalTaskMarker
    :param include_parentdag: Clear tasks in the parent dag of the subdag.
    :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
        be changed.
    :param dry_run: Find the tasks to clear but don't clear them.
    :param session: The sqlalchemy session to use
    :param get_tis: Deprecated alias for ``dry_run``
    :param recursion_depth: Deprecated; only triggers a warning here
    :param max_recursion_depth: Deprecated; only triggers a warning here
    :param dag_bag: The DagBag used to find the dags subdags (Optional)
    :param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``)
        tuples that should not be cleared
    :return: the matching task instances when ``dry_run`` is set, otherwise the number of
        task instances cleared (``0`` if nothing matched or the prompt was declined)
    """
    if get_tis:
        warnings.warn(
            "Passing `get_tis` to dag.clear() is deprecated. Use `dry_run` parameter instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        dry_run = True

    if recursion_depth:
        warnings.warn(
            "Passing `recursion_depth` to dag.clear() is deprecated.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
    if max_recursion_depth:
        warnings.warn(
            "Passing `max_recursion_depth` to dag.clear() is deprecated.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )

    wanted_states: list[TaskInstanceState] = []
    if only_failed:
        wanted_states.extend([TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED])
    if only_running:
        # Accumulating (rather than replacing) is odd, but it is the established behaviour.
        wanted_states.append(TaskInstanceState.RUNNING)

    ti_query = self._get_task_instances(
        task_ids=task_ids,
        start_date=start_date,
        end_date=end_date,
        run_id=None,
        state=wanted_states,
        include_subdags=include_subdags,
        include_parentdag=include_parentdag,
        include_dependent_dags=include_subdags,  # compat, yes this is not a typo
        session=session,
        dag_bag=dag_bag,
        exclude_task_ids=exclude_task_ids,
    )

    matched = session.scalars(ti_query).all()
    if dry_run:
        return matched

    total = len(matched)
    if total == 0:
        return 0

    confirmed = True
    if confirm_prompt:
        listing = "\n".join(str(ti) for ti in matched)
        question = f"You are about to delete these {total} tasks:\n{listing}\n\nAre you sure? [y/n]"
        confirmed = utils.helpers.ask_yesno(question)

    if not confirmed:
        total = 0
        print("Cancelled, nothing was cleared.")
    else:
        clear_task_instances(
            list(matched),
            session,
            dag=self,
            dag_run_state=dag_run_state,
        )

    session.flush()
    return total
@classmethod
def clear_dags(
    cls,
    dags,
    start_date=None,
    end_date=None,
    only_failed=False,
    only_running=False,
    confirm_prompt=False,
    include_subdags=True,
    include_parentdag=False,
    dag_run_state=DagRunState.QUEUED,
    dry_run=False,
):
    """
    Clear task instances across several DAGs, optionally after a single confirmation prompt.

    A dry-run ``clear`` is first issued on every DAG to collect the task instances that
    would be affected; the real ``clear`` is then issued with the same selection options.

    :param dags: the DAG objects to clear
    :param start_date: The minimum execution_date to clear
    :param end_date: The maximum execution_date to clear
    :param only_failed: Only clear failed tasks
    :param only_running: Only clear running tasks
    :param confirm_prompt: Ask once for confirmation, listing all collected task instances
    :param include_subdags: forwarded to ``DAG.clear``
    :param include_parentdag: forwarded to ``DAG.clear``
    :param dag_run_state: forwarded to ``DAG.clear``
    :param dry_run: return the collected task instances instead of clearing them
    :return: the list of task instances when ``dry_run`` is set, otherwise the number of
        task instances found by the dry run (``0`` if none or if the prompt was declined)
    """
    # ``dags`` is walked twice (dry run, then real run); materialise it so that one-shot
    # iterables are not silently exhausted by the first pass.
    dags = list(dags)

    all_tis = []
    for dag in dags:
        tis = dag.clear(
            start_date=start_date,
            end_date=end_date,
            only_failed=only_failed,
            only_running=only_running,
            confirm_prompt=False,
            include_subdags=include_subdags,
            include_parentdag=include_parentdag,
            dag_run_state=dag_run_state,
            dry_run=True,
        )
        all_tis.extend(tis)

    if dry_run:
        return all_tis

    count = len(all_tis)
    do_it = True
    if count == 0:
        print("Nothing to clear.")
        return 0
    if confirm_prompt:
        ti_list = "\n".join(str(t) for t in all_tis)
        question = f"You are about to delete these {count} tasks:\n{ti_list}\n\nAre you sure? [y/n]"
        do_it = utils.helpers.ask_yesno(question)

    if do_it:
        for dag in dags:
            dag.clear(
                start_date=start_date,
                end_date=end_date,
                only_failed=only_failed,
                only_running=only_running,
                confirm_prompt=False,
                include_subdags=include_subdags,
                # Must mirror the dry run above; otherwise DAG.clear's own default applies
                # and the cleared set can differ from what was listed and counted.
                include_parentdag=include_parentdag,
                dag_run_state=dag_run_state,
                dry_run=False,
            )
    else:
        count = 0
        print("Cancelled, nothing was cleared.")
    return count
[docs]def__deepcopy__(self,memo):# Switcharoo to go around deepcopying objects coming through the# backdoorcls=self.__class__result=cls.__new__(cls)memo[id(self)]=resultfork,vinself.__dict__.items():ifknotin("user_defined_macros","user_defined_filters","_log"):setattr(result,k,copy.deepcopy(v,memo))result.user_defined_macros=self.user_defined_macrosresult.user_defined_filters=self.user_defined_filtersifhasattr(self,"_log"):result._log=self._logreturnresult
[docs]defsub_dag(self,*args,**kwargs):"""Use `airflow.models.DAG.partial_subset`, this method is deprecated."""warnings.warn("This method is deprecated and will be removed in a future version. Please use partial_subset",RemovedInAirflow3Warning,stacklevel=2,)returnself.partial_subset(*args,**kwargs)
def partial_subset(
    self,
    task_ids_or_regex: str | Pattern | Iterable[str],
    include_downstream=False,
    include_upstream=True,
    include_direct_upstream=False,
):
    """
    Return a subset of the current dag based on regex matching one or more tasks.

    Returns a subset of the current dag as a deep copy of the current dag
    based on a regex that should match one or many tasks, and includes
    upstream and downstream neighbours based on the flag passed.

    :param task_ids_or_regex: Either a list of task_ids, or a regex to
        match against task ids (as a string, or compiled regex pattern).
    :param include_downstream: Include all downstream tasks of matched
        tasks, in addition to matched tasks.
    :param include_upstream: Include all upstream tasks of matched tasks,
        in addition to matched tasks.
    :param include_direct_upstream: Include all tasks directly upstream of matched
        and downstream (if include_downstream = True) tasks
    :return: the copied DAG; its ``partial`` attribute is set to ``True`` when it holds
        fewer tasks than this DAG
    """
    from airflow.models.baseoperator import BaseOperator
    from airflow.models.mappedoperator import MappedOperator

    # deep-copying self.task_dict and self._task_group takes a long time, and we don't want all
    # the tasks anyway, so we copy the tasks manually later
    # (pre-seeding the memo with None makes deepcopy substitute None for those two objects).
    memo = {id(self.task_dict): None, id(self._task_group): None}
    dag = copy.deepcopy(self, memo)  # type: ignore

    # A string/pattern is matched with re2.findall against each task_id; anything else is
    # treated as a container of task ids and tested with ``in``.
    if isinstance(task_ids_or_regex, (str, Pattern)):
        matched_tasks = [t for t in self.tasks if re2.findall(task_ids_or_regex, t.task_id)]
    else:
        matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]

    # Ids of tasks pulled in because of their relationship to a matched task.
    also_include_ids: set[str] = set()
    for t in matched_tasks:
        if include_downstream:
            for rel in t.get_flat_relatives(upstream=False):
                also_include_ids.add(rel.task_id)
                if rel not in matched_tasks:  # if it's in there, we're already processing it
                    # need to include setups and teardowns for tasks that are in multiple
                    # non-collinear setup/teardown paths
                    if not rel.is_setup and not rel.is_teardown:
                        also_include_ids.update(
                            x.task_id for x in rel.get_upstreams_only_setups_and_teardowns()
                        )
        if include_upstream:
            also_include_ids.update(x.task_id for x in t.get_upstreams_follow_setups())
        else:
            # Even without upstreams, a regular task still gets whatever
            # get_upstreams_only_setups_and_teardowns() yields for it.
            if not t.is_setup and not t.is_teardown:
                also_include_ids.update(x.task_id for x in t.get_upstreams_only_setups_and_teardowns())
        # A matched setup task brings its directly-downstream teardowns along when downstream
        # tasks are not being included wholesale.
        if t.is_setup and not include_downstream:
            also_include_ids.update(x.task_id for x in t.downstream_list if x.is_teardown)

    also_include: list[Operator] = [self.task_dict[x] for x in also_include_ids]
    direct_upstreams: list[Operator] = []
    if include_direct_upstream:
        for t in itertools.chain(matched_tasks, also_include):
            upstream = (u for u in t.upstream_list if isinstance(u, (BaseOperator, MappedOperator)))
            direct_upstreams.extend(upstream)

    # Compiling the unique list of tasks that made the cut
    # Make sure to not recursively deepcopy the dag or task_group while copying the task.
    # task_group is reset later
    def _deepcopy_task(t) -> Operator:
        memo.setdefault(id(t.task_group), None)
        return copy.deepcopy(t, memo)

    # Keyed by task_id, so a task reached through several of the three lists is stored once.
    dag.task_dict = {
        t.task_id: _deepcopy_task(t)
        for t in itertools.chain(matched_tasks, also_include, direct_upstreams)
    }

    def filter_task_group(group, parent_group):
        """Exclude tasks not included in the subdag from the given TaskGroup."""
        # We want to deepcopy _most but not all_ attributes of the task group, so we create a shallow copy
        # and then manually deep copy the instances. (memo argument to deepcopy only works for instances
        # of classes, not "native" properties of an instance)
        copied = copy.copy(group)

        # The copy starts with an empty children mapping; it is repopulated below.
        memo[id(group.children)] = {}
        if parent_group:
            memo[id(group.parent_group)] = parent_group
        for attr, value in copied.__dict__.items():
            if id(value) in memo:
                value = memo[id(value)]
            else:
                value = copy.deepcopy(value, memo)
            copied.__dict__[attr] = value

        # Children refer back to their group through a weak proxy rather than a strong reference.
        proxy = weakref.proxy(copied)

        for child in group.children.values():
            if isinstance(child, AbstractOperator):
                if child.task_id in dag.task_dict:
                    task = copied.children[child.task_id] = dag.task_dict[child.task_id]
                    task.task_group = proxy
                else:
                    # Task did not make the cut: drop its id from the group's used ids.
                    copied.used_group_ids.discard(child.task_id)
            else:
                filtered_child = filter_task_group(child, proxy)

                # Only include this child TaskGroup if it is non-empty.
                if filtered_child.children:
                    copied.children[child.group_id] = filtered_child

        return copied

    dag._task_group = filter_task_group(self.task_group, None)

    # Removing upstream/downstream references to tasks and TaskGroups that did not make
    # the cut.
    subdag_task_groups = dag.task_group.get_task_group_dict()
    for group in subdag_task_groups.values():
        group.upstream_group_ids.intersection_update(subdag_task_groups)
        group.downstream_group_ids.intersection_update(subdag_task_groups)
        group.upstream_task_ids.intersection_update(dag.task_dict)
        group.downstream_task_ids.intersection_update(dag.task_dict)

    for t in dag.tasks:
        # Removing upstream/downstream references to tasks that did not
        # make the cut
        t.upstream_task_ids.intersection_update(dag.task_dict)
        t.downstream_task_ids.intersection_update(dag.task_dict)

    if len(dag.tasks) < len(self.tasks):
        dag.partial = True

    return dag
[docs]defget_task(self,task_id:str,include_subdags:bool=False)->Operator:iftask_idinself.task_dict:returnself.task_dict[task_id]ifinclude_subdags:fordaginself.subdags:iftask_idindag.task_dict:returndag.task_dict[task_id]raiseTaskNotFound(f"Task {task_id} not found")
[docs]deftree_view(self)->None:"""Print an ASCII tree representation of the DAG."""warnings.warn("`tree_view` is deprecated and will be removed in Airflow 3.0.",category=RemovedInAirflow3Warning,stacklevel=2,)fortmpinself._generate_tree_view():print(tmp)
[docs]defget_tree_view(self)->str:"""Return an ASCII tree representation of the DAG."""warnings.warn("`get_tree_view` is deprecated and will be removed in Airflow 3.0.",category=RemovedInAirflow3Warning,stacklevel=2,)rst=""fortmpinself._generate_tree_view():rst+=tmp+"\n"returnrst
[docs]defadd_task(self,task:Operator)->None:""" Add a task to the DAG. :param task: the task you want to add """FailStopDagInvalidTriggerRule.check(dag=self,trigger_rule=task.trigger_rule)fromairflow.utils.task_groupimportTaskGroupContext# if the task has no start date, assign it the same as the DAGifnottask.start_date:task.start_date=self.start_date# otherwise, the task will start on the later of its own start date and# the DAG's start dateelifself.start_date:task.start_date=max(task.start_date,self.start_date)# if the task has no end date, assign it the same as the dagifnottask.end_date:task.end_date=self.end_date# otherwise, the task will end on the earlier of its own end date and# the DAG's end dateeliftask.end_dateandself.end_date:task.end_date=min(task.end_date,self.end_date)task_id=task.task_idifnottask.task_group:task_group=TaskGroupContext.get_current_task_group(self)iftask_group:task_id=task_group.child_id(task_id)task_group.add(task)if(task_idinself.task_dictandself.task_dict[task_id]isnottask)ortask_idinself._task_group.used_group_ids:raiseDuplicateTaskIdFound(f"Task id '{task_id}' has already been added to the DAG")else:self.task_dict[task_id]=tasktask.dag=self# Add task_id to used_group_ids to prevent group_id and task_id collisions.self._task_group.used_group_ids.add(task_id)self.task_count=len(self.task_dict)
[docs]defadd_tasks(self,tasks:Iterable[Operator])->None:""" Add a list of tasks to the DAG. :param tasks: a lit of tasks you want to add """fortaskintasks:self.add_task(task)
def_remove_task(self,task_id:str)->None:# This is "private" as removing could leave a hole in dependencies if done incorrectly, and this# doesn't guard against thattask=self.task_dict.pop(task_id)tg=getattr(task,"task_group",None)iftg:tg._remove(task)self.task_count=len(self.task_dict)
def run(
    self,
    start_date=None,
    end_date=None,
    mark_success=False,
    local=False,
    donot_pickle=airflow_conf.getboolean("core", "donot_pickle"),
    ignore_task_deps=False,
    ignore_first_depends_on_past=True,
    pool=None,
    delay_on_limit_secs=1.0,
    verbose=False,
    conf=None,
    rerun_failed_tasks=False,
    run_backwards=False,
    run_at_least_once=False,
    continue_on_failures=False,
    disable_retry=False,
):
    """
    Run the DAG through a ``BackfillJobRunner``.

    :param start_date: the start date of the range to run
    :param end_date: the end date of the range to run
    :param mark_success: True to mark jobs as succeeded without running them
    :param local: True to run the tasks using the LocalExecutor (it is installed as the
        default executor before the job is created)
    :param donot_pickle: True to avoid pickling DAG object and send to workers
    :param ignore_task_deps: True to skip upstream tasks
    :param ignore_first_depends_on_past: True to ignore depends_on_past
        dependencies for the first set of tasks only
    :param pool: Resource pool to use
    :param delay_on_limit_secs: Time in seconds to wait before next attempt to run
        dag run when max_active_runs limit has been reached
    :param verbose: Make logging output more verbose
    :param conf: user defined dictionary passed from CLI
    :param rerun_failed_tasks: forwarded unchanged to ``BackfillJobRunner``
    :param run_backwards: forwarded unchanged to ``BackfillJobRunner``
    :param run_at_least_once: If true, always run the DAG at least once even
        if no logical run exists within the time range.
    :param continue_on_failures: forwarded unchanged to ``BackfillJobRunner``
    :param disable_retry: forwarded unchanged to ``BackfillJobRunner``
    """
    from airflow.executors.executor_loader import ExecutorLoader
    from airflow.jobs.backfill_job_runner import BackfillJobRunner

    if local:
        from airflow.executors.local_executor import LocalExecutor

        ExecutorLoader.set_default_executor(LocalExecutor())

    from airflow.jobs.job import Job

    # Everything except ``local`` is handed straight to the runner.
    runner_options = dict(
        start_date=start_date,
        end_date=end_date,
        mark_success=mark_success,
        donot_pickle=donot_pickle,
        ignore_task_deps=ignore_task_deps,
        ignore_first_depends_on_past=ignore_first_depends_on_past,
        pool=pool,
        delay_on_limit_secs=delay_on_limit_secs,
        verbose=verbose,
        conf=conf,
        rerun_failed_tasks=rerun_failed_tasks,
        run_backwards=run_backwards,
        run_at_least_once=run_at_least_once,
        continue_on_failures=continue_on_failures,
        disable_retry=disable_retry,
    )
    backfill_job = Job()
    backfill_runner = BackfillJobRunner(job=backfill_job, dag=self, **runner_options)

    run_job(job=backfill_job, execute_callable=backfill_runner._execute)
[docs]defcli(self):"""Exposes a CLI specific to this DAG."""check_cycle(self)fromairflow.cliimportcli_parserparser=cli_parser.get_parser(dag_parser=True)args=parser.parse_args()args.func(args,self)
@provide_session
def test(
    self,
    execution_date: datetime | None = None,
    run_conf: dict[str, Any] | None = None,
    conn_file_path: str | None = None,
    variable_file_path: str | None = None,
    use_executor: bool = False,
    mark_success_pattern: Pattern | str | None = None,
    session: Session = NEW_SESSION,
) -> DagRun:
    """
    Execute one single DagRun for a given DAG and execution date.

    :param execution_date: execution date for the DAG run; defaults to ``timezone.utcnow()``
    :param run_conf: configuration to pass to newly created dagrun
    :param conn_file_path: file path to a connection file in either yaml or json
    :param variable_file_path: file path to a variable file in either yaml or json
    :param use_executor: if set, uses an executor to test the DAG
    :param mark_success_pattern: regex of task_ids to mark as success instead of running
    :param session: database connection (optional)
    :return: the DagRun that was executed
    """

    def add_logger_if_needed(ti: TaskInstance):
        """
        Add a formatted logger to the task instance.

        This allows all logs to surface to the command line, instead of into
        a task file. Since this is a local test run, it is much better for
        the user to see logs in the command line, rather than needing to
        search for a log file.

        :param ti: The task instance that will receive a logger.
        """
        format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
        handler = logging.StreamHandler(sys.stdout)
        handler.level = logging.INFO
        handler.setFormatter(format)
        # only add log handler once
        if not any(isinstance(h, logging.StreamHandler) for h in ti.log.handlers):
            self.log.debug("Adding Streamhandler to taskinstance %s", ti.task_id)
            ti.log.addHandler(handler)

    # When secrets files are given, a LocalFilesystemBackend is put at the front of the
    # backend list for the duration of the run; the ExitStack callback pops it again.
    exit_stack = ExitStack()
    if conn_file_path or variable_file_path:
        local_secrets = LocalFilesystemBackend(
            variables_file_path=variable_file_path, connections_file_path=conn_file_path
        )
        secrets_backend_list.insert(0, local_secrets)
        exit_stack.callback(lambda: secrets_backend_list.pop(0))

    with exit_stack:
        execution_date = execution_date or timezone.utcnow()
        self.validate()
        self.log.debug("Clearing existing task instances for execution date %s", execution_date)
        # dag_run_state=False: leave the DagRun's state untouched while clearing.
        self.clear(
            start_date=execution_date,
            end_date=execution_date,
            dag_run_state=False,  # type: ignore
            session=session,
        )
        self.log.debug("Getting dagrun for dag %s", self.dag_id)
        logical_date = timezone.coerce_datetime(execution_date)
        data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date)
        dr: DagRun = _get_or_create_dagrun(
            dag=self,
            start_date=execution_date,
            execution_date=execution_date,
            run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
            session=session,
            conf=run_conf,
            data_interval=data_interval,
        )

        tasks = self.task_dict
        self.log.debug("starting dagrun")
        # Instead of starting a scheduler, we run the minimal loop possible to check
        # for task readiness and dependency management. This is notably faster
        # than creating a BackfillJob and allows us to surface logs to the user

        # ``Dag.test()`` works in two different modes depending on ``use_executor``:
        # - if ``use_executor`` is False, runs the task locally with no executor using ``_run_task``
        # - if ``use_executor`` is True, sends the task instances to the executor with
        #   ``BaseExecutor.queue_task_instance``
        if use_executor:
            executor = ExecutorLoader.get_default_executor()
            executor.start()

        # Keep scheduling until the DagRun leaves the RUNNING state.
        while dr.state == DagRunState.RUNNING:
            session.expire_all()
            schedulable_tis, _ = dr.update_state(session=session)
            for s in schedulable_tis:
                # try_number is bumped for every schedulable TI except those up for reschedule.
                if s.state != TaskInstanceState.UP_FOR_RESCHEDULE:
                    s.try_number += 1
                s.state = TaskInstanceState.SCHEDULED
            session.commit()
            # triggerer may mark tasks scheduled so we read from DB
            all_tis = set(dr.get_task_instances(session=session))
            scheduled_tis = {x for x in all_tis if x.state == TaskInstanceState.SCHEDULED}
            ids_unrunnable = {x for x in all_tis if x.state not in State.finished} - scheduled_tis
            if not scheduled_tis and ids_unrunnable:
                # Nothing to do this round; back off briefly before polling again.
                self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable)
                time.sleep(1)

            triggerer_running = _triggerer_is_healthy()
            for ti in scheduled_tis:
                ti.task = tasks[ti.task_id]

                # True only when a pattern was given and it fully matches this task_id.
                mark_success = (
                    re2.compile(mark_success_pattern).fullmatch(ti.task_id) is not None
                    if mark_success_pattern is not None
                    else False
                )

                if use_executor:
                    if executor.has_task(ti):
                        continue
                    # Send the task to the executor
                    executor.queue_task_instance(ti, ignore_ti_state=True)
                else:
                    # Run the task locally
                    try:
                        add_logger_if_needed(ti)
                        _run_task(
                            ti=ti,
                            inline_trigger=not triggerer_running,
                            session=session,
                            mark_success=mark_success,
                        )
                    except Exception:
                        # A failing task is logged, not re-raised, so the loop carries on.
                        self.log.exception("Task failed; ti=%s", ti)
            if use_executor:
                executor.heartbeat()
        if use_executor:
            executor.end()
    return dr
@provide_session
def create_dagrun(
    self,
    state: DagRunState,
    execution_date: datetime | None = None,
    run_id: str | None = None,
    start_date: datetime | None = None,
    external_trigger: bool | None = False,
    conf: dict | None = None,
    run_type: DagRunType | None = None,
    session: Session = NEW_SESSION,
    dag_hash: str | None = None,
    creating_job_id: int | None = None,
    data_interval: tuple[datetime, datetime] | None = None,
):
    """
    Create a dag run from this dag including the tasks associated with this dag.

    Returns the dag run.

    :param run_id: defines the run id for this dag run
    :param run_type: type of DagRun
    :param execution_date: the execution date of this dag run
    :param state: the state of the dag run
    :param start_date: the date this dag run should be evaluated
    :param external_trigger: whether this dag run is externally triggered
    :param conf: Dict containing configuration/parameters to pass to the DAG
    :param creating_job_id: id of the job creating this DagRun
    :param session: database session
    :param dag_hash: Hash of Serialized DAG
    :param data_interval: Data interval of the DagRun
    :raises ValueError: if ``run_type`` or ``run_id`` has the wrong type, or a manual run is
        given a ``run_id`` whose inferred type is not manual
    :raises AirflowException: if neither ``run_id`` nor both ``run_type`` and
        ``execution_date`` are given, or if ``run_id`` matches neither the built-in nor the
        configured run-id pattern
    """
    logical_date = timezone.coerce_datetime(execution_date)

    # Accept a plain (start, end) tuple and normalise it into a DataInterval.
    if data_interval and not isinstance(data_interval, DataInterval):
        data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval))

    # Deprecated path: infer the data interval when the caller did not supply one.
    if data_interval is None and logical_date is not None:
        warnings.warn(
            "Calling `DAG.create_dagrun()` without an explicit data interval is deprecated",
            RemovedInAirflow3Warning,
            stacklevel=3,
        )
        if run_type == DagRunType.MANUAL:
            data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date)
        else:
            data_interval = self.infer_automated_data_interval(logical_date)

    if run_type is None or isinstance(run_type, DagRunType):
        pass
    elif isinstance(run_type, str):  # Compatibility: run_type used to be a str.
        run_type = DagRunType(run_type)
    else:
        raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")

    if run_id:  # Infer run_type from run_id if needed.
        if not isinstance(run_id, str):
            raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
        inferred_run_type = DagRunType.from_run_id(run_id)
        if run_type is None:
            # No explicit type given, use the inferred type.
            run_type = inferred_run_type
        elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL:
            # Prevent a manual run from using an ID that looks like a scheduled run.
            raise ValueError(
                f"A {run_type.value} DAG run cannot use ID {run_id!r} since it "
                f"is reserved for {inferred_run_type.value} runs"
            )
    elif run_type and logical_date is not None:  # Generate run_id from run_type and execution_date.
        run_id = self.timetable.generate_run_id(
            run_type=run_type, logical_date=logical_date, data_interval=data_interval
        )
    else:
        raise AirflowException(
            "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
        )

    regex = airflow_conf.get("scheduler", "allowed_run_id_pattern")

    # A run_id is accepted if it matches the built-in pattern, or else the (non-blank)
    # configured pattern.
    if run_id and not re2.match(RUN_ID_REGEX, run_id):
        if not regex.strip() or not re2.match(regex.strip(), run_id):
            raise AirflowException(
                f"The provided run ID '{run_id}' is invalid. It does not match either "
                f"the configured pattern: '{regex}' or the built-in pattern: '{RUN_ID_REGEX}'"
            )

    # create a copy of params before validating
    # (so merging ``conf`` in for validation does not modify ``self.params``)
    copied_params = copy.deepcopy(self.params)
    copied_params.update(conf or {})
    copied_params.validate()

    run = _create_orm_dagrun(
        dag=self,
        dag_id=self.dag_id,
        run_id=run_id,
        logical_date=logical_date,
        start_date=start_date,
        external_trigger=external_trigger,
        conf=conf,
        state=state,
        run_type=run_type,
        dag_hash=dag_hash,
        creating_job_id=creating_job_id,
        data_interval=data_interval,
        session=session,
    )
    return run
@classmethod
@provide_session
def bulk_sync_to_db(
    cls,
    dags: Collection[DAG],
    session=NEW_SESSION,
):
    """
    Deprecated alias of :meth:`bulk_write_to_db`.

    Emits a ``RemovedInAirflow3Warning`` and forwards ``dags`` and ``session``.
    """
    warnings.warn(
        "This method is deprecated and will be removed in a future version. Please use bulk_write_to_db",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    outcome = cls.bulk_write_to_db(dags=dags, session=session)
    return outcome
@classmethod
@provide_session
def bulk_write_to_db(
    cls,
    dags: Collection[DAG],
    processor_subdir: str | None = None,
    session=NEW_SESSION,
):
    """
    Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB.

    Besides the DagModel rows this also syncs tags, owner links, DagCode, dataset and
    dataset-alias models and the schedule/outlet references to them, then recurses into
    each DAG's subdags.

    Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.

    :param dags: the DAG objects to save to the DB
    :param processor_subdir: value stored on each DagModel's ``processor_subdir``
    :param session: the sqlalchemy session to use
    :return: None
    """
    if not dags:
        return

    log.info("Sync %s DAGs", len(dags))
    dag_by_ids = {dag.dag_id: dag for dag in dags}

    dag_ids = set(dag_by_ids)
    # Load the existing rows together with their tags and dataset references, row-locked.
    query = (
        select(DagModel)
        .options(joinedload(DagModel.tags, innerjoin=False))
        .where(DagModel.dag_id.in_(dag_ids))
        .options(joinedload(DagModel.schedule_dataset_references))
        .options(joinedload(DagModel.schedule_dataset_alias_references))
        .options(joinedload(DagModel.task_outlet_dataset_references))
    )
    query = with_row_locks(query, of=DagModel, session=session)
    orm_dags: list[DagModel] = session.scalars(query).unique().all()
    existing_dags: dict[str, DagModel] = {x.dag_id: x for x in orm_dags}
    missing_dag_ids = dag_ids.difference(existing_dags.keys())

    # Create rows for DAGs that are not in the table yet.
    for missing_dag_id in missing_dag_ids:
        orm_dag = DagModel(dag_id=missing_dag_id)
        dag = dag_by_ids[missing_dag_id]
        if dag.is_paused_upon_creation is not None:
            orm_dag.is_paused = dag.is_paused_upon_creation
        orm_dag.tags = []
        log.info("Creating ORM DAG for %s", dag.dag_id)
        session.add(orm_dag)
        orm_dags.append(orm_dag)

    latest_runs: dict[str, DagRun] = {}
    num_active_runs: dict[str, int] = {}
    # Skip these queries entirely if no DAGs can be scheduled to save time.
    if any(dag.timetable.can_be_scheduled for dag in dags):
        # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query)
        query = cls._get_latest_runs_stmt(dags=list(existing_dags.keys()))
        latest_runs = {run.dag_id: run for run in session.scalars(query)}

        # Get number of active dagruns for all dags we are processing as a single query.
        num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)

    filelocs = []

    # Copy the in-memory DAG attributes onto each ORM row, in dag_id order.
    for orm_dag in sorted(orm_dags, key=lambda d: d.dag_id):
        dag = dag_by_ids[orm_dag.dag_id]
        filelocs.append(dag.fileloc)
        if dag.is_subdag:
            # A subdag row takes file location, root id and owners from its parent DAG.
            orm_dag.is_subdag = True
            orm_dag.fileloc = dag.parent_dag.fileloc  # type: ignore
            orm_dag.root_dag_id = dag.parent_dag.dag_id  # type: ignore
            orm_dag.owners = dag.parent_dag.owner  # type: ignore
        else:
            orm_dag.is_subdag = False
            orm_dag.fileloc = dag.fileloc
            orm_dag.owners = dag.owner
        orm_dag.is_active = True
        orm_dag.has_import_errors = False
        orm_dag.last_parsed_time = timezone.utcnow()
        orm_dag.default_view = dag.default_view
        orm_dag._dag_display_property_value = dag._dag_display_property_value
        orm_dag.description = dag.description
        orm_dag.max_active_tasks = dag.max_active_tasks
        orm_dag.max_active_runs = dag.max_active_runs
        orm_dag.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
        orm_dag.has_task_concurrency_limits = any(
            t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None
            for t in dag.tasks
        )
        orm_dag.schedule_interval = dag.schedule_interval
        orm_dag.timetable_description = dag.timetable.description
        orm_dag.dataset_expression = dag.timetable.dataset_condition.as_expression()

        orm_dag.processor_subdir = processor_subdir

        last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
        if last_automated_run is None:
            last_automated_data_interval = None
        else:
            last_automated_data_interval = dag.get_run_data_interval(last_automated_run)
        # At or above max_active_runs no next run is scheduled for creation.
        if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs:
            orm_dag.next_dagrun_create_after = None
        else:
            orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval)

        # Reconcile tags: delete stored tags the DAG no longer has, add the new ones.
        dag_tags = set(dag.tags or {})
        orm_dag_tags = list(orm_dag.tags or [])
        for orm_tag in orm_dag_tags:
            if orm_tag.name not in dag_tags:
                session.delete(orm_tag)
                orm_dag.tags.remove(orm_tag)
        orm_tag_names = {t.name for t in orm_dag_tags}
        for dag_tag in dag_tags:
            if dag_tag not in orm_tag_names:
                dag_tag_orm = DagTag(name=dag_tag, dag_id=dag.dag_id)
                orm_dag.tags.append(dag_tag_orm)
                session.add(dag_tag_orm)

        orm_dag_links = orm_dag.dag_owner_links or []
        # NOTE(review): this tests a stored link object for membership in ``dag.owner_links``,
        # which is iterated with ``.items()`` below; presumably its keys are owner names, in
        # which case the test rarely matches and stored links get deleted and re-added. Confirm.
        for orm_dag_link in orm_dag_links:
            if orm_dag_link not in dag.owner_links:
                session.delete(orm_dag_link)
        for owner_name, owner_link in dag.owner_links.items():
            dag_owner_orm = DagOwnerAttributes(dag_id=dag.dag_id, owner=owner_name, link=owner_link)
            session.add(dag_owner_orm)

    DagCode.bulk_sync_to_db(filelocs, session=session)

    from airflow.datasets import Dataset
    from airflow.models.dataset import (
        DagScheduleDatasetAliasReference,
        DagScheduleDatasetReference,
        DatasetModel,
        TaskOutletDatasetReference,
    )

    # dag_id -> {("dataset", uri) | ("dataset-alias", name)} the DAG is scheduled on.
    dag_references: dict[str, set[tuple[Literal["dataset", "dataset-alias"], str]]] = defaultdict(set)
    # (dag_id, task_id) -> dataset uris the task declares as outlets.
    outlet_references = defaultdict(set)
    # We can't use a set here as we want to preserve order
    outlet_dataset_models: dict[DatasetModel, None] = {}
    input_dataset_models: dict[DatasetModel, None] = {}
    outlet_dataset_alias_models: set[DatasetAliasModel] = set()
    input_dataset_alias_models: set[DatasetAliasModel] = set()

    # here we go through dags and tasks to check for dataset references
    # if there are now None and previously there were some, we delete them
    # if there are now *any*, we add them to the above data structures, and
    # later we'll persist them to the database.
    for dag in dags:
        curr_orm_dag = existing_dags.get(dag.dag_id)
        if not (dataset_condition := dag.timetable.dataset_condition):
            if curr_orm_dag:
                if curr_orm_dag.schedule_dataset_references:
                    curr_orm_dag.schedule_dataset_references = []
                if curr_orm_dag.schedule_dataset_alias_references:
                    curr_orm_dag.schedule_dataset_alias_references = []
        else:
            for _, dataset in dataset_condition.iter_datasets():
                dag_references[dag.dag_id].add(("dataset", dataset.uri))
                input_dataset_models[DatasetModel.from_public(dataset)] = None

            for dataset_alias in dataset_condition.iter_dataset_aliases():
                dag_references[dag.dag_id].add(("dataset-alias", dataset_alias.name))
                input_dataset_alias_models.add(DatasetAliasModel.from_public(dataset_alias))

        curr_outlet_references = curr_orm_dag and curr_orm_dag.task_outlet_dataset_references
        for task in dag.tasks:
            dataset_outlets: list[Dataset] = []
            dataset_alias_outlets: set[DatasetAlias] = set()
            for outlet in task.outlets:
                if isinstance(outlet, Dataset):
                    dataset_outlets.append(outlet)
                elif isinstance(outlet, DatasetAlias):
                    dataset_alias_outlets.add(outlet)

            # A task with no dataset outlets loses any outlet references stored for it.
            if not dataset_outlets:
                if curr_outlet_references:
                    this_task_outlet_refs = [
                        x
                        for x in curr_outlet_references
                        if x.dag_id == dag.dag_id and x.task_id == task.task_id
                    ]
                    for ref in this_task_outlet_refs:
                        curr_outlet_references.remove(ref)

            for d in dataset_outlets:
                outlet_dataset_models[DatasetModel.from_public(d)] = None
                outlet_references[(task.dag_id, task.task_id)].add(d.uri)

            for d_a in dataset_alias_outlets:
                outlet_dataset_alias_models.add(DatasetAliasModel.from_public(d_a))

    # Note: this aliases (and extends) outlet_dataset_models rather than copying it.
    all_dataset_models = outlet_dataset_models
    all_dataset_models.update(input_dataset_models)

    # store datasets
    stored_dataset_models: dict[str, DatasetModel] = {}
    new_dataset_models: list[DatasetModel] = []
    for dataset in all_dataset_models:
        stored_dataset_model = session.scalar(
            select(DatasetModel).where(DatasetModel.uri == dataset.uri).limit(1)
        )
        if stored_dataset_model:
            # Some datasets may have been previously unreferenced, and therefore orphaned by the
            # scheduler. But if we're here, then we have found that dataset again in our DAGs, which
            # means that it is no longer an orphan, so set is_orphaned to False.
            stored_dataset_model.is_orphaned = expression.false()
            stored_dataset_models[stored_dataset_model.uri] = stored_dataset_model
        else:
            new_dataset_models.append(dataset)
    dataset_manager.create_datasets(dataset_models=new_dataset_models, session=session)
    stored_dataset_models.update(
        {dataset_model.uri: dataset_model for dataset_model in new_dataset_models}
    )

    del new_dataset_models
    del all_dataset_models

    # store dataset aliases
    all_datasets_alias_models = input_dataset_alias_models | outlet_dataset_alias_models
    stored_dataset_alias_models: dict[str, DatasetAliasModel] = {}
    new_dataset_alias_models: set[DatasetAliasModel] = set()
    if all_datasets_alias_models:
        all_dataset_alias_names = {
            dataset_alias_model.name for dataset_alias_model in all_datasets_alias_models
        }

        stored_dataset_alias_models = {
            dsa_m.name: dsa_m
            for dsa_m in session.scalars(
                select(DatasetAliasModel).where(DatasetAliasModel.name.in_(all_dataset_alias_names))
            ).fetchall()
        }

        if stored_dataset_alias_models:
            new_dataset_alias_models = {
                dataset_alias_model
                for dataset_alias_model in all_datasets_alias_models
                if dataset_alias_model.name not in stored_dataset_alias_models.keys()
            }
        else:
            new_dataset_alias_models = all_datasets_alias_models

        session.add_all(new_dataset_alias_models)
    # The reference rows built below read ``.id`` from these models, hence the flush here.
    session.flush()
    stored_dataset_alias_models.update(
        {
            dataset_alias_model.name: dataset_alias_model
            for dataset_alias_model in new_dataset_alias_models
        }
    )

    del new_dataset_alias_models
    del all_datasets_alias_models

    # reconcile dag-schedule-on-dataset and dag-schedule-on-dataset-alias references
    for dag_id, base_dataset_list in dag_references.items():
        dag_refs_needed = {
            DagScheduleDatasetReference(
                dataset_id=stored_dataset_models[base_dataset_identifier].id, dag_id=dag_id
            )
            if base_dataset_type == "dataset"
            else DagScheduleDatasetAliasReference(
                alias_id=stored_dataset_alias_models[base_dataset_identifier].id, dag_id=dag_id
            )
            for base_dataset_type, base_dataset_identifier in base_dataset_list
        }

        dag_refs_stored = (
            set(existing_dags.get(dag_id).schedule_dataset_references)  # type: ignore
            | set(existing_dags.get(dag_id).schedule_dataset_alias_references)  # type: ignore
            if existing_dags.get(dag_id)
            else set()
        )
        dag_refs_to_add = dag_refs_needed - dag_refs_stored
        session.bulk_save_objects(dag_refs_to_add)
        for obj in dag_refs_stored - dag_refs_needed:
            session.delete(obj)

    # Index the stored task-outlet references by (dag_id, task_id) for the diff below.
    existing_task_outlet_refs_dict = defaultdict(set)
    for dag_id, orm_dag in existing_dags.items():
        for todr in orm_dag.task_outlet_dataset_references:
            existing_task_outlet_refs_dict[(dag_id, todr.task_id)].add(todr)

    # reconcile task-outlet-dataset references
    for (dag_id, task_id), uri_list in outlet_references.items():
        task_refs_needed = {
            TaskOutletDatasetReference(
                dataset_id=stored_dataset_models[uri].id, dag_id=dag_id, task_id=task_id
            )
            for uri in uri_list
        }
        task_refs_stored = existing_task_outlet_refs_dict[(dag_id, task_id)]
        task_refs_to_add = {x for x in task_refs_needed if x not in task_refs_stored}
        session.bulk_save_objects(task_refs_to_add)
        for obj in task_refs_stored - task_refs_needed:
            session.delete(obj)

    # Issue SQL/finish "Unit of Work", but let @provide_session commit (or if passed a session, let caller
    # decide when to commit
    session.flush()

    # Recurse into subdags; an empty subdag collection returns immediately at the top.
    for dag in dags:
        cls.bulk_write_to_db(dag.subdags, processor_subdir=processor_subdir, session=session)
@classmethod
def _get_latest_runs_stmt(cls, dags: list[str]) -> Select:
    """
    Build a select statement that retrieves the last automated run for each dag.

    Only backfill and scheduled runs are considered, and only the dag id,
    execution date and data interval columns are loaded on the result.

    :param dags: dags to query
    """
    automated_run_types = (DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)
    loaded_columns = load_only(
        DagRun.dag_id,
        DagRun.execution_date,
        DagRun.data_interval_start,
        DagRun.data_interval_end,
    )

    if len(dags) == 1:
        # Index optimized fast path to avoid more complicated & slower groupby queryplan
        only_dag_id = dags[0]
        latest_date_subq = (
            select(func.max(DagRun.execution_date).label("max_execution_date"))
            .where(
                DagRun.dag_id == only_dag_id,
                DagRun.run_type.in_(automated_run_types),
            )
            .scalar_subquery()
        )
        stmt = select(DagRun).where(
            DagRun.dag_id == only_dag_id,
            DagRun.execution_date == latest_date_subq,
        )
        return stmt.options(loaded_columns)

    latest_per_dag_subq = (
        select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date"))
        .where(
            DagRun.dag_id.in_(dags),
            DagRun.run_type.in_(automated_run_types),
        )
        .group_by(DagRun.dag_id)
        .subquery()
    )
    stmt = select(DagRun).where(
        DagRun.dag_id == latest_per_dag_subq.c.dag_id,
        DagRun.execution_date == latest_per_dag_subq.c.max_execution_date,
    )
    return stmt.options(loaded_columns)


@provide_session
def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION):
    """
    Save attributes about this DAG to the DB.

    Delegates to ``bulk_write_to_db`` with a one-element list containing this DAG.
    Note that this method can be called for both DAGs and SubDAGs. A SubDag is
    actually a SubDagOperator.

    :param processor_subdir: forwarded unchanged to ``bulk_write_to_db``
    :param session: ORM session
    :return: None
    """
    only_this_dag = [self]
    self.bulk_write_to_db(only_this_dag, processor_subdir=processor_subdir, session=session)
def deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION):
    """
    Deactivate every DAG in the ORM whose ID is not among the given known DAG IDs.

    Does nothing when ``active_dag_ids`` is empty.

    :param active_dag_ids: list of DAG IDs that are active
    :param session: ORM session; committed after the DAGs have been updated
    :return: None
    """
    if not active_dag_ids:
        return

    unknown_dags_stmt = select(DagModel).where(~DagModel.dag_id.in_(active_dag_ids))
    for unknown_dag in session.scalars(unknown_dags_stmt).all():
        unknown_dag.is_active = False
        session.merge(unknown_dag)
    session.commit()
@staticmethod
@provide_session
def deactivate_stale_dags(expiration_date, session=NEW_SESSION):
    """
    Deactivate any DAGs that were last touched by the scheduler before the expiration date.

    These DAGs were likely deleted.

    :param expiration_date: set inactive DAGs that were touched before this time
    :param session: ORM session
    :return: None
    """
    stale_dags_stmt = select(DagModel).where(
        DagModel.last_parsed_time < expiration_date,
        DagModel.is_active,
    )
    for stale_dag in session.scalars(stale_dags_stmt):
        log.info(
            "Deactivating DAG ID %s since it was last touched by the scheduler at %s",
            stale_dag.dag_id,
            stale_dag.last_parsed_time.isoformat(),
        )
        stale_dag.is_active = False
        session.merge(stale_dag)
        # NOTE(review): committing once per deactivated DAG; confirm this commit
        # belongs inside the loop rather than after it.
        session.commit()
@staticmethod
@provide_session
def get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION) -> int:
    """
    Count the task instances of a DAG, optionally narrowed by run, task IDs and states.

    :param dag_id: ID of the DAG whose task instances are counted
    :param run_id: if truthy, only count task instances belonging to this DAG run
    :param task_ids: if non-empty, only count task instances with one of these task IDs
    :param states: if non-empty, only count task instances in one of these states;
        a ``None`` entry matches task instances whose state is NULL
    :param session: ORM session
    :return: The number of matching task instances
    """
    conditions = [TaskInstance.dag_id == dag_id]
    if run_id:
        conditions.append(TaskInstance.run_id == run_id)
    if task_ids:
        conditions.append(TaskInstance.task_id.in_(task_ids))
    if states:
        if None not in states:
            conditions.append(TaskInstance.state.in_(states))
        elif all(state is None for state in states):
            conditions.append(TaskInstance.state.is_(None))
        else:
            # Mixed list: NULL cannot be matched with IN, so it gets its own IS NULL arm.
            named_states = [state for state in states if state]
            conditions.append(or_(TaskInstance.state.in_(named_states), TaskInstance.state.is_(None)))

    count_stmt = select(func.count(TaskInstance.task_id)).where(*conditions)
    return session.scalar(count_stmt)
@classmethod
def get_serialized_fields(cls):
    """
    Stringified DAGs and operators contain exactly these fields.

    The result is computed on first use from the instance attributes of a
    throwaway ``DAG`` minus a fixed exclusion list, then cached on the class.
    """
    if cls.__serialized_fields:
        return cls.__serialized_fields

    excluded_fields = {
        "parent_dag",
        "schedule_dataset_references",
        "schedule_dataset_alias_references",
        "task_outlet_dataset_references",
        "_old_context_manager_dags",
        "safe_dag_id",
        "last_loaded",
        "user_defined_filters",
        "user_defined_macros",
        "partial",
        "params",
        "_pickle_id",
        "_log",
        "task_dict",
        "template_searchpath",
        "sla_miss_callback",
        "on_success_callback",
        "on_failure_callback",
        "template_undefined",
        "jinja_environment_kwargs",
        # has_on_*_callback are only stored if the value is True, as the default is False
        "has_on_success_callback",
        "has_on_failure_callback",
        "auto_register",
        "fail_stop",
    }
    template_dag = DAG(dag_id="test", schedule=None)
    cls.__serialized_fields = frozenset(vars(template_dag)) - excluded_fields
    return cls.__serialized_fields
def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeInfoType:
    """
    Return the information stored for the edge between the given pair of tasks.

    :param upstream_task_id: task ID at the start of the edge
    :param downstream_task_id: task ID at the end of the edge
    :return: the stored edge information, or an empty edge if there is none
    """
    no_info = cast(EdgeInfoType, {})
    # Note - older serialized DAGs may not have edge_info being a dict at all
    if not self.edge_info:
        return no_info
    downstream_infos = self.edge_info.get(upstream_task_id, {})
    return downstream_infos.get(downstream_task_id, no_info)
def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: EdgeInfoType):
    """
    Set the given edge information on the DAG.

    Note that this will overwrite, rather than merge with, existing info.

    :param upstream_task_id: task ID at the start of the edge
    :param downstream_task_id: task ID at the end of the edge
    :param info: edge information to store for that pair of tasks
    """
    # Create the per-upstream mapping on first use, then store the edge in it.
    downstream_infos = self.edge_info.setdefault(upstream_task_id, {})
    downstream_infos[downstream_task_id] = info
def validate_schedule_and_params(self):
    """
    Validate Param values when the DAG has schedule defined.

    Nothing is checked when the timetable cannot be scheduled.

    :raises AirflowException: if any Param cannot be resolved by its schema definition
    """
    if self.timetable.can_be_scheduled:
        try:
            self.params.validate()
        except ParamValidationError as validation_error:
            message = (
                "DAG is not allowed to define a Schedule, "
                "if there are any required params without default values or default values are not valid."
            )
            raise AirflowException(message) from validation_error
def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]:
    """
    Yield every owner link that is neither a valid URL nor a valid 'mailto' link.

    A ``mailto`` link is considered valid when it carries a non-empty path;
    any other link is valid when it has both a scheme and a network location.

    :return: an iterator of invalid ``(owner, link)`` pairs
    """
    for owner, link in self.owner_links.items():
        parts = urlsplit(link)
        if parts.scheme == "mailto":
            # netloc is not existing for 'mailto' link, so we are checking that the path is parsed
            is_valid = bool(parts.path)
        else:
            is_valid = bool(parts.scheme and parts.netloc)
        if not is_valid:
            # Always report the owner; previously an empty mailto link yielded
            # the (empty) path in place of the owner name.
            yield owner, link
class DagTag(Base):
    """A tag name per dag, to allow quick filtering in the DAG view."""
class DagOwnerAttributes(Base):
    """
    Table defining different owner attributes.

    For example, a link for an owner that will be passed as a hyperlink to the "DAGs" view.
    """
# The location of the file containing the DAG object
# Note: Do not depend on fileloc pointing to a file; in the case of a
# packaged DAG, it will point to the subpath of the DAG within the
# associated zip.
def __init__(self, concurrency=None, **kwargs):
    """
    Initialise the model and fill in limits that were not supplied.

    :param concurrency: deprecated alias for ``max_active_tasks``; only used
        (with a deprecation warning) when ``max_active_tasks`` was not set
    :param kwargs: forwarded to the parent initialiser
    """
    super().__init__(**kwargs)

    if self.max_active_tasks is None:
        if not concurrency:
            self.max_active_tasks = airflow_conf.getint("core", "max_active_tasks_per_dag")
        else:
            warnings.warn(
                "The 'DagModel.concurrency' parameter is deprecated. Please use 'max_active_tasks'.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            self.max_active_tasks = concurrency

    # Remaining limits fall back to their configured per-DAG defaults.
    if self.max_active_runs is None:
        self.max_active_runs = airflow_conf.getint("core", "max_active_runs_per_dag")

    if self.max_consecutive_failed_dag_runs is None:
        self.max_consecutive_failed_dag_runs = airflow_conf.getint(
            "core", "max_consecutive_failed_dag_runs_per_dag"
        )

    if self.has_task_concurrency_limits is None:
        # Be safe -- this will be updated later once the DAG is parsed
        self.has_task_concurrency_limits = True
def get_is_paused(self, *, session: Session | None = None) -> bool:
    """
    Provide interface compatibility to 'DAG'.

    :param session: accepted for signature compatibility only; not used here
    :return: the value of this model's ``is_paused`` attribute
    """
    return self.is_paused
def get_is_active(self, *, session: Session | None = None) -> bool:
    """
    Provide interface compatibility to 'DAG'.

    :param session: accepted for signature compatibility only; not used here
    :return: the value of this model's ``is_active`` attribute
    """
    return self.is_active
@staticmethod
@internal_api_call
@provide_session
def get_paused_dag_ids(dag_ids: list[str], session: Session = NEW_SESSION) -> set[str]:
    """
    Given a list of dag_ids, get a set of Paused Dag Ids.

    :param dag_ids: List of Dag ids
    :param session: ORM Session
    :return: Paused Dag_ids
    """
    paused_stmt = (
        select(DagModel.dag_id)
        .where(DagModel.is_paused == expression.true())
        .where(DagModel.dag_id.in_(dag_ids))
    )
    paused_rows = session.execute(paused_stmt)
    # Each row is a one-element tuple holding the dag_id.
    return {dag_id for (dag_id,) in paused_rows}
def get_default_view(self) -> str:
    """
    Get the Default DAG View.

    :return: this model's ``default_view`` when it is set, otherwise the
        lower-cased ``[webserver] dag_default_view`` config value
    """
    # This is for backwards-compatibility with old dags that don't have None as default_view
    own_view = self.default_view
    if own_view:
        return own_view
    return airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower()
def relative_fileloc(self) -> pathlib.Path | None:
    """
    File location of the importable dag 'file' relative to the configured DAGs folder.

    :return: ``None`` when ``fileloc`` is unset; the path relative to
        ``settings.DAGS_FOLDER`` when it lies inside that folder; otherwise the
        path unchanged
    """
    location = self.fileloc
    if location is None:
        return None

    full_path = pathlib.Path(location)
    try:
        relative_path = full_path.relative_to(settings.DAGS_FOLDER)
    except ValueError:
        # Not relative to DAGS_FOLDER.
        return full_path
    return relative_path
@provide_session
def set_is_paused(self, is_paused: bool, including_subdags: bool = True, session=NEW_SESSION) -> None:
    """
    Pause/Un-pause a DAG.

    :param is_paused: Is the DAG paused
    :param including_subdags: whether to include the DAG's subdags
    :param session: session
    """
    # Match this DAG, plus (optionally) every DAG whose root is this DAG.
    matching_dags = [DagModel.dag_id == self.dag_id]
    if including_subdags:
        matching_dags.append(DagModel.root_dag_id == self.dag_id)

    pause_stmt = (
        update(DagModel)
        .where(or_(*matching_dags))
        .values(is_paused=is_paused)
        .execution_options(synchronize_session="fetch")
    )
    session.execute(pause_stmt)
    session.commit()
def deactivate_deleted_dags(
    cls,
    alive_dag_filelocs: Container[str],
    processor_subdir: str,
    session: Session = NEW_SESSION,
) -> None:
    """
    Set ``is_active=False`` on the DAGs for which the DAG files have been removed.

    Only rows with a non-null ``fileloc`` whose ``processor_subdir`` is either
    null or equal to the given one are considered.

    :param alive_dag_filelocs: file paths of alive DAGs
    :param processor_subdir: dag processor subdir
    :param session: ORM Session
    """
    log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)

    belongs_to_processor = or_(
        cls.processor_subdir.is_(None),
        cls.processor_subdir == processor_subdir,
    )
    candidates_stmt = select(cls).where(cls.fileloc.is_not(None), belongs_to_processor)
    for candidate in session.scalars(candidates_stmt):
        if candidate.fileloc not in alive_dag_filelocs:
            candidate.is_active = False
@classmethod
def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[datetime, datetime]]]:
    """
    Return (and lock) a list of Dag objects that are due to create a new DagRun.

    This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
    you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
    transaction is committed it will be unlocked.

    :param session: ORM session used for every query issued here
    :return: a pair of the locked result set and a mapping of dataset-triggered
        dag_id to the ``(earliest, latest)`` ``created_at`` of its queue records
    """
    from airflow.models.serialized_dag import SerializedDagModel

    def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
        # if dag was serialized before 2.9 and we *just* upgraded,
        # we may be dealing with old version.  In that case,
        # just wait for the dag to be reserialized.
        try:
            return cond.evaluate(statuses)
        except AttributeError:
            log.warning("dag '%s' has old serialization; skipping DAG run creation.", dag_id)
            return None

    # this loads all the DDRQ records.... may need to limit num dags
    by_dag = defaultdict(list)
    for queue_record in session.scalars(select(DatasetDagRunQueue)).all():
        by_dag[queue_record.target_dag_id].append(queue_record)

    dag_statuses = {
        dag_id: {record.dataset.uri: True for record in records} for dag_id, records in by_dag.items()
    }
    ser_dags = session.scalars(
        select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
    ).all()
    for ser_dag in ser_dags:
        dag_id = ser_dag.dag_id
        statuses = dag_statuses[dag_id]
        # Drop DAGs whose dataset condition is not (yet) satisfied by the queued events.
        if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
            del by_dag[dag_id]
            del dag_statuses[dag_id]
    del dag_statuses

    dataset_triggered_dag_info = {}
    for dag_id, records in by_dag.items():
        created_times = [record.created_at for record in records]
        dataset_triggered_dag_info[dag_id] = (min(created_times), max(created_times))
    del by_dag

    dataset_triggered_dag_ids = set(dataset_triggered_dag_info)
    if dataset_triggered_dag_ids:
        # DAGs that already have as many queued/running runs as max_active_runs allows.
        at_capacity_stmt = (
            select(DagModel.dag_id)
            .join(DagRun.dag_model)
            .where(DagRun.state.in_((DagRunState.QUEUED, DagRunState.RUNNING)))
            .where(DagModel.dag_id.in_(dataset_triggered_dag_ids))
            .group_by(DagModel.dag_id)
            .having(func.count() >= func.max(DagModel.max_active_runs))
        )
        exclusion_list = set(session.scalars(at_capacity_stmt))
        if exclusion_list:
            dataset_triggered_dag_ids -= exclusion_list
            dataset_triggered_dag_info = {
                dag_id: info
                for dag_id, info in dataset_triggered_dag_info.items()
                if dag_id not in exclusion_list
            }

    # We limit so that _one_ scheduler doesn't try to do all the creation of dag runs
    is_due = or_(
        cls.next_dagrun_create_after <= func.now(),
        cls.dag_id.in_(dataset_triggered_dag_ids),
    )
    query = (
        select(cls)
        .where(
            cls.is_paused == expression.false(),
            cls.is_active == expression.true(),
            cls.has_import_errors == expression.false(),
            is_due,
        )
        .order_by(cls.next_dagrun_create_after)
        .limit(cls.NUM_DAGS_PER_DAGRUN_QUERY)
    )
    locked_dags = session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True))
    return locked_dags, dataset_triggered_dag_info
def calculate_dagrun_date_fields(
    self,
    dag: DAG,
    last_automated_dag_run: None | datetime | DataInterval,
) -> None:
    """
    Calculate ``next_dagrun`` and ``next_dagrun_create_after``.

    ``next_dagrun_data_interval`` is set as well; all three are set to ``None``
    when the DAG reports no next run.

    :param dag: The DAG object
    :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
        if not yet scheduled. Passing a datetime is deprecated.
    """
    last_automated_data_interval: DataInterval | None
    if not isinstance(last_automated_dag_run, datetime):
        last_automated_data_interval = last_automated_dag_run
    else:
        warnings.warn(
            "Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. "
            "Provide a data interval instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run)

    next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
    if next_dagrun_info is None:
        data_interval = logical_date = run_after = None
    else:
        data_interval = next_dagrun_info.data_interval
        logical_date = next_dagrun_info.logical_date
        run_after = next_dagrun_info.run_after
    self.next_dagrun_data_interval = data_interval
    self.next_dagrun = logical_date
    self.next_dagrun_create_after = run_after

    log.info(
        "Setting next_dagrun for %s to %s, run_after=%s",
        dag.dag_id,
        self.next_dagrun,
        self.next_dagrun_create_after,
    )
# NOTE: Please keep the list of arguments in sync with DAG.__init__.
# Only exception: dag_id here should have a default value, but not in DAG.
def dag(
    dag_id: str = "",
    description: str | None = None,
    schedule: ScheduleArg = NOTSET,
    schedule_interval: ScheduleIntervalArg = NOTSET,
    timetable: Timetable | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    full_filepath: str | None = None,
    template_searchpath: str | Iterable[str] | None = None,
    template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
    user_defined_macros: dict | None = None,
    user_defined_filters: dict | None = None,
    default_args: dict | None = None,
    concurrency: int | None = None,
    max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
    max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
    max_consecutive_failed_dag_runs: int = airflow_conf.getint(
        "core", "max_consecutive_failed_dag_runs_per_dag"
    ),
    dagrun_timeout: timedelta | None = None,
    sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
    default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
    orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"),
    catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"),
    on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
    on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
    doc_md: str | None = None,
    params: abc.MutableMapping | None = None,
    access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
    is_paused_upon_creation: bool | None = None,
    jinja_environment_kwargs: dict | None = None,
    render_template_as_native_obj: bool = False,
    tags: list[str] | None = None,
    owner_links: dict[str, str] | None = None,
    auto_register: bool = True,
    fail_stop: bool = False,
    dag_display_name: str | None = None,
) -> Callable[[Callable], Callable[..., DAG]]:
    """
    Python dag decorator which wraps a function into an Airflow DAG.

    Every parameter is forwarded to the ``DAG`` constructor; when ``dag_id`` is
    empty the decorated function's name is used instead. Calling the decorated
    function builds the DAG, runs the function body inside the DAG context with
    each argument replaced by a DAG param, and returns the DAG object.
    Can be used to parameterize DAGs.
    """
    # Everything except the dag_id is passed through to DAG() unchanged.
    dag_constructor_kwargs = dict(
        description=description,
        schedule_interval=schedule_interval,
        timetable=timetable,
        start_date=start_date,
        end_date=end_date,
        full_filepath=full_filepath,
        template_searchpath=template_searchpath,
        template_undefined=template_undefined,
        user_defined_macros=user_defined_macros,
        user_defined_filters=user_defined_filters,
        default_args=default_args,
        concurrency=concurrency,
        max_active_tasks=max_active_tasks,
        max_active_runs=max_active_runs,
        max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
        dagrun_timeout=dagrun_timeout,
        sla_miss_callback=sla_miss_callback,
        default_view=default_view,
        orientation=orientation,
        catchup=catchup,
        on_success_callback=on_success_callback,
        on_failure_callback=on_failure_callback,
        doc_md=doc_md,
        params=params,
        access_control=access_control,
        is_paused_upon_creation=is_paused_upon_creation,
        jinja_environment_kwargs=jinja_environment_kwargs,
        render_template_as_native_obj=render_template_as_native_obj,
        tags=tags,
        schedule=schedule,
        owner_links=owner_links,
        auto_register=auto_register,
        fail_stop=fail_stop,
        dag_display_name=dag_display_name,
    )

    def wrapper(f: Callable) -> Callable[..., DAG]:
        @functools.wraps(f)
        def factory(*args, **kwargs):
            # Generate signature for decorated function and bind the arguments when called
            # we do this to extract parameters, so we can annotate them on the DAG object.
            # In addition, this fails if we are missing any args/kwargs with TypeError as expected.
            bound_call = signature(f).bind(*args, **kwargs)
            # Apply defaults to capture default values if set.
            bound_call.apply_defaults()

            # Initialize DAG with bound arguments
            with DAG(dag_id or f.__name__, **dag_constructor_kwargs) as dag_obj:
                # Set DAG documentation from function documentation if it exists and doc_md is not set.
                if f.__doc__ and not dag_obj.doc_md:
                    dag_obj.doc_md = f.__doc__

                # Generate DAGParam for each function arg/kwarg and replace it for calling the function.
                # All args/kwargs for function will be DAGParam object and replaced on execution time.
                f_kwargs = {}
                for arg_name, arg_value in bound_call.arguments.items():
                    f_kwargs[arg_name] = dag_obj.param(arg_name, arg_value)

                # set file location to caller source path
                caller_frame = sys._getframe().f_back
                dag_obj.fileloc = caller_frame.f_code.co_filename if caller_frame else ""

                # Invoke function to create operators in the DAG scope.
                f(**f_kwargs)

            # Return dag object such that it's accessible in Globals.
            return dag_obj

        # Ensure that warnings from inside DAG() are emitted from the caller, not here
        fixup_decorator_warning_stack(factory)
        return factory

    return wrapper
# The name is bound to True here, then immediately rebound to False through
# globals(): "kcah_acitats"[::-1].upper() evaluates to "STATICA_HACK". At runtime
# the guarded block below is therefore never executed.
# NOTE(review): presumably this exists so static analysis tools, which only see
# the literal ``True``, treat the import and relationship below as reachable -- confirm.
STATICA_HACK = True
globals()["kcah_acitats"[::-1].upper()] = False
if STATICA_HACK:  # pragma: no cover
    from airflow.models.serialized_dag import SerializedDagModel

    DagModel.serialized_dag = relationship(SerializedDagModel)
    """:sphinx-autoapi-skip:"""
class DagContext:
    """
    DAG context is used to keep the current DAG when DAG is used as ContextManager.

    You can use DAG as context:

    .. code-block:: python

        with DAG(
            dag_id="example_dag",
            default_args=default_args,
            schedule="0 0 * * *",
            dagrun_timeout=timedelta(minutes=60),
        ) as dag:
            ...

    If you do this the context stores the DAG and whenever new task is created, it will use
    such stored DAG as the parent DAG.
    """

    _context_managed_dags: deque[DAG] = deque()

    def pop_context_managed_dag(cls) -> DAG | None:
        """
        Remove and return the DAG at the left end of the context stack.

        When an autoregister module name is set and the popped DAG has
        ``auto_register`` enabled, the ``(dag, module)`` pair is recorded in
        ``autoregistered_dags`` before returning.
        """
        popped = cls._context_managed_dags.popleft()
        module_name = cls.current_autoregister_module_name

        # In a few cases around serialization we explicitly push None in to the stack
        if module_name is not None and popped and popped.auto_register:
            owning_module = sys.modules[module_name]
            cls.autoregistered_dags.add((popped, owning_module))

        return popped
def_run_inline_trigger(trigger):asyncdef_run_inline_trigger_main():asyncforeventintrigger.run():returneventreturnasyncio.run(_run_inline_trigger_main())def_run_task(*,ti:TaskInstance,inline_trigger:bool=False,mark_success:bool=False,session:Session):""" Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of extra steps used in `task.run` to keep our local running as fast as possible. This function is only meant for the `dag.test` function as a helper function. Args: ti: TaskInstance to run """log.info("[DAG TEST] starting task_id=%s map_index=%s",ti.task_id,ti.map_index)whileTrue:try:log.info("[DAG TEST] running task %s",ti)ti._run_raw_task(session=session,raise_on_defer=inline_trigger,mark_success=mark_success)breakexceptTaskDeferredase:log.info("[DAG TEST] running trigger in line")event=_run_inline_trigger(e.trigger)ti.next_method=e.method_nameti.next_kwargs={"event":event.payload}ifeventelsee.kwargslog.info("[DAG TEST] Trigger completed")session.merge(ti)session.commit()log.info("[DAG TEST] end task task_id=%s map_index=%s",ti.task_id,ti.map_index)def_get_or_create_dagrun(dag:DAG,conf:dict[Any,Any]|None,start_date:datetime,execution_date:datetime,run_id:str,session:Session,data_interval:tuple[datetime,datetime]|None=None,)->DagRun:""" Create a DAG run, replacing an existing instance if needed to prevent collisions. This function is only meant to be used by :meth:`DAG.test` as a helper function. :param dag: DAG to be used to find run. :param conf: Configuration to pass to newly created run. :param start_date: Start date of new run. :param execution_date: Logical date for finding an existing run. :param run_id: Run ID for the new DAG run. :return: The newly created DAG run. 
"""log.info("dagrun id: %s",dag.dag_id)dr:DagRun=session.scalar(select(DagRun).where(DagRun.dag_id==dag.dag_id,DagRun.execution_date==execution_date))ifdr:session.delete(dr)session.commit()dr=dag.create_dagrun(state=DagRunState.RUNNING,execution_date=execution_date,run_id=run_id,start_date=start_dateorexecution_date,session=session,conf=conf,data_interval=data_interval,)log.info("created dagrun %s",dr)returndr