## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportloggingfromtypingimportTYPE_CHECKING,AnyfromsqlalchemyimportBoolean,Column,Integer,String,Text,delete,selectfromsqlalchemy.dialects.mysqlimportMEDIUMTEXTfromsqlalchemy.ormimportdeclared_attr,reconstructor,synonymfromairflow.api_internal.internal_api_callimportinternal_api_callfromairflow.configurationimportensure_secrets_loadedfromairflow.models.baseimportID_LEN,Basefromairflow.models.cryptoimportget_fernetfromairflow.secrets.cacheimportSecretCachefromairflow.secrets.metastoreimportMetastoreBackendfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.sessionimportprovide_sessionifTYPE_CHECKING:fromsqlalchemy.ormimportSession
def __repr__(self):
    """Return a ``key : value`` string built from the raw ``_val`` attribute.

    The stored ``_val`` is used directly instead of the ``val`` accessor, so
    no decryption is attempted when the object is printed.
    """
    # Hiding the value
    text = f"{self.key} : {self._val}"
    return text
def get_val(self):
    """Return the variable's value, decrypting it with the Fernet key when needed.

    :return: the decrypted text when ``_val`` is set and ``is_encrypted`` is
        truthy; ``None`` when decryption fails (the failure is logged);
        otherwise ``_val`` as stored.
    """
    from cryptography.fernet import InvalidToken as InvalidFernetToken

    needs_decryption = self._val is not None and self.is_encrypted
    if not needs_decryption:
        return self._val

    try:
        cipher = get_fernet()
        return cipher.decrypt(bytes(self._val, "utf-8")).decode()
    except InvalidFernetToken:
        self.log.error("Can't decrypt _val for key=%s, invalid token or value", self.key)
    except Exception:
        self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key)
    # Both failure paths fall through to here.
    return None
def set_val(self, value):
    """Encrypt ``value`` with the Fernet key and store the result in ``_val``.

    ``is_encrypted`` is copied from the Fernet object that did the encoding.
    A ``None`` value is ignored and leaves the instance untouched.

    :param value: text to store
    """
    if value is None:
        return
    cipher = get_fernet()
    payload = bytes(value, "utf-8")
    self._val = cipher.encrypt(payload).decode()
    self.is_encrypted = cipher.is_encrypted
@declared_attr
def val(cls):
    """Get Airflow Variable from Metadata DB and decode it using the Fernet Key."""
    # Route attribute reads/writes of ``val`` through get_val / set_val.
    accessor = property(cls.get_val, cls.set_val)
    return synonym("_val", descriptor=accessor)
@classmethod
def setdefault(cls, key, default, description=None, deserialize_json=False):
    """
    Return the current value for a key or store the default value and return it.

    Works the same as the Python builtin dict object.

    :param key: Dict key for this Variable
    :param default: Default value to set and return if the variable
        isn't already in the DB
    :param description: Description to store with the Variable when the default is written
    :param deserialize_json: Store this as a JSON encoded value in the DB
        and un-encode it when retrieving a value
    :return: Mixed
    :raises ValueError: if the variable does not exist and ``default`` is ``None``
    """
    current = Variable.get(key, default_var=None, deserialize_json=deserialize_json)
    if current is not None:
        return current

    if default is None:
        raise ValueError("Default Value must be set")

    Variable.set(key, default, description=description, serialize_json=deserialize_json)
    return default
@classmethod
def get(
    cls,
    key: str,
    default_var: Any = __NO_DEFAULT_SENTINEL,
    deserialize_json: bool = False,
) -> Any:
    """Get a value for an Airflow Variable Key.

    The returned value is registered with ``mask_secret`` under ``key``
    before it is handed back.

    :param key: Variable Key
    :param default_var: Default value of the Variable if the Variable doesn't exist
    :param deserialize_json: Deserialize the value with ``json.loads`` before returning it
    :raises KeyError: if the variable does not exist and no default was supplied
    """
    raw = Variable.get_variable_from_secrets(key=key)
    if raw is None:
        if default_var is cls.__NO_DEFAULT_SENTINEL:
            raise KeyError(f"Variable {key} does not exist")
        return default_var

    result = json.loads(raw) if deserialize_json else raw
    mask_secret(result, key)
    return result
@staticmethod
@provide_session
@internal_api_call
def set(
    key: str,
    value: Any,
    description: str | None = None,
    serialize_json: bool = False,
    session: Session = None,
) -> None:
    """Set a value for an Airflow Variable with a given Key.

    This operation overwrites an existing variable: any row with the same
    key is deleted before the new one is added.

    :param key: Variable Key
    :param value: Value to set for the Variable
    :param description: Description of the Variable
    :param serialize_json: Serialize the value to a JSON string
    :param session: database session the delete and insert are issued on
    """
    # check if the secret exists in the custom secrets' backend.
    Variable.check_for_write_conflict(key)

    stored_value = json.dumps(value, indent=2) if serialize_json else str(value)

    Variable.delete(key, session=session)
    replacement = Variable(key=key, val=stored_value, description=description)
    session.add(replacement)
    session.flush()

    # invalidate key in cache for faster propagation
    # we cannot save the value set because it's possible that it's shadowed by a custom backend
    # (see call to check_for_write_conflict above)
    SecretCache.invalidate_variable(key)
@staticmethod
@provide_session
@internal_api_call
def update(
    key: str,
    value: Any,
    serialize_json: bool = False,
    session: Session = None,
) -> None:
    """Update a given Airflow Variable with the Provided value.

    The existing description of the variable is preserved.

    :param key: Variable Key
    :param value: Value to set for the Variable
    :param serialize_json: Serialize the value to a JSON string
    :param session: database session used for both the lookup and the write
    :raises KeyError: if no secrets backend returns a value for ``key``
    :raises AttributeError: if the variable has no row in the database
    """
    Variable.check_for_write_conflict(key)

    if Variable.get_variable_from_secrets(key=key) is None:
        raise KeyError(f"Variable {key} does not exist")

    obj = session.scalar(select(Variable).where(Variable.key == key))
    if obj is None:
        raise AttributeError(f"Variable {key} does not exist in the Database and cannot be updated.")

    # Forward the session so the delete/insert done by ``set`` happens in the
    # same session (and transaction) as the lookup above, instead of in a
    # fresh one opened by ``provide_session``.
    Variable.set(
        key,
        value,
        description=obj.description,
        serialize_json=serialize_json,
        session=session,
    )
@staticmethod
@provide_session
@internal_api_call
def delete(key: str, session: Session = None) -> int:
    """Delete an Airflow Variable for a given key.

    The key is also invalidated in ``SecretCache``.

    :param key: Variable Key
    :param session: database session the delete statement is executed on
    :return: the ``rowcount`` reported for the delete statement
    """
    statement = delete(Variable).where(Variable.key == key)
    deleted = session.execute(statement).rowcount
    SecretCache.invalidate_variable(key)
    return deleted
@staticmethod
def check_for_write_conflict(key: str) -> None:
    """Log a warning if a variable exists outside the metastore.

    If we try to write a variable to the metastore while the same key
    exists in an environment variable or custom secrets backend, then
    subsequent reads will not read the set value.

    Every non-metastore backend is consulted in order; the check stops at
    the first backend that returns a value for ``key``. A backend that
    raises is logged and skipped.

    :param key: Variable Key
    """
    for secrets_backend in ensure_secrets_loaded():
        if isinstance(secrets_backend, MetastoreBackend):
            # The metastore is the write target itself, not a conflict.
            continue
        try:
            var_val = secrets_backend.get_variable(key=key)
        except Exception:
            log.exception(
                "Unable to retrieve variable from secrets backend (%s). "
                "Checking subsequent secrets backend.",
                type(secrets_backend).__name__,
            )
            continue
        if var_val is not None:
            _backend_name = type(secrets_backend).__name__
            log.warning(
                "The variable %s is defined in the %s secrets backend, which takes "
                "precedence over reading from the database. The value in the database will be "
                "updated, but to read it you have to delete the conflicting variable "
                "from %s",
                key,
                _backend_name,
                _backend_name,
            )
            return
    return None
@staticmethod
def get_variable_from_secrets(key: str) -> str | None:
    """
    Get Airflow Variable by iterating over all Secret Backends.

    The cache is consulted first; on a miss each backend is asked in turn
    and the outcome (including ``None``) is written back to the cache.

    :param key: Variable Key
    :return: Variable Value
    """
    # check cache first
    # enabled only if SecretCache.init() has been called first
    try:
        return SecretCache.get_variable(key)
    except SecretCache.NotPresentException:
        pass  # continue business

    found = None
    # iterate over backends if not in cache (or expired)
    for backend in ensure_secrets_loaded():
        try:
            found = backend.get_variable(key=key)
        except Exception:
            log.exception(
                "Unable to retrieve variable from secrets backend (%s). "
                "Checking subsequent secrets backend.",
                type(backend).__name__,
            )
        else:
            if found is not None:
                break

    SecretCache.save_variable(key, found)  # we save None as well
    return found