Source code for airflow.secrets.cache
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import datetime
import multiprocessing
from airflow.configuration import conf
from airflow.utils import timezone
[docs]class SecretCache:
"""A static class to manage the global secret cache."""
__manager: multiprocessing.managers.SyncManager | None = None
_cache: dict[str, _CacheValue] | None = None
_ttl: datetime.timedelta
class NotPresentException(Exception):
"""Raised when a key is not present in the cache."""
class _CacheValue:
def __init__(self, value: str | None) -> None:
self.value = value
self.date = timezone.utcnow()
def is_expired(self, ttl: datetime.timedelta) -> bool:
return timezone.utcnow() - self.date > ttl
_VARIABLE_PREFIX = "__v_"
_CONNECTION_PREFIX = "__c_"
@classmethod
[docs] def init(cls):
"""
Initialize the cache, provided the configuration allows it.
Safe to call several times.
"""
if cls._cache is not None:
return
use_cache = conf.getboolean(section="secrets", key="use_cache", fallback=False)
if not use_cache:
return
if cls.__manager is None:
# it is not really necessary to save the manager, but doing so allows to reuse it between tests,
# making them run a lot faster because this operation takes ~300ms each time
cls.__manager = multiprocessing.Manager()
cls._cache = cls.__manager.dict()
ttl_seconds = conf.getint(section="secrets", key="cache_ttl_seconds", fallback=15 * 60)
cls._ttl = datetime.timedelta(seconds=ttl_seconds)
@classmethod
[docs] def reset(cls):
"""Use for test purposes only."""
cls._cache = None
@classmethod
[docs] def get_variable(cls, key: str) -> str | None:
"""
Try to get the value associated with the key from the cache.
:return: The saved value (which can be None) if present in cache and not expired,
a NotPresent exception otherwise.
"""
return cls._get(key, cls._VARIABLE_PREFIX)
@classmethod
[docs] def get_connection_uri(cls, conn_id: str) -> str:
"""
Try to get the uri associated with the conn_id from the cache.
:return: The saved uri if present in cache and not expired,
a NotPresent exception otherwise.
"""
val = cls._get(conn_id, cls._CONNECTION_PREFIX)
if val: # there shouldn't be any empty entries in the connections cache, but we enforce it here.
return val
raise cls.NotPresentException
@classmethod
def _get(cls, key: str, prefix: str) -> str | None:
if cls._cache is None:
# using an exception for misses allow to meaningfully cache None values
raise cls.NotPresentException
val = cls._cache.get(f"{prefix}{key}")
if val and not val.is_expired(cls._ttl):
return val.value
raise cls.NotPresentException
@classmethod
[docs] def save_variable(cls, key: str, value: str | None):
"""Save the value for that key in the cache, if initialized."""
cls._save(key, value, cls._VARIABLE_PREFIX)
@classmethod
[docs] def save_connection_uri(cls, conn_id: str, uri: str):
"""Save the uri representation for that connection in the cache, if initialized."""
if uri is None:
# connections raise exceptions if not present, so we shouldn't have any None value to save.
return
cls._save(conn_id, uri, cls._CONNECTION_PREFIX)
@classmethod
def _save(cls, key: str, value: str | None, prefix: str):
if cls._cache is not None:
cls._cache[f"{prefix}{key}"] = cls._CacheValue(value)
@classmethod
[docs] def invalidate_variable(cls, key: str):
"""Invalidate (actually removes) the value stored in the cache for that Variable."""
if cls._cache is not None:
# second arg ensures no exception if key is absent
cls._cache.pop(f"{cls._VARIABLE_PREFIX}{key}", None)