## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importjsonimportloggingimportosfromtypingimportAny,Optionalfromcryptography.fernetimportInvalidTokenasInvalidFernetTokenfromsqlalchemyimportBoolean,Column,Integer,String,Textfromsqlalchemy.ext.declarativeimportdeclared_attrfromsqlalchemy.ormimportSession,reconstructor,synonymfromairflow.configurationimportensure_secrets_loadedfromairflow.models.baseimportID_LEN,Basefromairflow.models.cryptoimportget_fernetfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.sessionimportprovide_session
[docs]classVariable(Base,LoggingMixin):""" Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow. """
[docs]def__repr__(self):# Hiding the valuereturnf'{self.key} : {self._val}'
[docs]defget_val(self):"""Get Airflow Variable from Metadata DB and decode it using the Fernet Key"""ifself._valisnotNoneandself.is_encrypted:try:fernet=get_fernet()returnfernet.decrypt(bytes(self._val,'utf-8')).decode()exceptInvalidFernetToken:self.log.error("Can't decrypt _val for key=%s, invalid token or value",self.key)returnNoneexceptException:self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing",self.key)returnNoneelse:returnself._val
[docs]defset_val(self,value):"""Encode the specified value with Fernet Key and store it in Variables Table."""ifvalueisnotNone:fernet=get_fernet()self._val=fernet.encrypt(bytes(value,'utf-8')).decode()self.is_encrypted=fernet.is_encrypted
@declared_attr
[docs]defval(cls):"""Get Airflow Variable from Metadata DB and decode it using the Fernet Key"""returnsynonym('_val',descriptor=property(cls.get_val,cls.set_val))
@classmethod
[docs]defsetdefault(cls,key,default,deserialize_json=False):""" Like a Python builtin dict object, setdefault returns the current value for a key, and if it isn't there, stores the default value and returns it. :param key: Dict key for this Variable :type key: str :param default: Default value to set and return if the variable isn't already in the DB :type default: Mixed :param deserialize_json: Store this as a JSON encoded value in the DB and un-encode it when retrieving a value :return: Mixed """obj=Variable.get(key,default_var=None,deserialize_json=deserialize_json)ifobjisNone:ifdefaultisnotNone:Variable.set(key,default,serialize_json=deserialize_json)returndefaultelse:raiseValueError('Default Value must be set')else:returnobj
@classmethod
[docs]defget(cls,key:str,default_var:Any=__NO_DEFAULT_SENTINEL,deserialize_json:bool=False,)->Any:""" Gets a value for an Airflow Variable Key :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exists :param deserialize_json: Deserialize the value to a Python dict """var_val=Variable.get_variable_from_secrets(key=key)ifvar_valisNone:ifdefault_varisnotcls.__NO_DEFAULT_SENTINEL:returndefault_varelse:raiseKeyError(f'Variable {key} does not exist')else:ifdeserialize_json:obj=json.loads(var_val)mask_secret(var_val,key)returnobjelse:mask_secret(var_val,key)returnvar_val
@classmethod@provide_session
[docs]defset(cls,key:str,value:Any,serialize_json:bool=False,session:Session=None):""" Sets a value for an Airflow Variable with a given Key :param key: Variable Key :param value: Value to set for the Variable :param serialize_json: Serialize the value to a JSON string :param session: SQL Alchemy Sessions """env_var_name="AIRFLOW_VAR_"+key.upper()ifenv_var_nameinos.environ:log.warning("You have the environment variable %s defined, which takes precedence over reading ""from the database. The value will be saved, but to read it you have to delete ""the environment variable.",env_var_name,)ifserialize_json:stored_value=json.dumps(value,indent=2)else:stored_value=str(value)Variable.delete(key,session=session)session.add(Variable(key=key,val=stored_value))session.flush()
@classmethod@provide_session
[docs]defdelete(cls,key:str,session:Session=None)->int:""" Delete an Airflow Variable for a given key :param key: Variable Key :param session: SQL Alchemy Sessions """returnsession.query(cls).filter(cls.key==key).delete()
[docs]defget_variable_from_secrets(key:str)->Optional[str]:""" Get Airflow Variable by iterating over all Secret Backends. :param key: Variable Key :return: Variable Value """forsecrets_backendinensure_secrets_loaded():var_val=secrets_backend.get_variable(key=key)ifvar_valisnotNone:returnvar_valreturnNone