## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetimeimportmultiprocessingfromairflow.configurationimportconffromairflow.utilsimporttimezone
[docs]classSecretCache:"""A static class to manage the global secret cache."""__manager:multiprocessing.managers.SyncManager|None=None_cache:dict[str,_CacheValue]|None=None_ttl:datetime.timedeltaclassNotPresentException(Exception):"""Raised when a key is not present in the cache."""class_CacheValue:def__init__(self,value:str|None)->None:self.value=valueself.date=timezone.utcnow()defis_expired(self,ttl:datetime.timedelta)->bool:returntimezone.utcnow()-self.date>ttl_VARIABLE_PREFIX="__v_"_CONNECTION_PREFIX="__c_"@classmethod
[docs]definit(cls):""" Initialize the cache, provided the configuration allows it. Safe to call several times. """ifcls._cacheisnotNone:returnuse_cache=conf.getboolean(section="secrets",key="use_cache",fallback=False)ifnotuse_cache:returnifcls.__managerisNone:# it is not really necessary to save the manager, but doing so allows to reuse it between tests,# making them run a lot faster because this operation takes ~300ms each timecls.__manager=multiprocessing.Manager()cls._cache=cls.__manager.dict()ttl_seconds=conf.getint(section="secrets",key="cache_ttl_seconds",fallback=15*60)cls._ttl=datetime.timedelta(seconds=ttl_seconds)
@classmethod
[docs]defreset(cls):"""Use for test purposes only."""cls._cache=None
@classmethod
[docs]defget_variable(cls,key:str)->str|None:""" Try to get the value associated with the key from the cache. :return: The saved value (which can be None) if present in cache and not expired, a NotPresent exception otherwise. """returncls._get(key,cls._VARIABLE_PREFIX)
@classmethod
[docs]defget_connection_uri(cls,conn_id:str)->str:""" Try to get the uri associated with the conn_id from the cache. :return: The saved uri if present in cache and not expired, a NotPresent exception otherwise. """val=cls._get(conn_id,cls._CONNECTION_PREFIX)ifval:# there shouldn't be any empty entries in the connections cache, but we enforce it here.returnvalraisecls.NotPresentException
@classmethoddef_get(cls,key:str,prefix:str)->str|None:ifcls._cacheisNone:# using an exception for misses allow to meaningfully cache None valuesraisecls.NotPresentExceptionval=cls._cache.get(f"{prefix}{key}")ifvalandnotval.is_expired(cls._ttl):returnval.valueraisecls.NotPresentException@classmethod
[docs]defsave_variable(cls,key:str,value:str|None):"""Save the value for that key in the cache, if initialized."""cls._save(key,value,cls._VARIABLE_PREFIX)
@classmethod
[docs]defsave_connection_uri(cls,conn_id:str,uri:str):"""Save the uri representation for that connection in the cache, if initialized."""ifuriisNone:# connections raise exceptions if not present, so we shouldn't have any None value to save.returncls._save(conn_id,uri,cls._CONNECTION_PREFIX)
[docs]definvalidate_variable(cls,key:str):"""Invalidate (actually removes) the value stored in the cache for that Variable."""ifcls._cacheisnotNone:# second arg ensures no exception if key is absentcls._cache.pop(f"{cls._VARIABLE_PREFIX}{key}",None)