## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportcollections.abcimportcontextlibimportinspectimportitertoolsimportjsonimportloggingimportpickleimportwarningsfromfunctoolsimportcached_property,wrapsfromtypingimportTYPE_CHECKING,Any,Generator,Iterable,cast,overloadimportattrfromdeprecatedimportdeprecatedfromsqlalchemyimport(Column,ForeignKeyConstraint,Index,Integer,LargeBinary,PrimaryKeyConstraint,String,delete,text,)fromsqlalchemy.dialects.mysqlimportLONGBLOBfromsqlalchemy.ext.associationproxyimportassociation_proxyfromsqlalchemy.ormimportQuery,reconstructor,relationshipfromsqlalchemy.orm.excimportNoResultFoundfromairflowimportsettingsfromairflow.api_internal.internal_api_callimportinternal_api_callfromairflow.configurationimportconffromairflow.exceptionsimportRemovedInAirflow3Warningfromairflow.models.baseimportCOLLATION_ARGS,ID_LEN,TaskInstanceDependenciesfromairflow.utilsimporttimezonefromairflow.utils.helpersimportexactly_one,is_containerfromairflow.utils.jsonimportXComDecoder,XComEncoderfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.sessionimportNEW_SESSION,provide_sessionfromairflow.utils.sqlalchemyimportUtcDateTime# XCom constants below are needed for providers backward compatibility,# which should 
import the constants directly after apache-airflow>=2.6.0fromairflow.utils.xcomimport(MAX_XCOM_SIZE,# noqa: F401XCOM_RETURN_KEY,)
# Table-level indexes and constraints for the XCom table.
__table_args__ = (
    # A single unique index over (key, dag_id, task_id, run_id) would be ideal,
    # but it exceeds MySQL's index length limit. 'key' is therefore indexed on
    # its own, and uniqueness is enforced through DagRun.id (see the primary
    # key on dag_run_id below).
    Index("idx_xcom_key", key),
    Index("idx_xcom_task_instance", dag_id, task_id, run_id, map_index),
    PrimaryKeyConstraint("dag_run_id", "task_id", "map_index", "key", name="xcom_pkey"),
    # Rows reference their task instance; the constraint is declared with
    # ON DELETE CASCADE.
    ForeignKeyConstraint(
        [dag_id, task_id, run_id, map_index],
        [
            "task_instance.dag_id",
            "task_instance.task_id",
            "task_instance.run_id",
            "task_instance.map_index",
        ],
        name="xcom_task_instance_fkey",
        ondelete="CASCADE",
    ),
)
def init_on_load(self):
    """Replace ``self.value`` with its ORM-deserialized form.

    Execute after the instance has been loaded from the DB or otherwise
    reconstituted; called by the ORM, i.e. the XCom value is automatically
    deserialized when loading from the DB.
    """
    deserialized = self.orm_deserialize_value()
    self.value = deserialized
@overload
@classmethod
def set(
    cls,
    key: str,
    value: Any,
    *,
    dag_id: str,
    task_id: str,
    run_id: str,
    map_index: int = -1,
    session: Session = NEW_SESSION,
) -> None:
    """Store an XCom value.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    :param key: Key to store the XCom.
    :param value: XCom value to store.
    :param dag_id: DAG ID.
    :param task_id: Task ID.
    :param run_id: DAG run ID for the task.
    :param map_index: Optional map index to assign XCom for a mapped task.
        The default is ``-1`` (set for a non-mapped task).
    :param session: Database session. If not given, a new session will be
        created for this function.
    """


@overload
@classmethod
def set(
    cls,
    key: str,
    value: Any,
    task_id: str,
    dag_id: str,
    execution_date: datetime.datetime,
    session: Session = NEW_SESSION,
) -> None:
    """Store an XCom value.

    :sphinx-autoapi-skip:
    """


@classmethod
@provide_session
def set(
    cls,
    key: str,
    value: Any,
    task_id: str,
    dag_id: str,
    execution_date: datetime.datetime | None = None,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
    map_index: int = -1,
) -> None:
    """Store an XCom value.

    :sphinx-autoapi-skip:
    """
    from airflow.models.dagrun import DagRun

    if not exactly_one(execution_date is not None, run_id is not None):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )

    if run_id is not None:
        # Current form: look up the DagRun primary key from (dag_id, run_id).
        dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
        if dag_run_id is None:
            raise ValueError(f"DAG run not found on DAG {dag_id!r} with ID {run_id!r}")
    else:
        # Deprecated form: translate execution_date into (DagRun.id, run_id).
        deprecation_note = "Passing 'execution_date' to 'XCom.set()' is deprecated. Use 'run_id' instead."
        warnings.warn(deprecation_note, RemovedInAirflow3Warning, stacklevel=3)
        run_lookup = session.query(DagRun.id, DagRun.run_id).filter(
            DagRun.dag_id == dag_id,
            DagRun.execution_date == execution_date,
        )
        try:
            dag_run_id, run_id = run_lookup.one()
        except NoResultFound:
            raise ValueError(f"DAG run not found on DAG {dag_id!r} at {execution_date}") from None

    # A LazyXComAccess is meant to behave as a "lazy list" so that large
    # numbers of XComs are not pulled unnecessarily. When one is pushed back
    # into XCom it is resolved to a plain list here, with a warning so the user
    # is aware of the performance implications; this also keeps the
    # implementation detail from leaking.
    if isinstance(value, LazyXComAccess):
        pushed_what = "return value" if key == XCOM_RETURN_KEY else f"value {key}"
        log.warning(
            "Coercing mapped lazy proxy %s from task %s (DAG %s, run %s) "
            "to list, which may degrade performance. Review resource "
            "requirements for this operation, and call list() to suppress "
            "this message. See Dynamic Task Mapping documentation for "
            "more information about lazy proxy objects.",
            pushed_what,
            task_id,
            dag_id,
            run_id or execution_date,
        )
        value = list(value)

    serialized = cls.serialize_value(
        value=value,
        key=key,
        task_id=task_id,
        dag_id=dag_id,
        run_id=run_id,
        map_index=map_index,
    )

    # Remove duplicate XComs and insert a new one.
    remove_existing = delete(cls).where(
        cls.key == key,
        cls.run_id == run_id,
        cls.task_id == task_id,
        cls.dag_id == dag_id,
        cls.map_index == map_index,
    )
    session.execute(remove_existing)

    # The cast works around Mypy complaining that the model defines no '__init__'.
    row = cast(Any, cls)(
        dag_run_id=dag_run_id,
        key=key,
        value=serialized,
        run_id=run_id,
        task_id=task_id,
        dag_id=dag_id,
        map_index=map_index,
    )
    session.add(row)
    session.flush()


@staticmethod
@provide_session
@internal_api_call
def get_value(
    *,
    ti_key: TaskInstanceKey,
    key: str | None = None,
    session: Session = NEW_SESSION,
) -> Any:
    """Retrieve an XCom value for a task instance.

    This method returns "full" XCom values (i.e. uses ``deserialize_value``
    from the XCom backend). Use :meth:`get_many` if you want the "shortened"
    value via ``orm_deserialize_value``.

    If there are no results, *None* is returned. If multiple XCom entries
    match the criteria, an arbitrary one is returned.

    :param ti_key: The TaskInstanceKey to look up the XCom for.
    :param key: A key for the XCom. If provided, only XCom with matching
        keys will be returned. Pass *None* (default) to remove the filter.
    :param session: Database session. If not given, a new session will be
        created for this function.
    """
    # Unpack the task-instance key into the filters get_one() understands.
    lookup = {
        "key": key,
        "task_id": ti_key.task_id,
        "dag_id": ti_key.dag_id,
        "run_id": ti_key.run_id,
        "map_index": ti_key.map_index,
        "session": session,
    }
    return BaseXCom.get_one(**lookup)
@overload
@staticmethod
@internal_api_call
def get_one(
    *,
    key: str | None = None,
    dag_id: str | None = None,
    task_id: str | None = None,
    run_id: str | None = None,
    map_index: int | None = None,
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
) -> Any | None:
    """Retrieve an XCom value, optionally meeting certain criteria.

    This method returns "full" XCom values (i.e. uses ``deserialize_value``
    from the XCom backend). Use :meth:`get_many` if you want the "shortened"
    value via ``orm_deserialize_value``.

    If there are no results, *None* is returned. If multiple XCom entries
    match the criteria, an arbitrary one is returned.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    .. seealso:: ``get_value()`` is a convenience function if you already
        have a structured TaskInstance or TaskInstanceKey object available.

    :param run_id: DAG run ID for the task.
    :param dag_id: Only pull XCom from this DAG. Pass *None* (default) to
        remove the filter.
    :param task_id: Only XCom from task with matching ID will be pulled.
        Pass *None* (default) to remove the filter.
    :param map_index: Only XCom from task with matching map index will be
        pulled. Pass *None* (default) to remove the filter.
    :param key: A key for the XCom. If provided, only XCom with matching
        keys will be returned. Pass *None* (default) to remove the filter.
    :param include_prior_dates: If *False* (default), only XCom from the
        specified DAG run is returned. If *True*, the latest matching XCom is
        returned regardless of the run it belongs to.
    :param session: Database session. If not given, a new session will be
        created for this function.
    """


@overload
@staticmethod
@internal_api_call
def get_one(
    execution_date: datetime.datetime,
    key: str | None = None,
    task_id: str | None = None,
    dag_id: str | None = None,
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
) -> Any | None:
    """Retrieve an XCom value, optionally meeting certain criteria.

    :sphinx-autoapi-skip:
    """


# NOTE(review): the bare ``@deprecated`` below applies to every call of
# get_one(), including the supported ``run_id`` form and the call made by
# get_value() -- confirm that this is intended.
@staticmethod
@provide_session
@internal_api_call
@deprecated
def get_one(
    execution_date: datetime.datetime | None = None,
    key: str | None = None,
    task_id: str | None = None,
    dag_id: str | None = None,
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
    map_index: int | None = None,
) -> Any | None:
    """Retrieve an XCom value, optionally meeting certain criteria.

    :raises ValueError: if not exactly one of ``run_id`` and
        ``execution_date`` is given.

    :sphinx-autoapi-skip:
    """
    if not exactly_one(execution_date is not None, run_id is not None):
        raise ValueError("Exactly one of run_id or execution_date must be passed")

    # Dispatch on ``is not None`` so that it agrees with the validation above;
    # a truthiness test would send an empty-string run_id down neither branch.
    if run_id is not None:
        query = BaseXCom.get_many(
            run_id=run_id,
            key=key,
            task_ids=task_id,
            dag_ids=dag_id,
            map_indexes=map_index,
            include_prior_dates=include_prior_dates,
            limit=1,
            session=session,
        )
    else:
        # exactly_one() guarantees execution_date is set on this path.
        message = "Passing 'execution_date' to 'XCom.get_one()' is deprecated. Use 'run_id' instead."
        warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)

        # get_many() emits its own execution_date deprecation warning; silence
        # it so the caller only sees the one raised just above.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
            query = BaseXCom.get_many(
                execution_date=execution_date,
                key=key,
                task_ids=task_id,
                dag_ids=dag_id,
                map_indexes=map_index,
                include_prior_dates=include_prior_dates,
                limit=1,
                session=session,
            )

    # Only the value column is fetched; the row is then run through the
    # backend's "full" deserializer.
    result = query.with_entities(BaseXCom.value).first()
    if result:
        return XCom.deserialize_value(result)
    return None


@overload
@staticmethod
def get_many(
    *,
    run_id: str,
    key: str | None = None,
    task_ids: str | Iterable[str] | None = None,
    dag_ids: str | Iterable[str] | None = None,
    map_indexes: int | Iterable[int] | None = None,
    include_prior_dates: bool = False,
    limit: int | None = None,
    session: Session = NEW_SESSION,
) -> Query:
    """Composes a query to get one or more XCom entries.

    This function returns an SQLAlchemy query of full XCom objects. If you
    just want one stored value, use :meth:`get_one` instead.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    :param run_id: DAG run ID for the task.
    :param key: A key for the XComs. If provided, only XComs with matching
        keys will be returned. Pass *None* (default) to remove the filter.
    :param task_ids: Only XComs from task with matching IDs will be pulled.
        Pass *None* (default) to remove the filter.
    :param dag_ids: Only pulls XComs from specified DAGs. Pass *None*
        (default) to remove the filter.
    :param map_indexes: Only XComs from matching map indexes will be pulled.
        Pass *None* (default) to remove the filter.
    :param include_prior_dates: If *False* (default), only XComs from the
        specified DAG run are returned. If *True*, all matching XComs are
        returned regardless of the run it belongs to.
    :param session: Database session. If not given, a new session will be
        created for this function.
    :param limit: Limiting returning XComs
    """


@overload
@staticmethod
@internal_api_call
def get_many(
    execution_date: datetime.datetime,
    key: str | None = None,
    task_ids: str | Iterable[str] | None = None,
    dag_ids: str | Iterable[str] | None = None,
    map_indexes: int | Iterable[int] | None = None,
    include_prior_dates: bool = False,
    limit: int | None = None,
    session: Session = NEW_SESSION,
) -> Query:
    """Composes a query to get one or more XCom entries.

    :sphinx-autoapi-skip:
    """


@staticmethod
@provide_session
@internal_api_call
def get_many(
    execution_date: datetime.datetime | None = None,
    key: str | None = None,
    task_ids: str | Iterable[str] | None = None,
    dag_ids: str | Iterable[str] | None = None,
    map_indexes: int | Iterable[int] | None = None,
    include_prior_dates: bool = False,
    limit: int | None = None,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
) -> Query:
    """Composes a query to get one or more XCom entries.

    :raises ValueError: if not exactly one of ``run_id`` and
        ``execution_date`` is given.

    :sphinx-autoapi-skip:
    """
    from airflow.models.dagrun import DagRun

    if not exactly_one(execution_date is not None, run_id is not None):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )
    if execution_date is not None:
        message = "Passing 'execution_date' to 'XCom.get_many()' is deprecated. Use 'run_id' instead."
        warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)

    query = session.query(BaseXCom).join(BaseXCom.dag_run)

    # Each filter accepts either a single value or a container of values; a
    # falsy key / None value leaves the corresponding filter off.
    if key:
        query = query.filter(BaseXCom.key == key)

    if is_container(task_ids):
        query = query.filter(BaseXCom.task_id.in_(task_ids))
    elif task_ids is not None:
        query = query.filter(BaseXCom.task_id == task_ids)

    if is_container(dag_ids):
        query = query.filter(BaseXCom.dag_id.in_(dag_ids))
    elif dag_ids is not None:
        query = query.filter(BaseXCom.dag_id == dag_ids)

    if isinstance(map_indexes, range) and map_indexes.step == 1:
        # A contiguous range is expressed as a bounds check instead of IN (...).
        query = query.filter(
            BaseXCom.map_index >= map_indexes.start,
            BaseXCom.map_index < map_indexes.stop,
        )
    elif is_container(map_indexes):
        query = query.filter(BaseXCom.map_index.in_(map_indexes))
    elif map_indexes is not None:
        query = query.filter(BaseXCom.map_index == map_indexes)

    if include_prior_dates:
        if execution_date is not None:
            query = query.filter(DagRun.execution_date <= execution_date)
        else:
            # Resolve the run's execution date in a subquery and use it as the
            # upper bound.
            dr = session.query(DagRun.execution_date).filter(DagRun.run_id == run_id).subquery()
            query = query.filter(BaseXCom.execution_date <= dr.c.execution_date)
    elif execution_date is not None:
        query = query.filter(DagRun.execution_date == execution_date)
    else:
        query = query.filter(BaseXCom.run_id == run_id)

    # Newest run first, then newest XCom within the run.
    query = query.order_by(DagRun.execution_date.desc(), BaseXCom.timestamp.desc())
    if limit:
        return query.limit(limit)
    return query


@classmethod
@provide_session
def delete(cls, xcoms: XCom | Iterable[XCom], session: Session) -> None:
    """Delete one or multiple XCom entries.

    Each entry is first passed to ``XCom.purge`` and then deleted from the
    session; the session is committed once after all entries are processed.

    :param xcoms: A single XCom or an iterable of XComs.
    :param session: Database session.
    :raises TypeError: if an item is not an ``XCom`` instance. Items that
        precede it in the iterable have already been purged and marked for
        deletion at that point.
    """
    if isinstance(xcoms, XCom):
        xcoms = [xcoms]
    for xcom in xcoms:
        if not isinstance(xcom, XCom):
            raise TypeError(f"Expected XCom; received {xcom.__class__.__name__}")
        XCom.purge(xcom, session)
        session.delete(xcom)
    session.commit()
@staticmethod
[docs]defpurge(xcom:XCom,session:Session)->None:"""Purge an XCom entry from underlying storage implementations."""pass
@overload
@staticmethod
@internal_api_call
def clear(
    *,
    dag_id: str,
    task_id: str,
    run_id: str,
    map_index: int | None = None,
    session: Session = NEW_SESSION,
) -> None:
    """Clear all XCom data from the database for the given task instance.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    :param dag_id: ID of DAG to clear the XCom for.
    :param task_id: ID of task to clear the XCom for.
    :param run_id: ID of DAG run to clear the XCom for.
    :param map_index: If given, only clear XCom from this particular mapped
        task. The default ``None`` clears *all* XComs from the task.
    :param session: Database session. If not given, a new session will be
        created for this function.
    """


@overload
@staticmethod
@internal_api_call
def clear(
    execution_date: pendulum.DateTime,
    dag_id: str,
    task_id: str,
    session: Session = NEW_SESSION,
) -> None:
    """Clear all XCom data from the database for the given task instance.

    :sphinx-autoapi-skip:
    """


@staticmethod
@provide_session
@internal_api_call
def clear(
    execution_date: pendulum.DateTime | None = None,
    dag_id: str | None = None,
    task_id: str | None = None,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
    map_index: int | None = None,
) -> None:
    """Clear all XCom data from the database for the given task instance.

    :sphinx-autoapi-skip:
    """
    from airflow.models import DagRun

    # execution_date was historically the first positional argument, so every
    # parameter needed a default once the optional ones were added; the
    # "required" arguments are therefore checked by hand.
    if dag_id is None:
        raise TypeError("clear() missing required argument: dag_id")
    if task_id is None:
        raise TypeError("clear() missing required argument: task_id")

    if not exactly_one(execution_date is not None, run_id is not None):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )

    if execution_date is not None:
        deprecation_note = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
        warnings.warn(deprecation_note, RemovedInAirflow3Warning, stacklevel=3)
        # Translate the deprecated execution_date into the run's ID.
        run_lookup = session.query(DagRun.run_id).filter(
            DagRun.dag_id == dag_id,
            DagRun.execution_date == execution_date,
        )
        run_id = run_lookup.scalar()

    targets = session.query(BaseXCom).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
    if map_index is not None:
        targets = targets.filter_by(map_index=map_index)

    # Every row goes through the backend's purge hook before it is deleted.
    for entry in targets:
        XCom.purge(entry, session)
        session.delete(entry)

    session.commit()


@staticmethod
def serialize_value(
    value: Any,
    *,
    key: str | None = None,
    task_id: str | None = None,
    dag_id: str | None = None,
    run_id: str | None = None,
    map_index: int | None = None,
) -> Any:
    """Serialize XCom value to str or pickled object.

    Pickle is used when ``[core] enable_xcom_pickling`` is enabled; otherwise
    the value is JSON-encoded with ``XComEncoder`` and returned as UTF-8
    bytes. The keyword-only parameters are not used by this implementation.

    :raises ValueError, TypeError: re-raised (after being logged) when the
        value cannot be encoded as JSON.
    """
    use_pickle = conf.getboolean("core", "enable_xcom_pickling")
    if use_pickle:
        return pickle.dumps(value)

    try:
        return json.dumps(value, cls=XComEncoder).encode("UTF-8")
    except (ValueError, TypeError) as ex:
        log.error(
            "%s."
            " If you are using pickle instead of JSON for XCom,"
            " then you need to enable pickle support for XCom"
            " in your airflow config or make sure to decorate your"
            " object with attr.",
            ex,
        )
        raise
@staticmethod
def _deserialize_value(result: XCom, orm: bool) -> Any:
    """Decode ``result.value`` from its stored pickle or JSON form.

    :param result: Object whose ``value`` attribute holds the stored payload.
    :param orm: When true, JSON decoding uses ``XComDecoder.orm_object_hook``
        as the object hook; otherwise no object hook is passed.
    :return: The decoded value, or *None* when the stored payload is *None*.
    """
    object_hook = XComDecoder.orm_object_hook if orm else None

    def _from_json() -> Any:
        return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)

    if result.value is None:
        return None

    if not conf.getboolean("core", "enable_xcom_pickling"):
        # Since xcom_pickling is disabled, we should only try to deserialize with JSON.
        return _from_json()

    # NOTE(review): pickle.loads executes arbitrary code embedded in the
    # payload; this path is only safe if stored XCom values are trusted.
    try:
        return pickle.loads(result.value)
    except pickle.UnpicklingError:
        # The payload is not a pickle; fall back to JSON decoding.
        return _from_json()


@staticmethod
def deserialize_value(result: XCom) -> Any:
    """Deserialize XCom value from str or pickle object."""
    return BaseXCom._deserialize_value(result, orm=False)
def orm_deserialize_value(self) -> Any:
    """Deserialize the stored value when reconstructing the ORM XCom object.

    This method should be overridden in custom XCom backends to avoid
    unnecessary request or other resource consuming operations when
    creating XCom orm model. This is used when viewing XCom listing
    in the webserver, for example.
    """
    return BaseXCom._deserialize_value(self, orm=True)
class _LazyXComAccessIterator(collections.abc.Iterator):
    """Iterator over a query that is supplied wrapped in a context manager.

    The context manager is entered lazily, on the first ``next()`` call, and
    exited when the iterator object is finalized.
    """

    def __init__(self, cm: contextlib.AbstractContextManager[Query]) -> None:
        self._cm = cm
        self._entered = False

    def __del__(self) -> None:
        # Only exit the context manager if it was actually entered.
        if self._entered:
            self._cm.__exit__(None, None, None)

    def __iter__(self) -> collections.abc.Iterator:
        return self

    def __next__(self) -> Any:
        return XCom.deserialize_value(next(self._it))

    @cached_property
    def _it(self) -> collections.abc.Iterator:
        # First access enters the context manager and starts iterating the query.
        self._entered = True
        return iter(self._cm.__enter__())


@attr.define(slots=True)
class LazyXComAccess(collections.abc.Sequence):
    """Wrapper to lazily pull XCom with a sequence-like interface.

    Note that since the session bound to the parent query may have died when
    we actually access the sequence's content, we must create a new session
    for every function call with ``with_session()``.

    :meta private:
    """

    _query: Query
    # Cached row count; filled in by the first len() call or by unpickling.
    _len: int | None = attr.ib(init=False, default=None)

    @classmethod
    def build_from_xcom_query(cls, query: Query) -> LazyXComAccess:
        """Build an instance from *query*, narrowed to the ``XCom.value`` column."""
        return cls(query=query.with_entities(XCom.value))

    def __repr__(self) -> str:
        return f"LazyXComAccess([{len(self)} items])"

    def __str__(self) -> str:
        return str(list(self))

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, (list, LazyXComAccess)):
            # The unique fill value makes sequences of different length unequal.
            z = itertools.zip_longest(iter(self), iter(other), fillvalue=object())
            return all(x == y for x, y in z)
        return NotImplemented

    def __getstate__(self) -> Any:
        # We don't want to go to the trouble of serializing the entire Query
        # object, including its filters, hints, etc. (plus SQLAlchemy does not
        # provide a public API to inspect a query's contents). Converting the
        # query into a SQL string is the best we can get. Theoretically we can
        # do the same for count(), but I think it should be performant enough to
        # calculate only that eagerly.
        with self._get_bound_query() as query:
            statement = query.statement.compile(
                query.session.get_bind(),
                # This inlines all the values into the SQL string to simplify
                # cross-process communication as much as possible.
                compile_kwargs={"literal_binds": True},
            )
            return (str(statement), query.count())

    def __setstate__(self, state: Any) -> None:
        statement, self._len = state
        self._query = Query(XCom.value).from_statement(text(statement))

    def __len__(self):
        if self._len is None:
            with self._get_bound_query() as query:
                self._len = query.count()
        return self._len

    def __iter__(self):
        return _LazyXComAccessIterator(self._get_bound_query())

    def __getitem__(self, key):
        """Return the deserialized value at integer position *key*.

        Negative positions count from the end, as for built-in sequences.

        :raises ValueError: if *key* is not an ``int`` (slices are not supported).
        :raises IndexError: if *key* is out of range.
        """
        if not isinstance(key, int):
            raise ValueError("only support index access for now")

        offset = key
        if offset < 0:
            # OFFSET cannot express "from the end"; translate using the row count.
            offset += len(self)
            if offset < 0:
                raise IndexError(key)

        try:
            with self._get_bound_query() as query:
                r = query.offset(offset).limit(1).one()
        except NoResultFound:
            raise IndexError(key) from None
        return XCom.deserialize_value(r)

    @contextlib.contextmanager
    def _get_bound_query(self) -> Generator[Query, None, None]:
        """Yield the query bound to a usable session.

        The query's own session is reused when it is still active; otherwise
        a new session is created from ``settings.Session`` and closed once the
        ``with`` block ends.

        :raises RuntimeError: if ``settings.Session`` is missing or *None*.
        """
        # Do we have a valid session already?
        if self._query.session and self._query.session.is_active:
            yield self._query
            return

        Session = getattr(settings, "Session", None)
        if Session is None:
            raise RuntimeError("Session must be set before!")
        session = Session()
        try:
            yield self._query.with_session(session)
        finally:
            session.close()


def _patch_outdated_serializer(clazz: type[BaseXCom], params: Iterable[str]) -> None:
    """Patch a custom ``serialize_value`` to accept the modern signature.

    To give custom XCom backends more flexibility with how they store values, we
    now forward all params passed to ``XCom.set`` to ``XCom.serialize_value``. In
    order to maintain compatibility with custom XCom backends written with the
    old signature, we check the signature and, if necessary, patch with a
    method that ignores kwargs the backend does not accept.

    :param clazz: The backend class whose ``serialize_value`` is replaced.
    :param params: Parameter names the backend's ``serialize_value`` accepts.
    """
    old_serializer = clazz.serialize_value

    @wraps(old_serializer)
    def _shim(**kwargs):
        # Forward only the parameters the old-style serializer declares.
        kwargs = {k: kwargs.get(k) for k in params}
        warnings.warn(
            f"Method `serialize_value` in XCom backend {clazz.__name__} is using outdated signature and "
            f"must be updated to accept all params in `BaseXCom.set` except `session`. Support will be "
            f"removed in a future release.",
            RemovedInAirflow3Warning,
        )
        return old_serializer(**kwargs)

    clazz.serialize_value = _shim  # type: ignore[assignment]


def _get_function_params(function) -> list[str]:
    """Return the list of variables names of a function.

    ``*args`` and ``**kwargs`` style parameters are left out.

    :param function: The function to inspect
    """
    parameters = inspect.signature(function).parameters
    bound_arguments = [
        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
    ]
    return bound_arguments
def resolve_xcom_backend() -> type[BaseXCom]:
    """Resolve custom XCom class.

    Confirm that custom XCom class extends the BaseXCom.
    Compare the function signature of the custom XCom serialize_value to the
    base XCom serialize_value, and patch the custom one when the parameter
    names differ.

    :raises TypeError: if the configured class is not a ``BaseXCom`` subclass.
    """
    default_path = f"airflow.models.xcom.{BaseXCom.__name__}"
    backend = conf.getimport("core", "xcom_backend", fallback=default_path)
    if not backend:
        return BaseXCom
    if not issubclass(backend, BaseXCom):
        raise TypeError(
            f"Your custom XCom class `{backend.__name__}` is not a subclass of `{BaseXCom.__name__}`."
        )

    expected_params = _get_function_params(BaseXCom.serialize_value)
    backend_params = _get_function_params(backend.serialize_value)
    # Parameter order is irrelevant; only the set of names has to match.
    signatures_match = set(expected_params) == set(backend_params)
    if not signatures_match:
        _patch_outdated_serializer(clazz=backend, params=backend_params)
    return backend
ifTYPE_CHECKING:
[docs]XCom=BaseXCom# Hack to avoid Mypy "Variable 'XCom' is not valid as a type".