## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Objects relating to retrieving connections and variables from local file"""importjsonimportloggingimportosimportwarningsfromcollectionsimportdefaultdictfrominspectimportsignaturefromjsonimportJSONDecodeErrorfromtypingimportTYPE_CHECKING,Any,Dict,List,Optional,Set,Tuplefromairflow.exceptionsimport(AirflowException,AirflowFileParseException,ConnectionNotUnique,FileSyntaxError,)fromairflow.secrets.base_secretsimportBaseSecretsBackendfromairflow.utilsimportyamlfromairflow.utils.fileimportCOMMENT_PATTERNfromairflow.utils.log.logging_mixinimportLoggingMixin
def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
    """
    Parse a file in the ``.env`` format.

    .. code-block:: text

        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2

    Empty lines and lines matching ``COMMENT_PATTERN`` are skipped. Every other line is split on
    its first ``=``. Values of a repeated key are accumulated in file order. A line without ``=``,
    with an empty key, or with an empty value is reported as a syntax error and is not stored.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    secrets: Dict[str, List[str]] = defaultdict(list)
    errors: List[FileSyntaxError] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        if not line:
            # Ignore empty line
            continue

        if COMMENT_PATTERN.match(line):
            # Ignore comments
            continue

        key, sep, value = line.partition("=")
        if not sep:
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message='Invalid line format. The line should contain at least one equal sign ("=").',
                )
            )
            continue

        if not key:
            # Previously "=value" was silently stored under the empty key.
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Key is empty.",
                )
            )
            continue

        if not value:
            # Previously reported with the misleading "Key is empty." message.
            errors.append(
                FileSyntaxError(
                    line_no=line_no,
                    message="Invalid line format. Value is empty.",
                )
            )
            continue

        secrets[key].append(value)
    return secrets, errors


def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
    """
    Parse a file in the YAML format.

    An empty file, a YAML syntax error, or a top-level value that is not a mapping is reported
    as a single syntax error together with an empty mapping.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    if not content:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
    try:
        secrets = yaml.safe_load(content)
    except yaml.MarkedYAMLError as e:
        # NOTE(review): PyYAML marks are presumably zero-based, so this may be one less than the
        # line a user would count - confirm before changing. -1 means no mark was available.
        err_line_no = e.problem_mark.line if e.problem_mark else -1
        return {}, [FileSyntaxError(line_no=err_line_no, message=str(e))]
    if not isinstance(secrets, dict):
        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

    return secrets, []


def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
    """
    Parse a file in the JSON format.

    An empty file, a JSON syntax error, or a top-level value that is not an object is reported
    as a single syntax error together with an empty mapping.

    :param file_path: The location of the file that will be processed.
    :return: Tuple with mapping of key and list of values and list of syntax errors
    """
    with open(file_path) as f:
        content = f.read()

    if not content:
        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
    try:
        secrets = json.loads(content)
    except JSONDecodeError as e:
        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
    if not isinstance(secrets, dict):
        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

    return secrets, []
# NOTE(review): the source had a lone "}" immediately before this function. It looks like the
# tail of the FILE_PARSERS mapping used below, whose definition is not visible here - confirm
# that the mapping is defined before this point.
def _parse_secret_file(file_path: str) -> Dict[str, Any]:
    """
    Based on the file extension format, selects a parser, and parses the file.

    :param file_path: The location of the file that will be processed.
    :return: Map of secret key (e.g. connection ID) and value.
    :raises AirflowException: If the file does not exist or its extension has no parser.
    :raises AirflowFileParseException: If the selected parser reported syntax errors.
    """
    if not os.path.exists(file_path):
        raise AirflowException(
            f"File {file_path} was not found. Check the configuration of your Secrets backend."
        )

    log.debug("Parsing file: %s", file_path)

    # Text after the last dot, so a file named just ".env" yields "env".
    ext = file_path.rsplit(".", 1)[-1].lower()

    if ext not in FILE_PARSERS:
        raise AirflowException(
            "Unsupported file format. The file must have one of the following extensions: "
            ".env .json .yaml .yml"
        )

    secrets, parse_errors = FILE_PARSERS[ext](file_path)

    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))

    if parse_errors:
        raise AirflowFileParseException(
            "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
        )

    return secrets


def _create_connection(conn_id: str, value: Any):
    """
    Creates a connection based on a URL or JSON object.

    :param conn_id: Connection ID under which the value was stored in the file.
    :param value: Either a connection URI string or a dictionary of ``Connection`` constructor
        parameters (``extra_dejson`` is accepted as an alternative to ``extra``). The dictionary
        is not modified.
    :return: The created ``Connection``.
    :raises AirflowException: If the dictionary has unknown keys, has both ``extra`` and
        ``extra_dejson``, has a ``conn_id`` different from ``conn_id``, or if ``value`` is
        neither a string nor a dictionary.
    """
    from airflow.models.connection import Connection

    if isinstance(value, str):
        return Connection(conn_id=conn_id, uri=value)
    if isinstance(value, dict):
        # Work on a shallow copy so the caller's dictionary is left untouched.
        params = dict(value)
        connection_parameter_names = get_connection_parameter_names() | {"extra_dejson"}
        current_keys = set(params)
        if not current_keys.issubset(connection_parameter_names):
            illegal_keys = current_keys - connection_parameter_names
            # Sorted and stringified so the message is deterministic and non-string keys
            # (possible in YAML) do not break the join.
            illegal_keys_list = ", ".join(sorted(map(str, illegal_keys)))
            raise AirflowException(
                f"The object have illegal keys: {illegal_keys_list}. "
                f"The dictionary can only contain the following keys: {connection_parameter_names}"
            )
        if "extra" in params and "extra_dejson" in params:
            raise AirflowException(
                "The extra and extra_dejson parameters are mutually exclusive. "
                "Please provide only one parameter."
            )
        if "extra_dejson" in params:
            params["extra"] = json.dumps(params.pop("extra_dejson"))

        if "conn_id" in current_keys and conn_id != params["conn_id"]:
            raise AirflowException(
                f"Mismatch conn_id. "
                f"The dictionary key has the value: {params['conn_id']}. "
                f"The item has the value: {conn_id}."
            )
        params["conn_id"] = conn_id
        return Connection(**params)
    raise AirflowException(
        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
    )
def load_variables(file_path: str) -> Dict[str, str]:
    """
    Load variables from a text file.

    ``JSON``, `YAML` and ``.env`` files are supported.

    A parsed value that is a list must hold exactly one item, which becomes the variable's value.
    Any other parsed value is used as is.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of variable name to its single value.
    :raises AirflowException: If any key maps to a list whose length is not exactly one.
    """
    log.debug("Loading variables from a text file")

    parsed = _parse_secret_file(file_path)

    variables = {}
    invalid_keys = []
    for name, raw in parsed.items():
        if not isinstance(raw, list):
            variables[name] = raw
        elif len(raw) == 1:
            variables[name] = raw[0]
        else:
            # Collect every offending key so the error lists all of them at once.
            invalid_keys.append(name)

    if invalid_keys:
        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')

    log.debug("Loaded %d variables: ", len(variables))
    return variables
def load_connections(file_path) -> Dict[str, List[Any]]:
    """
    This function is deprecated. Please use `airflow.secrets.local_filesystem.load_connections_dict`.

    Emits a ``DeprecationWarning`` and wraps each connection returned by
    ``load_connections_dict`` in a one-element list.

    :param file_path: The location of the file that will be processed.
    :return: Mapping of connection ID to a one-element list holding its connection.
    """
    warnings.warn(
        "This function is deprecated. Please use `airflow.secrets.local_filesystem.load_connections_dict`.",
        DeprecationWarning,
        stacklevel=2,
    )
    # Iterate items(), not values(): both the connection ID and the connection are needed.
    return {k: [v] for k, v in load_connections_dict(file_path).items()}
def load_connections_dict(file_path: str) -> Dict[str, Any]:
    """
    Load connection from text file.

    ``JSON``, `YAML` and ``.env`` files are supported.

    A parsed value that is a list may hold at most one item. An empty list produces no connection
    for that key. Any other parsed value is passed to ``_create_connection`` directly.

    :param file_path: The location of the file that will be processed.
    :return: A dictionary where the key contains a connection ID and the value contains the connection.
    :rtype: Dict[str, airflow.models.connection.Connection]
    :raises ConnectionNotUnique: If a key maps to a list with more than one value.
    """
    log.debug("Loading connection")

    parsed: Dict[str, Any] = _parse_secret_file(file_path)
    connections = {}
    # Iterate over a snapshot of the parsed items.
    for conn_id, raw in tuple(parsed.items()):
        if not isinstance(raw, list):
            connections[conn_id] = _create_connection(conn_id, raw)
            continue

        if len(raw) > 1:
            raise ConnectionNotUnique(f"Found multiple values for {conn_id} in {file_path}.")

        if raw:
            connections[conn_id] = _create_connection(conn_id, raw[0])

    log.debug("Loaded %d connections", len(connections))

    return connections
class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection objects and Variables from local files

    ``JSON``, `YAML` and ``.env`` files are supported.

    :param variables_file_path: File location with variables data.
    :param connections_file_path: File location with connection data.
    """

    def __init__(
        self, variables_file_path: Optional[str] = None, connections_file_path: Optional[str] = None
    ):
        super().__init__()
        self.variables_file = variables_file_path
        self.connections_file = connections_file_path

    @property
    def _local_variables(self) -> Dict[str, str]:
        """Variables loaded from ``variables_file`` on each access; empty when no file is set."""
        if self.variables_file:
            return load_variables(self.variables_file)

        # The user may not specify any file.
        self.log.debug("The file for variables is not specified. Skipping")
        return {}

    @property
    def _local_connections(self) -> Dict[str, 'Connection']:
        """Connections loaded from ``connections_file`` on each access; empty when no file is set."""
        if self.connections_file:
            return load_connections_dict(self.connections_file)

        # The user may not specify any file.
        self.log.debug("The file for connection is not specified. Skipping")
        return {}

    def get_connections(self, conn_id: str) -> List[Any]:
        """
        Deprecated list-returning wrapper around ``get_connection``.

        :param conn_id: Connection ID to look up.
        :return: A one-element list with the connection, or an empty list when the lookup is falsy.
        """
        warnings.warn(
            "This method is deprecated. Please use "
            "`airflow.secrets.local_filesystem.LocalFilesystemBackend.get_connection`.",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        # NOTE(review): get_connection is not defined in the visible part of this class -
        # presumably inherited or defined elsewhere; confirm.
        found = self.get_connection(conn_id=conn_id)
        return [found] if found else []