## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importcontextlibimporthashlibimportloggingimportmathimportosimportpickleimportsignalimportwarningsfromcollectionsimportdefaultdictfromdatetimeimportdatetime,timedeltafromfunctoolsimportpartialfromtempfileimportNamedTemporaryFilefromtypingimportIO,TYPE_CHECKING,Any,Callable,Dict,Iterable,List,NamedTuple,Optional,Tuple,Unionfromurllib.parseimportquoteimportdillimportjinja2importlazy_object_proxyimportpendulumfromjinja2importTemplateAssertionError,UndefinedErrorfromsqlalchemyimport(Column,Float,ForeignKeyConstraint,Index,Integer,PickleType,String,and_,func,inspect,or_,tuple_,)fromsqlalchemy.ext.associationproxyimportassociation_proxyfromsqlalchemy.ormimportreconstructor,relationshipfromsqlalchemy.orm.attributesimportNO_VALUE,set_committed_valuefromsqlalchemy.orm.sessionimportSessionfromsqlalchemy.sql.elementsimportBooleanClauseListfromsqlalchemy.sql.sqltypesimportBigIntegerfromairflowimportsettingsfromairflow.compat.functoolsimportcachefromairflow.configurationimportconffromairflow.exceptionsimport(AirflowException,AirflowFailException,AirflowNotFoundException,AirflowRescheduleException,AirflowSensorTimeout,AirflowSkipException,AirflowSmartSensorException,AirflowTaskTimeout,DagRunNotFound,TaskDeferralError,TaskDeferred,
)fromairflow.models.baseimportCOLLATION_ARGS,ID_LEN,Basefromairflow.models.connectionimportConnectionfromairflow.models.logimportLogfromairflow.models.paramimportParamsDictfromairflow.models.taskfailimportTaskFailfromairflow.models.taskrescheduleimportTaskReschedulefromairflow.models.variableimportVariablefromairflow.models.xcomimportXCOM_RETURN_KEY,XComfromairflow.plugins_managerimportintegrate_macros_pluginsfromairflow.sentryimportSentryfromairflow.statsimportStatsfromairflow.ti_deps.dep_contextimportDepContextfromairflow.ti_deps.dependencies_depsimportREQUEUEABLE_DEPS,RUNNING_DEPSfromairflow.timetables.baseimportDataIntervalfromairflow.typing_compatimportLiteralfromairflow.utilsimporttimezonefromairflow.utils.emailimportsend_emailfromairflow.utils.helpersimportis_containerfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.netimportget_hostnamefromairflow.utils.operator_helpersimportcontext_to_airflow_varsfromairflow.utils.platformimportgetuserfromairflow.utils.sessionimportcreate_session,provide_sessionfromairflow.utils.sqlalchemyimportExtendedJSON,UtcDateTimefromairflow.utils.stateimportDagRunState,Statefromairflow.utils.timeoutimporttimeouttry:fromkubernetes.client.api_clientimportApiClientfromairflow.kubernetes.kube_configimportKubeConfigfromairflow.kubernetes.pod_generatorimportPodGeneratorexceptImportError:
def set_current_context(context: Context):
    """
    Make ``context`` the current execution context while the caller's block runs.

    The context is pushed onto the module-level ``_CURRENT_CONTEXT`` stack, yielded to
    the caller, and popped again when the block exits (also on error).
    This method should be called once per Task execution, before calling operator.execute.

    NOTE(review): this is a generator; it is presumably wrapped by a context-manager
    decorator that is not visible in this part of the file -- confirm.

    :param context: the execution context to install
    """
    _CURRENT_CONTEXT.append(context)
    try:
        yield context
    finally:
        # Whatever sits on top of the stack now should be the context we pushed;
        # anything else means pushes and pops got out of balance.
        popped = _CURRENT_CONTEXT.pop()
        if popped != context:
            log.warning(
                "Current context is not equal to the state at context stack. Expected=%s, got=%s",
                context,
                popped,
            )
def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]:
    """
    Read back the error that a task run stored in its error file.

    :param fd: binary file object holding the pickled error
    :return: the unpickled error, ``None`` if the file is closed or empty, or a fixed
        placeholder message if the content cannot be unpickled
    """
    if fd.closed:
        return None

    # Always read from the beginning, wherever the file position currently is.
    fd.seek(0, os.SEEK_SET)
    payload = fd.read()
    if payload:
        try:
            # NOTE(review): unpickling executes arbitrary code; this assumes the error
            # file is only ever written by a trusted task process -- confirm.
            return pickle.loads(payload)
        except Exception:
            return "Failed to load task run error"
    return None
def set_error_file(error_file: str, error: Union[str, Exception]) -> None:
    """
    Write error into error file by path.

    The error is serialised to bytes *before* the file is opened, so a failure while
    pickling can never leave a partially written pickle in front of the fallback payload.

    :param error_file: path of the file to (over)write
    :param error: the error to store; if it cannot be pickled, its string
        representation is stored instead
    """
    try:
        payload = pickle.dumps(error)
    except Exception:
        # local class objects cannot be pickled, so we fall back
        # to storing the string representation instead
        payload = pickle.dumps(str(error))

    with open(error_file, "wb") as fd:
        fd.write(payload)
def clear_task_instances(
    tis,
    session,
    activate_dag_runs=None,
    dag=None,
    dag_run_state: Union[DagRunState, Literal[False]] = DagRunState.QUEUED,
):
    """
    Clears a set of task instances, but makes sure the running ones get killed.

    Running instances that belong to a job are flagged RESTARTING (and so is the job);
    every other instance is reset to ``State.NONE`` with a recomputed ``max_tries``.
    Reschedule records matching the instances are deleted, and the owning DagRuns are
    moved to ``dag_run_state`` unless that is ``False``.

    :param tis: a list of task instances
    :param session: current session
    :param activate_dag_runs: Deprecated parameter, do not pass
    :param dag: DAG object
    :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
        be changed.
    """
    running_job_ids = []
    # dag_id -> run_id -> try_number -> {task_id, ...}
    reschedule_keys = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))

    for ti in tis:
        if ti.state == State.RUNNING:
            if ti.job_id:
                # If a task is cleared when running, set its state to RESTARTING so that
                # the task is terminated and becomes eligible for retry.
                ti.state = State.RESTARTING
                running_job_ids.append(ti.job_id)
        else:
            cleared_task_id = ti.task_id
            if dag and dag.has_task(cleared_task_id):
                task = dag.get_task(cleared_task_id)
                ti.refresh_from_task(task)
                ti.max_tries = ti.try_number + task.retries - 1
            else:
                # Ignore errors when updating max_tries if dag is None or
                # task not found in dag since database records could be
                # outdated. We make max_tries the maximum value of its
                # original max_tries or the last attempted try number.
                ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
            ti.state = State.NONE
            ti.external_executor_id = None
            session.merge(ti)

        reschedule_keys[ti.dag_id][ti.run_id][ti.try_number].add(ti.task_id)

    if reschedule_keys:
        # Clear all reschedules related to the ti to clear

        # This is an optimization for the common case where all tis are for a small number
        # of dag_id, run_id and try_number. Use a nested dict of dag_id,
        # run_id, try_number and task_id to construct the where clause in a
        # hierarchical manner. This speeds up the delete statement by more than 40x for
        # large number of tis (50k+).
        where_clause = or_(
            and_(
                TR.dag_id == key_dag_id,
                or_(
                    and_(
                        TR.run_id == key_run_id,
                        or_(
                            and_(TR.try_number == key_try, TR.task_id.in_(key_task_ids))
                            for key_try, key_task_ids in by_try.items()
                        ),
                    )
                    for key_run_id, by_try in by_run.items()
                ),
            )
            for key_dag_id, by_run in reschedule_keys.items()
        )
        session.execute(TR.__table__.delete().where(where_clause))

    if running_job_ids:
        from airflow.jobs.base_job import BaseJob

        for job in session.query(BaseJob).filter(BaseJob.id.in_(running_job_ids)).all():
            job.state = State.RESTARTING

    if activate_dag_runs is not None:
        warnings.warn(
            "`activate_dag_runs` parameter to clear_task_instances function is deprecated. "
            "Please use `dag_run_state`",
            DeprecationWarning,
            stacklevel=2,
        )
        # The legacy flag can only switch the DagRun update off, never on.
        if not activate_dag_runs:
            dag_run_state = False

    if dag_run_state is not False and tis:
        from airflow.models.dagrun import DagRun  # Avoid circular import

        runs_per_dag = defaultdict(set)
        for instance in tis:
            runs_per_dag[instance.dag_id].add(instance.run_id)

        dag_runs = (
            session.query(DagRun)
            .filter(
                or_(
                    and_(DagRun.dag_id == key_dag_id, DagRun.run_id.in_(key_run_ids))
                    for key_dag_id, key_run_ids in runs_per_dag.items()
                )
            )
            .all()
        )
        for dr in dag_runs:
            dr.state = dag_run_state
            dr.start_date = timezone.utcnow()
            if dag_run_state == State.QUEUED:
                # A queued run has not started yet and has not been examined
                # for scheduling, so both markers are reset.
                dr.last_scheduling_decision = None
                dr.start_date = None
[docs]classTaskInstanceKey(NamedTuple):"""Key used to identify task instance."""
def primary(self) -> Tuple[str, str, str]:
    """Return the ``(dag_id, task_id, run_id)`` part of the key, i.e. everything but the try number."""
    dag_id, task_id, run_id = self.dag_id, self.task_id, self.run_id
    return dag_id, task_id, run_id
@property
def reduced(self) -> 'TaskInstanceKey':
    """
    Remake the key by subtracting 1 from try number to match in memory information.

    The resulting try number never drops below 1.
    """
    lowered_try = max(1, self.try_number - 1)
    return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, lowered_try)
def with_try_number(self, try_number: int) -> 'TaskInstanceKey':
    """
    Build a key for the same dag/task/run but with the given ``try_number``.

    :param try_number: try number to put into the new key
    :return: a new ``TaskInstanceKey``
    """
    dag_id, task_id, run_id = self.dag_id, self.task_id, self.run_id
    return TaskInstanceKey(dag_id, task_id, run_id, try_number)
@property
def key(self) -> "TaskInstanceKey":
    """
    Return this key itself.

    Provided for API compatibility with TaskInstance, which exposes a ``key`` attribute too.
    """
    return self
[docs]classTaskInstance(Base,LoggingMixin):""" Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the state they are in. The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions. Database transactions on this table should insure double triggers and any confusion around what task instances are or aren't ready to run even while multiple schedulers may be firing task instances. """
def __init__(
    self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
):
    """
    Create a task instance for ``task`` in the DagRun identified by ``run_id``.

    :param task: the operator this instance belongs to
    :param execution_date: deprecated way of identifying the DagRun; only used when
        ``run_id`` is not given, in which case the run id is looked up in the database
    :param run_id: id of the DagRun this instance belongs to
    :param state: optional initial state
    :raises DagRunNotFound: if only ``execution_date`` was given and no matching DagRun exists
    """
    super().__init__()
    self.dag_id = task.dag_id
    self.task_id = task.task_id
    self.refresh_from_task(task)
    self._log = logging.getLogger("airflow.task")

    if run_id is None and execution_date is not None:
        from airflow.models.dagrun import DagRun  # Avoid circular import

        warnings.warn(
            "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
            DeprecationWarning,
            # Stack level is 4 because SQLA adds some wrappers around the constructor
            stacklevel=4,
        )

        # make sure we have a localized execution_date stored in UTC
        if execution_date and not timezone.is_localized(execution_date):
            self.log.warning(
                "execution date %s has no timezone information. Using default from dag or system",
                execution_date,
            )
            if self.task.has_dag():
                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
            else:
                execution_date = timezone.make_aware(execution_date)

            execution_date = timezone.convert_to_utc(execution_date)

        # Translate the legacy execution_date into the run_id of the matching DagRun.
        with create_session() as lookup_session:
            run_id = (
                lookup_session.query(DagRun.run_id)
                .filter_by(dag_id=self.dag_id, execution_date=execution_date)
                .scalar()
            )
            if run_id is None:
                raise DagRunNotFound(
                    f"DagRun for {self.dag_id!r} with date {execution_date} not found"
                ) from None

    self.run_id = run_id

    self.try_number = 0
    self.unixname = getuser()
    if state:
        self.state = state
    self.hostname = ''
    self.init_on_load()

    # Is this TaskInstance being currently running within `airflow tasks run --raw`.
    # Not persisted to the database so only valid for the current process
    self.raw = False
    # can be changed when calling 'run'
    self.test_mode = False

@reconstructor
def init_on_load(self):
    """Set up the attributes that are not stored in the DB (currently only ``test_mode``)."""
    # can be changed when calling 'run'
    self.test_mode = False
@property
def try_number(self):
    """
    Return the try number that this task number will be when it is actually run.

    While the state is one of ``State.running`` the stored ``_try_number`` is returned
    unchanged; in every other state the stored value plus one is returned.
    """
    # This is designed so that task logs end up in the right file.
    # TODO: whether we need sensing here or not (in sensor and task_instance state machine)
    return self._try_number if self.state in State.running else self._try_number + 1
def prev_attempted_tries(self):
    """
    Return the number of tries already attempted, i.e. the raw stored ``_try_number``.

    Unlike ``try_number`` this is never incremented for non-running instances.
    """
    # Expose this for the Task Tries and Gantt graph views.
    # Using `try_number` throws off the counts for non-running tasks.
    # Also useful in error logging contexts to get
    # the try number for the last try that was attempted.
    # https://issues.apache.org/jira/browse/AIRFLOW-2143
    attempted = self._try_number
    return attempted
@property
def next_try_number(self):
    """Return the stored try number plus one, regardless of the current state."""
    stored = self._try_number
    return stored + 1
def command_as_list(
    self,
    mark_success=False,
    ignore_all_deps=False,
    ignore_task_deps=False,
    ignore_depends_on_past=False,
    ignore_ti_state=False,
    local=False,
    pickle_id=None,
    raw=False,
    job_id=None,
    pool=None,
    cfg_path=None,
):
    """
    Returns a command that can be executed anywhere where airflow is installed. This
    command is part of the message sent to executors by the orchestrator.

    The DAG file path is only passed along when no ``pickle_id`` is given and a DAG
    object is available; relative paths are prefixed with ``DAGS_FOLDER``.
    All arguments are forwarded to :meth:`generate_command`.
    """
    # Use the dag if we have it, else fallback to the ORM dag_model, which might not be loaded
    has_task_dag = hasattr(self, 'task') and hasattr(self.task, 'dag')
    dag: Union["DAG", "DagModel"] = self.task.dag if has_task_dag else self.dag_model

    path = None
    if not pickle_id and dag:
        # A subdag lives in its parent's file.
        path = dag.parent_dag.relative_fileloc if dag.is_subdag else dag.relative_fileloc

        if path:
            if not path.is_absolute():
                path = 'DAGS_FOLDER' / path
            path = str(path)

    return TaskInstance.generate_command(
        self.dag_id,
        self.task_id,
        run_id=self.run_id,
        mark_success=mark_success,
        ignore_all_deps=ignore_all_deps,
        ignore_task_deps=ignore_task_deps,
        ignore_depends_on_past=ignore_depends_on_past,
        ignore_ti_state=ignore_ti_state,
        local=local,
        pickle_id=pickle_id,
        file_path=path,
        raw=raw,
        job_id=job_id,
        pool=pool,
        cfg_path=cfg_path,
    )

@staticmethod
def generate_command(
    dag_id: str,
    task_id: str,
    run_id: str = None,
    mark_success: bool = False,
    ignore_all_deps: bool = False,
    ignore_depends_on_past: bool = False,
    ignore_task_deps: bool = False,
    ignore_ti_state: bool = False,
    local: bool = False,
    pickle_id: Optional[int] = None,
    file_path: Optional[str] = None,
    raw: bool = False,
    job_id: Optional[str] = None,
    pool: Optional[str] = None,
    cfg_path: Optional[str] = None,
) -> List[str]:
    """
    Generates the shell command required to execute this task instance.

    :param dag_id: DAG ID
    :type dag_id: str
    :param task_id: Task ID
    :type task_id: str
    :param run_id: The run_id of this task's DagRun
    :type run_id: str
    :param mark_success: Whether to mark the task as successful
    :type mark_success: bool
    :param ignore_all_deps: Ignore all ignorable dependencies.
        Overrides the other ignore_* parameters.
    :type ignore_all_deps: bool
    :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs
        (e.g. for Backfills)
    :type ignore_depends_on_past: bool
    :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past
        and trigger rule
    :type ignore_task_deps: bool
    :param ignore_ti_state: Ignore the task instance's previous failure/success
    :type ignore_ti_state: bool
    :param local: Whether to run the task locally
    :type local: bool
    :param pickle_id: If the DAG was serialized to the DB, the ID
        associated with the pickled DAG
    :type pickle_id: Optional[int]
    :param file_path: path to the file containing the DAG definition
    :type file_path: Optional[str]
    :param raw: raw mode (needs more details)
    :type raw: bool
    :param job_id: job ID (needs more details)
    :type job_id: Optional[str]
    :param pool: the Airflow pool that the task should run in
    :type pool: Optional[str]
    :param cfg_path: the Path to the configuration file
    :type cfg_path: Optional[str]
    :return: shell command that can be used to run the task instance
    :rtype: list[str]
    """
    cmd = ["airflow", "tasks", "run", dag_id, task_id, run_id]

    # The order of the options below is part of the generated command; keep it stable.
    if mark_success:
        cmd.append("--mark-success")
    if pickle_id:
        cmd += ["--pickle", str(pickle_id)]
    if job_id:
        cmd += ["--job-id", str(job_id)]
    if ignore_all_deps:
        cmd.append("--ignore-all-dependencies")
    if ignore_task_deps:
        cmd.append("--ignore-dependencies")
    if ignore_depends_on_past:
        cmd.append("--ignore-depends-on-past")
    if ignore_ti_state:
        cmd.append("--force")
    if local:
        cmd.append("--local")
    if pool:
        cmd += ["--pool", pool]
    if raw:
        cmd.append("--raw")
    if file_path:
        cmd += ["--subdir", file_path]
    if cfg_path:
        cmd += ["--cfg-path", cfg_path]
    return cmd
@property
def log_url(self):
    """Return the webserver URL of this task instance's log page."""
    encoded_date = quote(self.execution_date.isoformat())
    root = conf.get('webserver', 'BASE_URL')
    query = f"/log?execution_date={encoded_date}&task_id={self.task_id}&dag_id={self.dag_id}"
    return root + query
@property
[docs]defmark_success_url(self):"""URL to mark TI success"""iso=quote(self.execution_date.isoformat())base_url=conf.get('webserver','BASE_URL')returnbase_url+("/confirm"
def current_state(self, session=None) -> str:
    """
    Get the very latest state of this task instance from the database.

    The lookup is done through the given session and thus becomes part of it.

    :param session: SQLAlchemy ORM Session
    :type session: Session
    :return: the stored state, or ``None`` if no matching row exists
    """
    matches = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id == self.task_id,
            TaskInstance.run_id == self.run_id,
        )
        .all()
    )
    return matches[0].state if matches else None
@provide_session
def error(self, session=None):
    """
    Forces the task instance's state to FAILED in the database.

    The change is merged into the session and committed immediately.

    :param session: SQLAlchemy ORM Session
    :type session: Session
    """
    self.log.error("Recording the task instance as FAILED")

    self.state = State.FAILED

    session.merge(self)
    session.commit()
@provide_session
def refresh_from_db(self, session=None, lock_for_update=False) -> None:
    """
    Refreshes the task instance from the database based on the primary key

    If no matching row is found, only ``state`` is reset to ``None``.

    :param session: SQLAlchemy ORM Session
    :type session: Session
    :param lock_for_update: if True, indicates that the database should
        lock the TaskInstance (issuing a FOR UPDATE clause) until the
        session is committed.
    :type lock_for_update: bool
    """
    self.log.debug("Refreshing TaskInstance %s from DB", self)

    query = session.query(TaskInstance).filter(
        TaskInstance.dag_id == self.dag_id,
        TaskInstance.task_id == self.task_id,
        TaskInstance.run_id == self.run_id,
    )
    if lock_for_update:
        query = query.with_for_update()
    row: Optional[TaskInstance] = query.first()

    if not row:
        self.state = None
    else:
        # Fields ordered per model definition
        for name in ("start_date", "end_date", "duration", "state"):
            setattr(self, name, getattr(row, name))
        # Get the raw value of try_number column, don't read through the
        # accessor here otherwise it will be incremented by one already.
        self.try_number = row._try_number
        for name in ("max_tries", "hostname", "unixname", "job_id", "pool"):
            setattr(self, name, getattr(row, name))
        # A missing/zero pool_slots value falls back to a single slot.
        self.pool_slots = row.pool_slots or 1
        for name in (
            "queue",
            "priority_weight",
            "operator",
            "queued_dttm",
            "queued_by_job_id",
            "pid",
            "executor_config",
            "external_executor_id",
            "trigger_id",
            "next_method",
            "next_kwargs",
        ):
            setattr(self, name, getattr(row, name))

    self.log.debug("Refreshed TaskInstance %s", self)
def refresh_from_task(self, task, pool_override=None):
    """
    Copy common attributes from the given task.

    Besides storing ``task`` itself, this copies queue, pool, pool_slots, priority
    weight (from ``priority_weight_total``), run_as_user, max_tries (from ``retries``),
    executor_config and operator (from ``task_type``).

    :param task: The task object to copy from
    :type task: airflow.models.BaseOperator
    :param pool_override: Use the pool_override instead of task's pool
    :type pool_override: str
    """
    self.task = task
    self.queue = task.queue
    # A falsy override (None, empty string) means "use the task's own pool".
    self.pool = pool_override or task.pool

    # (attribute on this instance, attribute on the task)
    copied_attributes = (
        ("pool_slots", "pool_slots"),
        ("priority_weight", "priority_weight_total"),
        ("run_as_user", "run_as_user"),
        ("max_tries", "retries"),
        ("executor_config", "executor_config"),
        ("operator", "task_type"),
    )
    for own_name, task_name in copied_attributes:
        setattr(self, own_name, getattr(task, task_name))
@provide_session
def clear_xcom_data(self, session=None):
    """
    Clears all XCom data from the database for the task instance

    The rows to clear are selected by this instance's dag_id, task_id and execution_date.

    :param session: SQLAlchemy ORM Session
    :type session: Session
    """
    self.log.debug("Clearing XCom data")
    selector = dict(
        dag_id=self.dag_id,
        task_id=self.task_id,
        execution_date=self.execution_date,
    )
    XCom.clear(session=session, **selector)
    self.log.debug("XCom data cleared")
@property
def key(self) -> TaskInstanceKey:
    """
    Returns a tuple that identifies the task instance uniquely

    The key is built from dag_id, task_id, run_id and the current ``try_number``.
    """
    identity = (self.dag_id, self.task_id, self.run_id, self.try_number)
    return TaskInstanceKey(*identity)
@provide_session
def set_state(self, state: str, session=None):
    """
    Set TaskInstance state.

    ``start_date`` is filled in if it is still empty. When the new state is one of
    ``State.finished`` or is ``UP_FOR_RETRY``, ``end_date`` is filled in the same way
    and ``duration`` is recomputed. The instance is merged into the session but the
    session is not committed here.

    :param state: State to set for the TI
    :type state: str
    :param session: SQLAlchemy ORM Session
    :type session: Session
    """
    now = timezone.utcnow()
    self.log.debug("Setting task state for %s to %s", self, state)

    self.state = state
    self.start_date = self.start_date or now

    reached_end = self.state in State.finished or self.state == State.UP_FOR_RETRY
    if reached_end:
        self.end_date = self.end_date or now
        elapsed = self.end_date - self.start_date
        self.duration = elapsed.total_seconds()

    session.merge(self)
@property
def is_premature(self):
    """
    Returns whether a task is in UP_FOR_RETRY state and its retry interval
    has NOT yet elapsed, i.e. it is still too early to retry it.
    """
    # is the task still in the retry waiting period?
    return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
@provide_session
def are_dependents_done(self, session=None):
    """
    Checks whether the immediate dependents of this task instance have succeeded or have been skipped.
    This is meant to be used by wait_for_downstream.

    This is useful when you do not want to start processing the next
    schedule of a task until the dependents are done. For instance,
    if the task DROPs and recreates a table.

    :param session: SQLAlchemy ORM Session
    :type session: Session
    :return: True if there are no downstream tasks, or if every downstream task
        instance of the same run is SKIPPED or SUCCESS
    """
    downstream_ids = self.task.downstream_task_ids
    if not downstream_ids:
        return True

    finished_count_query = session.query(func.count(TaskInstance.task_id)).filter(
        TaskInstance.dag_id == self.dag_id,
        TaskInstance.task_id.in_(downstream_ids),
        TaskInstance.run_id == self.run_id,
        TaskInstance.state.in_([State.SKIPPED, State.SUCCESS]),
    )
    # The query yields a single row holding a single count column.
    finished_count = finished_count_query[0][0]
    return finished_count == len(downstream_ids)
@provide_session
def get_previous_dagrun(
    self,
    state: Optional[str] = None,
    session: Optional[Session] = None,
) -> Optional["DagRun"]:
    """The DagRun that ran before this task instance's DagRun.

    :param state: If passed, it only take into account instances of a specific state.
    :param session: SQLAlchemy ORM Session.
    :return: ``None`` if the task has no DAG or no previous run is found
    """
    dag = self.task.dag
    if dag is None:
        return None

    current_run = self.get_dagrun(session=session)

    # LEGACY: most likely running from unit tests
    if not current_run:
        # Means that this TaskInstance is NOT being run from a DR, but from a catchup
        try:
            # XXX: This uses DAG internals, but as the outer comment
            # said, the block is only reached for legacy reasons for
            # development code, so that's OK-ish.
            schedule = dag.timetable._schedule
        except AttributeError:
            return None
        current_date = pendulum.instance(self.execution_date)
        # NOTE(review): this legacy branch hands back a TaskInstance although the
        # signature promises a DagRun; kept as-is.
        return TaskInstance(
            task=self.task,
            execution_date=schedule.get_prev(current_date),
        )

    current_run.dag = dag

    # We always ignore schedule in dagrun lookup when `state` is given
    # or the DAG is never scheduled. For legacy reasons, when
    # `catchup=True`, we use `get_previous_scheduled_dagrun` unless
    # `ignore_schedule` is `True`.
    ignore_schedule = state is not None or not dag.timetable.can_run
    if dag.catchup is True and not ignore_schedule:
        previous_run = current_run.get_previous_scheduled_dagrun(session=session)
    else:
        previous_run = current_run.get_previous_dagrun(session=session, state=state)

    return previous_run if previous_run else None
@provide_session
def get_previous_ti(
    self, state: Optional[str] = None, session: Session = None
) -> Optional['TaskInstance']:
    """
    The task instance for the task that ran before this task instance.

    It is looked up in the DagRun returned by :meth:`get_previous_dagrun`.

    :param state: If passed, it only take into account instances of a specific state.
    :param session: SQLAlchemy ORM Session
    :return: ``None`` when there is no previous DagRun
    """
    previous_run = self.get_previous_dagrun(state, session=session)
    if previous_run is not None:
        return previous_run.get_task_instance(self.task_id, session=session)
    return None
@property
def previous_ti(self):
    """
    This attribute is deprecated.
    Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
    """
    deprecation_message = """ This attribute is deprecated. Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method. """
    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return self.get_previous_ti()
@property
def previous_ti_success(self) -> Optional['TaskInstance']:
    """
    This attribute is deprecated.
    Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.

    Equivalent to calling ``get_previous_ti(state=State.SUCCESS)``.
    """
    deprecation_message = """ This attribute is deprecated. Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method. """
    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return self.get_previous_ti(state=State.SUCCESS)
@provide_session
def get_previous_execution_date(
    self,
    state: Optional[str] = None,
    session: Session = None,
) -> Optional[pendulum.DateTime]:
    """
    The execution date of the previous task instance, as found by :meth:`get_previous_ti`.

    :param state: If passed, it only take into account instances of a specific state.
    :param session: SQLAlchemy ORM Session
    :return: ``None`` when there is no previous task instance
    """
    self.log.debug("previous_execution_date was called")
    prev_ti = self.get_previous_ti(state=state, session=session)
    if not prev_ti:
        return prev_ti
    return pendulum.instance(prev_ti.execution_date)
@provide_session
def get_previous_start_date(
    self, state: Optional[str] = None, session: Session = None
) -> Optional[pendulum.DateTime]:
    """
    The start date of the previous task instance, as found by :meth:`get_previous_ti`.

    :param state: If passed, it only take into account instances of a specific state.
    :param session: SQLAlchemy ORM Session
    :return: ``None`` when there is no previous task instance or it has no start date
    """
    self.log.debug("previous_start_date was called")
    prev_ti = self.get_previous_ti(state=state, session=session)
    # prev_ti may not exist and prev_ti.start_date may be None.
    if not prev_ti:
        return prev_ti
    started = prev_ti.start_date
    if not started:
        return started
    return pendulum.instance(started)
@property
def previous_start_date_success(self) -> Optional[pendulum.DateTime]:
    """
    This attribute is deprecated.
    Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.

    Equivalent to calling ``get_previous_start_date(state=State.SUCCESS)``.
    """
    deprecation_message = """ This attribute is deprecated. Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method. """
    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return self.get_previous_start_date(state=State.SUCCESS)
@provide_session
def are_dependencies_met(self, dep_context=None, session=None, verbose=False):
    """
    Returns whether or not all the conditions are met for this task instance to be run
    given the context for the dependencies (e.g. a task instance being force run from
    the UI will ignore some dependencies).

    Every failed dependency is logged, not just the first one.

    :param dep_context: The execution context that determines the dependencies that
        should be evaluated.
    :type dep_context: DepContext
    :param session: database session
    :type session: sqlalchemy.orm.session.Session
    :param verbose: whether log details on failed dependencies on info or debug log level
    :type verbose: bool
    """
    dep_context = dep_context or DepContext()
    emit = self.log.info if verbose else self.log.debug

    all_met = True
    for failure in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
        all_met = False
        emit(
            "Dependencies not met for %s, dependency '%s' FAILED: %s",
            self,
            failure.dep_name,
            failure.reason,
        )

    if all_met:
        emit("Dependencies all met for %s", self)
    return all_met
def next_retry_datetime(self):
    """
    Get datetime of the next retry if the task instance fails. For exponential
    backoff, retry_delay is used as base and will be converted to seconds.

    :return: ``end_date`` plus the task's retry delay or, with exponential backoff, plus
        a deterministic pseudo-random delay capped by the task's ``max_retry_delay``
    """
    delay = self.task.retry_delay
    if self.task.retry_exponential_backoff:
        attempt = self.try_number

        # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
        # we must round up prior to converting to an int, otherwise a divide by zero error
        # will occur in the modded_hash calculation.
        # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1.
        # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes
        # the ceiling function unnecessary, but the ceiling function was retained to avoid
        # introducing a breaking change.
        min_backoff = max(1, int(math.ceil(delay.total_seconds() * (2 ** (attempt - 2)))))

        # deterministic per task instance
        seed = f"{self.dag_id}#{self.task_id}#{self.execution_date}#{attempt}"
        ti_hash = int.from_bytes(hashlib.sha1(seed.encode('utf-8')).digest(), "big")

        # between 1 and 1.0 * delay * (2^retry_number)
        modded_hash = min_backoff + ti_hash % min_backoff

        # timedelta has a maximum representable value. The exponentiation
        # here means this value can be exceeded after a certain number
        # of tries (around 50 if the initial delay is 1s, even fewer if
        # the delay is larger). Cap the value here before creating a
        # timedelta object so the operation doesn't fail.
        capped_seconds = min(modded_hash, timedelta.max.total_seconds() - 1)
        delay = timedelta(seconds=capped_seconds)

        max_delay = self.task.max_retry_delay
        if max_delay:
            delay = min(max_delay, delay)
    return self.end_date + delay
def ready_for_retry(self):
    """
    Checks on whether the task instance is in the right state and timeframe
    to be retried.

    :return: True only if the state is UP_FOR_RETRY and :meth:`next_retry_datetime`
        lies in the past
    """
    if not (self.state == State.UP_FOR_RETRY):
        return False
    return self.next_retry_datetime() < timezone.utcnow()
@provide_session
def get_dagrun(self, session: Session = None):
    """
    Returns the DagRun for this TaskInstance

    An already loaded ``dag_run`` relationship is returned as-is; otherwise the DagRun
    is queried by dag_id and run_id and stored on the instance.

    :param session: SQLAlchemy ORM Session
    :return: DagRun
    """
    orm_state = inspect(self)
    already_loaded = orm_state.attrs.dag_run.loaded_value is not NO_VALUE
    if already_loaded:
        return self.dag_run

    from airflow.models.dagrun import DagRun  # Avoid circular import

    lookup = session.query(DagRun).filter(
        DagRun.dag_id == self.dag_id,
        DagRun.run_id == self.run_id,
    )
    dag_run = lookup.one()

    # Record it in the instance for next time. This means that `self.execution_date` will work correctly
    set_committed_value(self, 'dag_run', dag_run)
    return dag_run
@provide_session
def check_and_change_state_before_execution(
    self,
    verbose: bool = True,
    ignore_all_deps: bool = False,
    ignore_depends_on_past: bool = False,
    ignore_task_deps: bool = False,
    ignore_ti_state: bool = False,
    mark_success: bool = False,
    test_mode: bool = False,
    job_id: Optional[str] = None,
    pool: Optional[str] = None,
    external_executor_id: Optional[str] = None,
    session=None,
) -> bool:
    """
    Checks dependencies and then sets state to RUNNING if they are met. Returns
    True if and only if state is set to RUNNING, which implies that task should be
    executed, in preparation for _run_raw_task

    :param verbose: whether to turn on more verbose logging
    :type verbose: bool
    :param ignore_all_deps: Ignore all of the non-critical dependencies, just runs
    :type ignore_all_deps: bool
    :param ignore_depends_on_past: Ignore depends_on_past DAG attribute
    :type ignore_depends_on_past: bool
    :param ignore_task_deps: Don't check the dependencies of this TaskInstance's task
    :type ignore_task_deps: bool
    :param ignore_ti_state: Disregards previous task instance state
    :type ignore_ti_state: bool
    :param mark_success: Don't run the task, mark its state as success
    :type mark_success: bool
    :param test_mode: Doesn't record success or failure in the DB
    :type test_mode: bool
    :param job_id: Job (BackfillJob / LocalTaskJob / SchedulerJob) ID
    :type job_id: str
    :param pool: specifies the pool to use to run the task instance
    :type pool: str
    :param external_executor_id: The identifier of the celery executor
    :type external_executor_id: str
    :param session: SQLAlchemy ORM Session
    :type session: Session
    :return: whether the state was changed to running or not
    :rtype: bool
    """
    task = self.task
    self.refresh_from_task(task, pool_override=pool)
    self.test_mode = test_mode
    # Lock the row so the state transition below is done on a consistent view.
    self.refresh_from_db(session=session, lock_for_update=True)
    self.job_id = job_id
    self.hostname = get_hostname()
    self.pid = None

    if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
        Stats.incr('previously_succeeded', 1, 1)

    # TODO: Logging needs cleanup, not clear what is being printed
    separator = "\n" + ("-" * 80)  # Line break

    if not mark_success:
        # The same ignore_* switches apply to both dependency checks below.
        ignore_flags = dict(
            ignore_all_deps=ignore_all_deps,
            ignore_ti_state=ignore_ti_state,
            ignore_depends_on_past=ignore_depends_on_past,
            ignore_task_deps=ignore_task_deps,
        )

        # Firstly find non-runnable and non-requeueable tis.
        # Since mark_success is not set, we do nothing.
        blocking_context = DepContext(deps=RUNNING_DEPS - REQUEUEABLE_DEPS, **ignore_flags)
        if not self.are_dependencies_met(dep_context=blocking_context, session=session, verbose=True):
            session.commit()
            return False

        # For reporting purposes, we report based on 1-indexed,
        # not 0-indexed lists (i.e. Attempt 1 instead of
        # Attempt 0 for the first attempt).
        # Set the task start date. In case it was re-scheduled use the initial
        # start date that is recorded in task_reschedule table
        self.start_date = timezone.utcnow()
        if self.state == State.UP_FOR_RESCHEDULE:
            first_reschedule = TR.query_for_task_instance(self, session=session).first()
            if first_reschedule:
                self.start_date = first_reschedule.start_date

        # Secondly we find non-runnable but requeueable tis. We reset its state.
        # This is because we might have hit concurrency limits,
        # e.g. because of backfilling.
        requeueable_context = DepContext(deps=REQUEUEABLE_DEPS, **ignore_flags)
        if not self.are_dependencies_met(dep_context=requeueable_context, session=session, verbose=True):
            self.state = State.NONE
            self.log.warning(separator)
            self.log.warning(
                "Rescheduling due to concurrency limits reached "
                "at task runtime. Attempt %s of "
                "%s. State set to NONE.",
                self.try_number,
                self.max_tries + 1,
            )
            self.log.warning(separator)
            self.queued_dttm = timezone.utcnow()
            session.merge(self)
            session.commit()
            return False

    # print status message
    self.log.info(separator)
    self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1)
    self.log.info(separator)
    self._try_number += 1

    if not test_mode:
        session.add(Log(State.RUNNING, self))
    self.state = State.RUNNING
    self.external_executor_id = external_executor_id
    self.end_date = None
    if not test_mode:
        session.merge(self)
    session.commit()

    # Closing all pooled connections to prevent
    # "max number of connections reached"
    settings.engine.dispose()  # type: ignore

    if verbose:
        if mark_success:
            self.log.info("Marking success for %s on %s", self.task, self.execution_date)
        else:
            self.log.info("Executing %s on %s", self.task, self.execution_date)
    return True
def_date_or_empty(self,attr:str):result=getattr(self,attr,None)# type: datetimereturnresult.strftime('%Y%m%dT%H%M%S')ifresultelse''def_log_state(self,lead_msg:str=''):self.log.info('%sMarking task as %s.'+' dag_id=%s, task_id=%s,'+' execution_date=%s, start_date=%s, end_date=%s',lead_msg,self.state.upper(),self.dag_id,self.task_id,self._date_or_empty('execution_date'),self._date_or_empty('start_date'),self._date_or_empty('end_date'),)# Ensure we unset next_method and next_kwargs to ensure that any# retries don't re-use them.
def clear_next_method_args(self):
    """Reset ``next_method`` and ``next_kwargs`` to ``None`` so that retries don't re-use them."""
    self.log.debug("Clearing next_method and next_kwargs.")
    for attribute in ("next_method", "next_kwargs"):
        setattr(self, attribute, None)
@provide_session
@Sentry.enrich_errors
def _run_raw_task(
    self,
    mark_success: bool = False,
    test_mode: bool = False,
    job_id: Optional[str] = None,
    pool: Optional[str] = None,
    error_file: Optional[str] = None,
    session=None,
) -> None:
    """
    Immediately runs the task (without checking or changing db state
    before execution) and then sets the appropriate final state after
    completion and runs any post-execute callbacks. Meant to be called
    only after another function changes the state to running.

    :param mark_success: Don't run the task, mark its state as success
    :type mark_success: bool
    :param test_mode: Doesn't record success or failure in the DB
    :type test_mode: bool
    :param job_id: value stored on ``self.job_id`` before the task runs
    :type job_id: str
    :param pool: specifies the pool to use to run the task instance
    :type pool: str
    :param error_file: forwarded to ``handle_failure`` when the task fails
    :type error_file: str
    :param session: SQLAlchemy ORM Session
    :type session: Session
    """
    self.test_mode = test_mode
    self.refresh_from_task(self.task, pool_override=pool)
    self.refresh_from_db(session=session)
    self.job_id = job_id
    self.hostname = get_hostname()
    self.pid = os.getpid()
    if not test_mode:
        session.merge(self)
        session.commit()
    actual_start_date = timezone.utcnow()
    Stats.incr(f'ti.start.{self.task.dag_id}.{self.task.task_id}')
    try:
        if not mark_success:
            self.task = self.task.prepare_for_execution()
            context = self.get_template_context(ignore_param_exceptions=False)
            self._execute_task_with_callbacks(context)
        if not test_mode:
            self.refresh_from_db(lock_for_update=True, session=session)
        self.state = State.SUCCESS
    except TaskDeferred as defer:
        # The task has signalled it wants to defer execution based on
        # a trigger.
        self._defer_task(defer=defer, session=session)
        self.log.info(
            'Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s',
            self.dag_id,
            self.task_id,
            self._date_or_empty('execution_date'),
            self._date_or_empty('start_date'),
        )
        if not test_mode:
            session.add(Log(self.state, self))
            session.merge(self)
            session.commit()
        return
    except AirflowSmartSensorException as e:
        self.log.info(e)
        return
    except AirflowSkipException as e:
        # Recording SKIP
        # log only if exception has any arguments to prevent log flooding
        if e.args:
            self.log.info(e)
        if not test_mode:
            self.refresh_from_db(lock_for_update=True, session=session)
        self.state = State.SKIPPED
    except AirflowRescheduleException as reschedule_exception:
        self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, session=session)
        session.commit()
        return
    except (AirflowFailException, AirflowSensorTimeout) as e:
        # If AirflowFailException is raised, task should not retry.
        # If a sensor in reschedule mode reaches timeout, task should not retry.
        self.handle_failure(e, test_mode, force_fail=True, error_file=error_file, session=session)
        session.commit()
        raise
    except AirflowException as e:
        if not test_mode:
            self.refresh_from_db(lock_for_update=True, session=session)
        # for case when task is marked as success/failed externally
        # or dagrun timed out and task is marked as skipped
        # current behavior doesn't hit the callbacks
        if self.state in State.finished:
            self.clear_next_method_args()
            session.merge(self)
            session.commit()
            return
        else:
            self.handle_failure(e, test_mode, error_file=error_file, session=session)
            session.commit()
            raise
    except (Exception, KeyboardInterrupt) as e:
        self.handle_failure(e, test_mode, error_file=error_file, session=session)
        session.commit()
        raise
    finally:
        Stats.incr(f'ti.finish.{self.task.dag_id}.{self.task.task_id}.{self.state}')

    # Recording SKIPPED or SUCCESS
    self.clear_next_method_args()
    self.end_date = timezone.utcnow()
    self._log_state()
    self.set_duration()
    if not test_mode:
        session.add(Log(self.state, self))
        session.merge(self)
        session.commit()


def _execute_task_with_callbacks(self, context):
    """
    Prepare Task for Execution

    Installs a SIGTERM handler, clears XCom data, renders and stores the
    templated fields, exports the context as environment variables and then
    runs the pre-execute hook, the on-execute callback, the task itself and
    the post-execute hook.

    :param context: template context passed to the task and its callbacks
    """
    from airflow.models.renderedtifields import RenderedTaskInstanceFields

    def signal_handler(signum, frame):
        self.log.error("Received SIGTERM. Terminating subprocesses.")
        self.task.on_kill()
        raise AirflowException("Task received SIGTERM signal")

    signal.signal(signal.SIGTERM, signal_handler)

    # Don't clear Xcom until the task is certain to execute
    self.clear_xcom_data()
    with Stats.timer(f'dag.{self.task.dag_id}.{self.task.task_id}.duration'):
        self.render_templates(context=context)
        RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False))
        RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)

        # Export context to make it available for operators to use.
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.info(
            "Exporting the following env vars:\n%s",
            '\n'.join(f"{k}={v}" for k, v in airflow_context_vars.items()),
        )

        os.environ.update(airflow_context_vars)

        # Run pre_execute callback
        self.task.pre_execute(context=context)

        # Run on_execute callback
        self._run_execute_callback(context, self.task)

        if self.task.is_smart_sensor_compatible():
            # Try to register it in the smart sensor service.
            registered = False
            try:
                registered = self.task.register_in_sensor_service(self, context)
            except Exception:
                self.log.warning(
                    "Failed to register in sensor service."
                    " Continue to run task in non smart sensor mode.",
                    exc_info=True,
                )

            if registered:
                # Will raise AirflowSmartSensorException to avoid long running execution.
                self._update_ti_state_for_sensing()

        # Execute the task
        with set_current_context(context):
            result = self._execute_task(context, self.task)

        # Run post_execute callback
        self.task.post_execute(context=context, result=result)

    Stats.incr(f'operator_successes_{self.task.task_type}', 1, 1)
    Stats.incr('ti_successes')


@provide_session
def _update_ti_state_for_sensing(self, session=None):
    """
    Persist the SENSING state and abort normal execution.

    :param session: SQLAlchemy ORM Session
    :raises AirflowSmartSensorException: always, once the new state is committed
    """
    self.log.info('Submitting %s to sensor service', self)
    self.state = State.SENSING
    self.start_date = timezone.utcnow()
    session.merge(self)
    session.commit()
    # Raise exception for sensing state
    raise AirflowSmartSensorException("Task successfully registered in smart sensor.")


def _execute_task(self, context, task_copy):
    """
    Executes Task (optionally with a Timeout) and pushes Xcom results

    :param context: template context handed to the executed callable
    :param task_copy: the task object whose method is executed
    :return: whatever the executed callable returned
    :raises TaskDeferralError: when ``next_method`` is the ``__fail__`` marker
    :raises AirflowTaskTimeout: when the execution timeout has been exceeded
    """
    # If the task has been deferred and is being executed due to a trigger,
    # then we need to pick the right method to come back to, otherwise
    # we go for the default execute
    execute_callable = task_copy.execute
    if self.next_method:
        # __fail__ is a special signal value for next_method that indicates
        # this task was scheduled specifically to fail.
        if self.next_method == "__fail__":
            next_kwargs = self.next_kwargs or {}
            raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
        # Grab the callable off the Operator/Task and add in any kwargs
        execute_callable = getattr(task_copy, self.next_method)
        if self.next_kwargs:
            execute_callable = partial(execute_callable, **self.next_kwargs)
    # If a timeout is specified for the task, make it fail
    # if it goes beyond
    if task_copy.execution_timeout:
        # If we are coming in with a next_method (i.e. from a deferral),
        # calculate the timeout from our start_date.
        if self.next_method:
            timeout_seconds = (
                task_copy.execution_timeout - (timezone.utcnow() - self.start_date)
            ).total_seconds()
        else:
            timeout_seconds = task_copy.execution_timeout.total_seconds()
        try:
            # It's possible we're already timed out, so fast-fail if true
            if timeout_seconds <= 0:
                raise AirflowTaskTimeout()
            # Run task in timeout wrapper
            with timeout(timeout_seconds):
                result = execute_callable(context=context)
        except AirflowTaskTimeout:
            task_copy.on_kill()
            raise
    else:
        result = execute_callable(context=context)
    # If the task returns a result, push an XCom containing it
    if task_copy.do_xcom_push and result is not None:
        self.xcom_push(key=XCOM_RETURN_KEY, value=result)
    return result


@provide_session
def _defer_task(self, session, defer: TaskDeferred):
    """
    Marks the task as deferred and sets up the trigger that is needed
    to resume it.

    :param session: SQLAlchemy ORM Session
    :param defer: the deferral request raised by the task
    """
    from airflow.models.trigger import Trigger

    # First, make the trigger entry
    trigger_row = Trigger.from_object(defer.trigger)
    session.add(trigger_row)
    session.flush()

    # Then, update ourselves so it matches the deferral request
    self.state = State.DEFERRED
    self.trigger_id = trigger_row.id
    self.next_method = defer.method_name
    self.next_kwargs = defer.kwargs or {}

    # Decrement try number so the next one is the same try
    self._try_number -= 1

    # Calculate timeout too if it was passed
    if defer.timeout is not None:
        self.trigger_timeout = timezone.utcnow() + defer.timeout
    else:
        self.trigger_timeout = None

    # If an execution_timeout is set, set the timeout to the minimum of
    # it and the trigger timeout
    execution_timeout = self.task.execution_timeout
    if execution_timeout:
        if self.trigger_timeout:
            self.trigger_timeout = min(self.start_date + execution_timeout, self.trigger_timeout)
        else:
            self.trigger_timeout = self.start_date + execution_timeout


def _run_execute_callback(self, context: Context, task):
    """
    Functions that need to be run before a Task is executed

    Any exception raised by the callback is logged and not propagated.
    """
    try:
        if task.on_execute_callback:
            task.on_execute_callback(context)
    except Exception:
        self.log.exception("Failed when executing execute callback")


def _run_finished_callback(self, error: Optional[Union[str, Exception]] = None) -> None:
    """
    Call callback defined for finished state change.

    NOTE: Only invoke this function from caller of self._run_raw_task or
    self.run

    :param error: stored under ``context["exception"]`` for the failure and
        retry callbacks
    """
    # state -> (callback attribute on the task, whether the error is exposed)
    if self.state == State.FAILED:
        callback_name, expose_error = 'on_failure_callback', True
    elif self.state == State.SUCCESS:
        callback_name, expose_error = 'on_success_callback', False
    elif self.state == State.UP_FOR_RETRY:
        callback_name, expose_error = 'on_retry_callback', True
    else:
        return

    callback = getattr(self.task, callback_name)
    if callback is None:
        return

    context = self.get_template_context()
    if expose_error:
        context["exception"] = error
    try:
        callback(context)
    except Exception:
        # A failing user callback must not change the task outcome.
        self.log.exception("Error when executing %s" % callback_name)


# NOTE(review): in the scraped source this decorator is immediately followed
# by ``dry_run``; a method may have been lost between them -- confirm against
# the real file. ``session`` is accepted (and unused) so the decorator, which
# presumably needs a ``session`` parameter, stays valid.
@provide_session
def dry_run(self, session=None):
    """Only Renders Templates for the TI"""
    task_copy = self.task.prepare_for_execution()
    self.task = task_copy

    self.render_templates()
    task_copy.dry_run()
@provide_session
def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=False, session=None):
    """
    Record a reschedule request and move the task to UP_FOR_RESCHEDULE.

    :param actual_start_date: start date stored on the reschedule record
    :param reschedule_exception: exception whose ``reschedule_date`` is stored
    :param test_mode: when true, nothing is recorded or changed
    :type test_mode: bool
    :param session: SQLAlchemy ORM Session
    """
    # Don't record reschedule request in test mode
    if test_mode:
        return

    self.refresh_from_db(session)

    self.end_date = timezone.utcnow()
    self.set_duration()

    # Log reschedule request
    session.add(
        TaskReschedule(
            self.task,
            self.run_id,
            self._try_number,
            actual_start_date,
            self.end_date,
            reschedule_exception.reschedule_date,
        )
    )

    # set state
    self.state = State.UP_FOR_RESCHEDULE

    # Decrement try_number so subsequent runs will use the same try number and write
    # to same log file.
    self._try_number -= 1

    self.clear_next_method_args()

    session.merge(self)
    session.commit()
    self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')


@provide_session
def handle_failure(
    self,
    error: Union[str, Exception],
    test_mode: Optional[bool] = None,
    force_fail: bool = False,
    error_file: Optional[str] = None,
    session=None,
) -> None:
    """
    Handle Failure for the TaskInstance

    :param error: the failure reason, either a message or the raised exception
    :param test_mode: doesn't touch the DB when true; defaults to ``self.test_mode``
    :type test_mode: bool
    :param force_fail: mark the task FAILED even if it could still be retried
    :type force_fail: bool
    :param error_file: when given, the error is written there via ``set_error_file``
    :type error_file: str
    :param session: SQLAlchemy ORM Session
    """
    if test_mode is None:
        test_mode = self.test_mode

    if error:
        # BaseException rather than Exception: _run_raw_task also forwards
        # KeyboardInterrupt here, and its traceback should be logged as well.
        if isinstance(error, BaseException):
            self.log.error("Task failed with exception", exc_info=error)
        else:
            self.log.error("%s", error)
        # external monitoring process provides pickle file so _run_raw_task
        # can send its runtime errors for access by failure callback
        if error_file:
            set_error_file(error_file, error)
    if not test_mode:
        self.refresh_from_db(session)

    task = self.task
    self.end_date = timezone.utcnow()
    self.set_duration()
    Stats.incr(f'operator_failures_{task.task_type}', 1, 1)
    Stats.incr('ti_failures')
    if not test_mode:
        session.add(Log(State.FAILED, self))

        # Log failure duration
        dag_run = self.get_dagrun(session=session)  # self.dag_run not populated by refresh_from_db
        session.add(TaskFail(task, dag_run.execution_date, self.start_date, self.end_date))

    self.clear_next_method_args()

    # Set state correctly and figure out how to log it and decide whether
    # to email

    # Note, callback invocation needs to be handled by caller of
    # _run_raw_task to avoid race conditions which could lead to duplicate
    # invocations or miss invocation.

    # Since this function is called only when the TaskInstance state is running,
    # try_number contains the current try_number (not the next). We
    # only mark task instance as FAILED if the next task instance
    # try_number exceeds the max_tries ... or if force_fail is truthy
    if force_fail or not self.is_eligible_to_retry():
        self.state = State.FAILED
        email_for_state = task.email_on_failure
    else:
        if self.state == State.QUEUED:
            # We increase the try_number so as to fail the task if it fails to start after sometime
            self._try_number += 1
        self.state = State.UP_FOR_RETRY
        email_for_state = task.email_on_retry

    self._log_state('Immediate failure requested. ' if force_fail else '')
    if email_for_state and task.email:
        try:
            self.email_alert(error)
        except Exception:
            self.log.exception('Failed to send email to: %s', task.email)

    if not test_mode:
        session.merge(self)
        session.flush()
def is_eligible_to_retry(self) -> bool:
    """
    Is task instance is eligible for retry

    :return: ``True`` when the instance is in the RESTARTING state, or when
        the task has retries configured and ``try_number`` has not gone
        past ``max_tries``
    """
    if self.state == State.RESTARTING:
        # If a task is cleared when running, it goes into RESTARTING state and is always
        # eligible for retry
        return True

    # bool() so callers get a real boolean rather than the raw ``retries`` value.
    return bool(self.task.retries and self.try_number <= self.max_tries)
def get_template_context(self, session: Session = None, ignore_param_exceptions: bool = True) -> Context:
    """
    Return TI Context

    :param session: SQLAlchemy ORM Session; a new one is created when omitted
    :type session: Session
    :param ignore_param_exceptions: passed to ``ParamsDict`` as ``suppress_exception``
    :type ignore_param_exceptions: bool
    :return: the mapping of names made available to templates
    """
    # Do not use provide_session here -- it expunges everything on exit!
    if not session:
        session = settings.Session()
    task = self.task
    dag: DAG = task.dag
    from airflow import macros

    integrate_macros_plugins()

    dag_run = self.get_dagrun(session)
    data_interval = dag.get_run_data_interval(dag_run)

    params = ParamsDict(suppress_exception=ignore_param_exceptions)

    with contextlib.suppress(AttributeError):
        params.update(dag.params)
    if task.params:
        params.update(task.params)
    if conf.getboolean('core', 'dag_run_conf_overrides_params'):
        self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)

    logical_date = timezone.coerce_datetime(self.execution_date)
    ds = logical_date.strftime('%Y-%m-%d')
    ds_nodash = ds.replace('-', '')
    ts = logical_date.isoformat()
    ts_nodash = logical_date.strftime('%Y%m%dT%H%M%S')
    ts_nodash_with_tz = ts.replace('-', '').replace(':', '')

    # Now validates Params and convert them into a simple dict
    task.params = params.validate()

    @cache  # Prevent multiple database access.
    def _get_previous_dagrun_success() -> Optional["DagRun"]:
        return self.get_previous_dagrun(state=State.SUCCESS, session=session)

    def _get_previous_dagrun_data_interval_success() -> Optional["DataInterval"]:
        dagrun = _get_previous_dagrun_success()
        if dagrun is None:
            return None
        return dag.get_run_data_interval(dagrun)

    def get_prev_data_interval_start_success() -> Optional[pendulum.DateTime]:
        data_interval = _get_previous_dagrun_data_interval_success()
        if data_interval is None:
            return None
        return data_interval.start

    def get_prev_data_interval_end_success() -> Optional[pendulum.DateTime]:
        data_interval = _get_previous_dagrun_data_interval_success()
        if data_interval is None:
            return None
        return data_interval.end

    def get_prev_start_date_success() -> Optional[pendulum.DateTime]:
        dagrun = _get_previous_dagrun_success()
        if dagrun is None:
            return None
        return timezone.coerce_datetime(dagrun.start_date)

    # Custom accessors.
    class VariableAccessor:
        """
        Wrapper around Variable. This way you can get variables in
        templates by using ``{{ var.value.variable_name }}`` or
        ``{{ var.value.get('variable_name', 'fallback') }}``.
        """

        def __init__(self):
            self.var = None

        def __getattr__(
            self,
            item: str,
        ):
            self.var = Variable.get(item)
            return self.var

        def __repr__(self):
            return str(self.var)

        @staticmethod
        def get(
            item: str,
            default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,
        ):
            """Get Airflow Variable value"""
            return Variable.get(item, default_var=default_var)

    class VariableJsonAccessor:
        """
        Wrapper around Variable. This way you can get variables in
        templates by using ``{{ var.json.variable_name }}`` or
        ``{{ var.json.get('variable_name', {'fall': 'back'}) }}``.
        """

        def __init__(self):
            self.var = None

        def __getattr__(
            self,
            item: str,
        ):
            self.var = Variable.get(item, deserialize_json=True)
            return self.var

        def __repr__(self):
            return str(self.var)

        @staticmethod
        def get(
            item: str,
            default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,
        ):
            """Get Airflow Variable after deserializing JSON value"""
            return Variable.get(item, default_var=default_var, deserialize_json=True)

    class ConnectionAccessor:
        """
        Wrapper around Connection. This way you can get connections in
        templates by using ``{{ conn.conn_id }}`` or
        ``{{ conn.get('conn_id') }}``.
        """

        def __getattr__(
            self,
            item: str,
        ):
            return Connection.get_connection_from_secrets(item)

        @staticmethod
        def get(
            item: str,
            default_conn: Any = None,
        ):
            """Get Airflow Connection value"""
            try:
                return Connection.get_connection_from_secrets(item)
            except AirflowNotFoundException:
                return default_conn

    # Create lazy proxies for deprecated stuff.

    def deprecated_proxy(
        factory: Callable[[], Any],
        *,
        key: str,
        replacements: Optional[List[str]] = None,
    ) -> lazy_object_proxy.Proxy:
        # ``factory`` (not ``func``) so the module-level sqlalchemy ``func``
        # is not shadowed; every call site passes it positionally.
        def deprecated_func():
            message = (
                f"Accessing {key!r} from the template is deprecated and "
                f"will be removed in a future version."
            )
            if replacements:
                display_except_last = ", ".join(repr(r) for r in replacements[:-1])
                if display_except_last:
                    message += f" Please use {display_except_last} or {replacements[-1]!r} instead."
                else:
                    message += f" Please use {replacements[-1]!r} instead."
            warnings.warn(message, DeprecationWarning)
            return factory()

        return lazy_object_proxy.Proxy(deprecated_func)

    @cache
    def get_yesterday_ds() -> str:
        return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')

    def get_yesterday_ds_nodash() -> str:
        return get_yesterday_ds().replace('-', '')

    @cache
    def get_tomorrow_ds() -> str:
        return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')

    def get_tomorrow_ds_nodash() -> str:
        return get_tomorrow_ds().replace('-', '')

    @cache
    def get_next_execution_date() -> Optional[pendulum.DateTime]:
        # For manually triggered dagruns that aren't run on a schedule,
        # next/previous execution dates don't make sense, and should be set
        # to execution date for consistency with how execution_date is set
        # for manually triggered tasks, i.e. triggered_date == execution_date.
        if dag_run.external_trigger:
            next_execution_date = dag_run.execution_date
        else:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                next_execution_date = dag.following_schedule(self.execution_date)
        if next_execution_date is None:
            return None
        return timezone.coerce_datetime(next_execution_date)

    def get_next_ds() -> Optional[str]:
        execution_date = get_next_execution_date()
        if execution_date is None:
            return None
        return execution_date.strftime('%Y-%m-%d')

    def get_next_ds_nodash() -> Optional[str]:
        ds = get_next_ds()
        if ds is None:
            return ds
        return ds.replace('-', '')

    @cache
    def get_prev_execution_date():
        if dag_run.external_trigger:
            return timezone.coerce_datetime(self.execution_date)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            return dag.previous_schedule(self.execution_date)

    @cache
    def get_prev_ds() -> Optional[str]:
        execution_date = get_prev_execution_date()
        if execution_date is None:
            return None
        return execution_date.strftime(r'%Y-%m-%d')

    def get_prev_ds_nodash() -> Optional[str]:
        prev_ds = get_prev_ds()
        if prev_ds is None:
            return None
        return prev_ds.replace('-', '')

    return {
        'conf': conf,
        'dag': dag,
        'dag_run': dag_run,
        'data_interval_end': timezone.coerce_datetime(data_interval.end),
        'data_interval_start': timezone.coerce_datetime(data_interval.start),
        'ds': ds,
        'ds_nodash': ds_nodash,
        'execution_date': deprecated_proxy(
            lambda: logical_date,
            key='execution_date',
            replacements=['logical_date', 'data_interval_start'],
        ),
        'inlets': task.inlets,
        'logical_date': logical_date,
        'macros': macros,
        'next_ds': deprecated_proxy(get_next_ds, key="next_ds", replacements=["data_interval_end | ds"]),
        'next_ds_nodash': deprecated_proxy(
            get_next_ds_nodash,
            key="next_ds_nodash",
            replacements=["data_interval_end | ds_nodash"],
        ),
        'next_execution_date': deprecated_proxy(
            get_next_execution_date,
            key='next_execution_date',
            replacements=['data_interval_end'],
        ),
        'outlets': task.outlets,
        'params': task.params,
        'prev_data_interval_start_success': lazy_object_proxy.Proxy(get_prev_data_interval_start_success),
        'prev_data_interval_end_success': lazy_object_proxy.Proxy(get_prev_data_interval_end_success),
        'prev_ds': deprecated_proxy(get_prev_ds, key="prev_ds"),
        'prev_ds_nodash': deprecated_proxy(get_prev_ds_nodash, key="prev_ds_nodash"),
        'prev_execution_date': deprecated_proxy(get_prev_execution_date, key='prev_execution_date'),
        'prev_execution_date_success': deprecated_proxy(
            lambda: self.get_previous_execution_date(state=State.SUCCESS, session=session),
            key='prev_execution_date_success',
            replacements=['prev_data_interval_start_success'],
        ),
        'prev_start_date_success': lazy_object_proxy.Proxy(get_prev_start_date_success),
        'run_id': self.run_id,
        'task': task,
        'task_instance': self,
        'task_instance_key_str': f"{task.dag_id}__{task.task_id}__{ds_nodash}",
        'test_mode': self.test_mode,
        'ti': self,
        'tomorrow_ds': deprecated_proxy(get_tomorrow_ds, key='tomorrow_ds'),
        'tomorrow_ds_nodash': deprecated_proxy(get_tomorrow_ds_nodash, key='tomorrow_ds_nodash'),
        'ts': ts,
        'ts_nodash': ts_nodash,
        'ts_nodash_with_tz': ts_nodash_with_tz,
        'var': {
            'json': VariableJsonAccessor(),
            'value': VariableAccessor(),
        },
        'conn': ConnectionAccessor(),
        'yesterday_ds': deprecated_proxy(get_yesterday_ds, key='yesterday_ds'),
        'yesterday_ds_nodash': deprecated_proxy(get_yesterday_ds_nodash, key='yesterday_ds_nodash'),
    }


@provide_session
def get_rendered_template_fields(self, session=None):
    """
    Fetch rendered template fields from DB

    When stored values exist they are set on ``self.task``; otherwise the
    templates are rendered in place.

    :param session: SQLAlchemy ORM Session
    :raises AirflowException: when in-place rendering fails with a
        ``TemplateAssertionError`` or ``UndefinedError``
    """
    from airflow.models.renderedtifields import RenderedTaskInstanceFields

    rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self, session=session)
    if rendered_task_instance_fields:
        for field_name, rendered_value in rendered_task_instance_fields.items():
            setattr(self.task, field_name, rendered_value)
        return

    try:
        self.render_templates()
    except (TemplateAssertionError, UndefinedError) as e:
        raise AirflowException(
            "Webserver does not have access to User-defined Macros or Filters "
            "when Dag Serialization is enabled. Hence for the task that have not yet "
            "started running, please use 'airflow tasks render' for debugging the "
            "rendering of template_fields."
        ) from e
@provide_session
def get_rendered_k8s_spec(self, session=None):
    """
    Fetch the rendered k8s pod spec from DB, rendering it when none is stored.

    :param session: SQLAlchemy ORM Session
    :return: the stored pod yaml, or the result of ``render_k8s_pod_yaml``
    :raises AirflowException: when rendering fails with a
        ``TemplateAssertionError`` or ``UndefinedError``
    """
    from airflow.models.renderedtifields import RenderedTaskInstanceFields

    rendered_k8s_spec = RenderedTaskInstanceFields.get_k8s_pod_yaml(self, session=session)
    if rendered_k8s_spec:
        return rendered_k8s_spec

    try:
        return self.render_k8s_pod_yaml()
    except (TemplateAssertionError, UndefinedError) as e:
        raise AirflowException(f"Unable to render a k8s spec for this taskinstance: {e}") from e
def overwrite_params_with_dag_run_conf(self, params, dag_run):
    """
    Overwrite Task Params with DagRun.conf

    :param params: mapping updated in place
    :param dag_run: run whose ``conf`` is merged into ``params``; nothing
        happens when it, or its ``conf``, is falsy
    """
    if not (dag_run and dag_run.conf):
        return
    self.log.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf)
    params.update(dag_run.conf)
def render_templates(self, context: Optional[Context] = None) -> None:
    """
    Render templates in the operator fields.

    :param context: template context; built with ``get_template_context``
        when omitted or empty
    """
    if not context:
        context = self.get_template_context()

    self.task.render_template_fields(context)
def render_k8s_pod_yaml(self) -> Optional[dict]:
    """
    Render k8s pod yaml

    :return: the constructed pod, after ``settings.pod_mutation_hook`` has
        been applied, in the form produced by
        ``ApiClient().sanitize_for_serialization``
    """
    from airflow.kubernetes.kubernetes_helper_functions import create_pod_id  # Circular import

    kube_config = KubeConfig()
    pod = PodGenerator.construct_pod(
        dag_id=self.dag_id,
        task_id=self.task_id,
        pod_id=create_pod_id(self.dag_id, self.task_id),
        try_number=self.try_number,
        kube_image=kube_config.kube_image,
        date=self.execution_date,
        args=self.command_as_list(),
        pod_override_object=PodGenerator.from_obj(self.executor_config),
        scheduler_job_id="worker-config",
        namespace=kube_config.executor_namespace,
        base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
    )
    settings.pod_mutation_hook(pod)
    sanitized_pod = ApiClient().sanitize_for_serialization(pod)
    return sanitized_pod
def get_email_subject_content(self, exception):
    """
    Get the email subject content for exceptions.

    :param exception: the error to report in the email
    :return: a ``(subject, html_content, html_content_err)`` tuple, where
        ``html_content_err`` is the body variant without the exception text
    """
    # For a ti from DB (without ti.task), return the default value
    # Reuse it for smart sensor to send default email alert
    use_default = not hasattr(self, 'task')
    exception_html = str(exception).replace('\n', '<br>')

    default_subject = 'Airflow alert: {{ti}}'
    # For reporting purposes, we report based on 1-indexed,
    # not 0-indexed lists (i.e. Try 1 instead of
    # Try 0 for the first attempt).
    default_html_content = (
        'Try {{try_number}} out of {{max_tries + 1}}<br>'
        'Exception:<br>{{exception_html}}<br>'
        'Log: <a href="{{ti.log_url}}">Link</a><br>'
        'Host: {{ti.hostname}}<br>'
        'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
    )

    default_html_content_err = (
        'Try {{try_number}} out of {{max_tries + 1}}<br>'
        'Exception:<br>Failed attempt to attach error logs<br>'
        'Log: <a href="{{ti.log_url}}">Link</a><br>'
        'Host: {{ti.hostname}}<br>'
        'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
    )

    # This function is called after changing the state
    # from State.RUNNING so need to subtract 1 from self.try_number.
    failure_details = dict(
        exception=exception,
        exception_html=exception_html,
        try_number=self.try_number - 1,
        max_tries=self.max_tries,
    )

    if use_default:
        jinja_context = {'ti': self}
        jinja_context.update(failure_details)

        jinja_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
        )
        subject = jinja_env.from_string(default_subject).render(**jinja_context)
        html_content = jinja_env.from_string(default_html_content).render(**jinja_context)
        html_content_err = jinja_env.from_string(default_html_content_err).render(**jinja_context)

    else:
        jinja_context = self.get_template_context()
        jinja_context.update(failure_details)

        jinja_env = self.task.get_template_env()

        def render(key, content):
            # A template file configured under [email] replaces the default text.
            if conf.has_option('email', key):
                path = conf.get('email', key)
                with open(path) as f:
                    content = f.read()
            return jinja_env.from_string(content).render(**jinja_context)

        subject = render('subject_template', default_subject)
        html_content = render('html_content_template', default_html_content)
        html_content_err = render('html_content_template', default_html_content_err)

    return subject, html_content, html_content_err
def email_alert(self, exception):
    """
    Send alert email with exception information.

    If sending the full body fails, a second attempt is made with the
    fallback body; a failure of that second attempt propagates.

    :param exception: the error to report in the email
    """
    subject, html_content, html_content_err = self.get_email_subject_content(exception)
    try:
        send_email(self.task.email, subject, html_content)
    except Exception:
        # Record why the first attempt failed instead of dropping it silently.
        self.log.warning("Failed to send alert email; retrying with fallback content", exc_info=True)
        send_email(self.task.email, subject, html_content_err)
def set_duration(self) -> None:
    """
    Set TI duration

    ``duration`` becomes the number of seconds between ``start_date`` and
    ``end_date``, or ``None`` when either of them is not set.
    """
    if self.end_date and self.start_date:
        self.duration = (self.end_date - self.start_date).total_seconds()
    else:
        self.duration = None
    self.log.debug("Task Duration set to %s", self.duration)
@provide_session
def xcom_push(
    self,
    key: str,
    value: Any,
    execution_date: Optional[datetime] = None,
    session: Session = None,
) -> None:
    """
    Make an XCom available for tasks to pull.

    :param key: A key for the XCom
    :type key: str
    :param value: A value for the XCom. The value is pickled and stored
        in the database.
    :type value: any picklable object
    :param execution_date: if provided, the XCom will not be visible until
        this date. This can be used, for example, to send a message to a
        task on a future date without it being immediately visible.
    :type execution_date: datetime
    :param session: Sqlalchemy ORM Session
    :type session: Session
    :raises ValueError: when ``execution_date`` is earlier than the
        execution date of this task instance's DAG run
    """
    self_execution_date = self.get_dagrun(session).execution_date
    if execution_date and execution_date < self_execution_date:
        raise ValueError(
            f'execution_date can not be in the past (current '
            f'execution_date is {self_execution_date}; received {execution_date})'
        )

    XCom.set(
        key=key,
        value=value,
        task_id=self.task_id,
        dag_id=self.dag_id,
        execution_date=execution_date or self_execution_date,
        session=session,
    )


@provide_session
def xcom_pull(
    self,
    task_ids: Optional[Union[str, Iterable[str]]] = None,
    dag_id: Optional[str] = None,
    key: str = XCOM_RETURN_KEY,
    include_prior_dates: bool = False,
    session: Session = None,
) -> Any:
    """
    Pull XComs that optionally meet certain criteria.

    The default value for `key` limits the search to XComs
    that were returned by other tasks (as opposed to those that were pushed
    manually). To remove this filter, pass key=None (or any desired value).

    If a single task_id string is provided, the result is the value of the
    most recent matching XCom from that task_id. If multiple task_ids are
    provided, a list of matching values is returned, in the order of the
    given task_ids and with ``None`` for ids without a match. None is
    returned whenever no matches are found for a single task_id.

    :param key: A key for the XCom. If provided, only XComs with matching
        keys will be returned. The default key is 'return_value', also
        available as a constant XCOM_RETURN_KEY. This key is automatically
        given to XComs returned by tasks (as opposed to being pushed
        manually). To remove the filter, pass key=None.
    :type key: str
    :param task_ids: Only XComs from tasks with matching ids will be
        pulled. Can pass None to remove the filter.
    :type task_ids: str or iterable of strings (representing task_ids)
    :param dag_id: If provided, only pulls XComs from this DAG.
        If None (default), the DAG of the calling task is used.
    :type dag_id: str
    :param include_prior_dates: If False, only XComs from the current
        execution_date are returned. If True, XComs from previous dates
        are returned as well.
    :type include_prior_dates: bool
    :param session: Sqlalchemy ORM Session
    :type session: Session
    """
    if dag_id is None:
        dag_id = self.dag_id

    execution_date = self.get_dagrun(session).execution_date

    query = XCom.get_many(
        execution_date=execution_date,
        key=key,
        dag_ids=dag_id,
        task_ids=task_ids,
        include_prior_dates=include_prior_dates,
        session=session,
    )

    # Since we're only fetching the values field, and not the
    # whole class, the @recreate annotation does not kick in.
    # Therefore we need to deserialize the fields by ourselves.
    if is_container(task_ids):
        vals_kv = {
            result.task_id: XCom.deserialize_value(result)
            for result in query.with_entities(XCom.task_id, XCom.value)
        }
        return [vals_kv.get(task_id) for task_id in task_ids]

    xcom = query.with_entities(XCom.value).first()
    if xcom:
        return XCom.deserialize_value(xcom)
    return None
@provide_session
def get_num_running_task_instances(self, session=None):
    """
    Return Number of running TIs from the DB

    Counts the task instances that share this instance's ``dag_id`` and
    ``task_id`` and are in the RUNNING state.

    :param session: SQLAlchemy ORM Session
    """
    # .count() is inefficient
    running_count = session.query(func.count()).filter(
        TaskInstance.dag_id == self.dag_id,
        TaskInstance.task_id == self.task_id,
        TaskInstance.state == State.RUNNING,
    )
    return running_count.scalar()
def init_run_context(self, raw=False):
    """
    Sets the log context.

    :param raw: value stored on ``self.raw`` before the context is set
    """
    self.raw = raw
    self._set_context(self)
@staticmethod
def filter_for_tis(tis: Iterable[Union["TaskInstance", TaskInstanceKey]]) -> Optional[BooleanClauseList]:
    """
    Returns SQLAlchemy filter to query selected task instances

    :param tis: task instances (or their keys) to select
    :return: a filter clause matching exactly the given instances, or
        ``None`` when ``tis`` is empty
    """
    # DictKeys type, (what we often pass here from the scheduler) is not directly indexable :(
    # Or it might be a generator, but we need to be able to iterate over it more than once
    tis = list(tis)

    if not tis:
        return None

    first = tis[0]

    dag_id = first.dag_id
    run_id = first.run_id
    first_task_id = first.task_id
    # Common path optimisations: when all TIs are for the same dag_id and run_id, or same dag_id
    # and task_id -- this can be over 150x for huge numbers of TIs (20k+)
    if all(t.dag_id == dag_id and t.run_id == run_id for t in tis):
        return and_(
            TaskInstance.dag_id == dag_id,
            TaskInstance.run_id == run_id,
            TaskInstance.task_id.in_([t.task_id for t in tis]),
        )
    if all(t.dag_id == dag_id and t.task_id == first_task_id for t in tis):
        return and_(
            TaskInstance.dag_id == dag_id,
            TaskInstance.run_id.in_([t.run_id for t in tis]),
            TaskInstance.task_id == first_task_id,
        )

    if settings.Session.bind.dialect.name == 'mssql':
        # One AND-clause per instance, OR-ed together and passed as separate
        # positional clauses.
        per_ti_clauses = [
            and_(
                TaskInstance.dag_id == ti.dag_id,
                TaskInstance.task_id == ti.task_id,
                TaskInstance.run_id == ti.run_id,
            )
            for ti in tis
        ]
        return or_(*per_ti_clauses)

    return tuple_(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.run_id).in_(
        [ti.key.primary for ti in tis]
    )


# State of the task instance.
# Stores string version of the task state.
[docs]classSimpleTaskInstance:""" Simplified Task Instance. Used to send data between processes via Queues. """def__init__(self,ti:TaskInstance):self._dag_id:str=ti.dag_idself._task_id:str=ti.task_idself._run_id:datetime=ti.run_idself._start_date:datetime=ti.start_dateself._end_date:datetime=ti.end_dateself._try_number:int=ti.try_numberself._state:str=ti.stateself._executor_config:Any=ti.executor_configself._run_as_user:Optional[str]=Noneifhasattr(ti,'run_as_user'):self._run_as_user=ti.run_as_userself._pool:str=ti.poolself._priority_weight:Optional[int]=Noneifhasattr(ti,'priority_weight'):self._priority_weight=ti.priority_weightself._queue:str=ti.queueself._key=ti.key@property