## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportloggingimportwarningsfromcontextlibimportsuppressfromjsonimportJSONDecodeErrorfromtypingimportAnyfromurllib.parseimportparse_qsl,quote,unquote,urlencode,urlsplitimportre2fromsqlalchemyimportBoolean,Column,Integer,String,Textfromsqlalchemy.ormimportdeclared_attr,reconstructor,synonymfromairflow.configurationimportensure_secrets_loadedfromairflow.exceptionsimportAirflowException,AirflowNotFoundException,RemovedInAirflow3Warningfromairflow.models.baseimportID_LEN,Basefromairflow.models.cryptoimportget_fernetfromairflow.secrets.cacheimportSecretCachefromairflow.utils.helpersimportprune_dictfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.module_loadingimportimport_string
# Sanitize the `conn_id` pattern by allowing alphanumeric characters plus
# the symbols #, !, -, _, ., :, \, / and (), requiring at least one match.
#
# You can try the regex here: https://regex101.com/r/69033B/1
def parse_netloc_to_hostname(*args, **kwargs):
    """
    Deprecated public alias of ``_parse_netloc_to_hostname``.

    Emits a :class:`RemovedInAirflow3Warning` attributed to the caller, then
    forwards every positional and keyword argument unchanged.
    """
    deprecation_message = "This method is deprecated."
    warnings.warn(deprecation_message, category=RemovedInAirflow3Warning, stacklevel=2)
    hostname = _parse_netloc_to_hostname(*args, **kwargs)
    return hostname
def sanitize_conn_id(conn_id: str | None, max_length=CONN_ID_MAX_LEN) -> str | None:
    r"""
    Sanitize a connection id so that it only contains an allowed set of characters.

    The allowed characters are alphanumerics plus the symbols #,!,-,_,.,:,\,/ and (),
    with at least one of them required. The id may be at most ``max_length``
    characters long. You can try the regex here: https://regex101.com/r/69033B/1

    The character selection is meant to prevent the injection of javascript or
    executable bits, to avoid awkward behaviour in the front-end.

    :param conn_id: The connection id to sanitize.
    :param max_length: Maximum accepted length; defaults to ``CONN_ID_MAX_LEN``.
    :return: The matched (sanitized) string, or ``None`` when ``conn_id`` is not a
        string, is too long, or does not match ``RE_SANITIZE_CONN_ID``.
    """
    # Cheap checks first: type and length are validated before any regex work.
    if not isinstance(conn_id, str):
        return None
    if len(conn_id) > max_length:
        return None

    match = re2.match(RE_SANITIZE_CONN_ID, conn_id)
    if match is None:
        return None
    # The pattern matched: hand back the matched text.
    return match.group(0)
def _parse_netloc_to_hostname(uri_parts):
    """
    Extract the hostname from an already-split URI, keeping its case when it is a path.

    ``urlparse(...).hostname`` / ``urlsplit(...).hostname`` return the host lowercased
    in most cases, with some exceptions (see https://bugs.python.org/issue32323).
    That is fine for ordinary host names. However, when the decoded host contains a
    ``/`` (for example a percent-encoded socket path), the value is re-derived from the
    raw ``netloc`` so that its original case survives.

    :param uri_parts: Result of :func:`urllib.parse.urlsplit` (or ``urlparse``).
    :return: The unquoted hostname, or ``""`` when the URI has no host.
    """
    decoded_host = unquote(uri_parts.hostname or "")
    if "/" not in decoded_host:
        return decoded_host

    # Path-like host: take the raw netloc, drop any "user[:password]@" prefix
    # (last "@" wins), then any ":port" suffix (first ":" wins), and unquote it.
    raw_host = uri_parts.netloc.rpartition("@")[2].partition(":")[0]
    return unquote(raw_host)
class Connection(Base, LoggingMixin):
    """
    Stored information needed to connect to an external system, such as a database instance.

    Scripts, operators and hooks refer to a connection by its id (``conn_id``) instead
    of hard-coding hostnames, logins and passwords.

    A connection is built either from a single ``uri`` or from the individual fields
    (``conn_type``, ``host``, ``login``, ``password``, ``schema``, ``port``, ``extra``).
    Supplying ``uri`` together with any individual field raises ``AirflowException``.

    .. seealso::
        For more information on how to use this class, see: :doc:`/howto/connection`

    :param conn_id: The connection ID. It is passed through ``sanitize_conn_id``, so an
        invalid ID is stored as ``None``.
    :param conn_type: The connection type.
    :param description: The connection description.
    :param host: The host.
    :param login: The login.
    :param password: The password. It is encrypted with ``get_fernet()`` when set.
    :param schema: The schema.
    :param port: The port number.
    :param extra: Extra metadata as a JSON-encoded object. Non-standard data such as
        private/SSH keys can be saved here. A non-string value (e.g. a dict) is
        JSON-encoded by the constructor.
    :param uri: URI address describing connection parameters.
    """
_extra=Column("extra",Text())def__init__(self,conn_id:str|None=None,conn_type:str|None=None,description:str|None=None,host:str|None=None,login:str|None=None,password:str|None=None,schema:str|None=None,port:int|None=None,extra:str|dict|None=None,uri:str|None=None,):super().__init__()self.conn_id=sanitize_conn_id(conn_id)self.description=descriptionifextraandnotisinstance(extra,str):extra=json.dumps(extra)ifuriand(conn_typeorhostorloginorpasswordorschemaorportorextra):raiseAirflowException("You must create an object using the URI or individual values ""(conn_type, host, login, password, schema, port or extra).""You can't mix these two ways to create this object.")ifuri:self._parse_from_uri(uri)else:self.conn_type=conn_typeself.host=hostself.login=loginself.password=passwordself.schema=schemaself.port=portself.extra=extraifself.extra:self._validate_extra(self.extra,self.conn_id)ifself.password:mask_secret(self.password)mask_secret(quote(self.password))@staticmethoddef_validate_extra(extra,conn_id)->None:""" Verify that ``extra`` is a JSON-encoded Python dict. From Airflow 3.0, we should no longer suppress these errors but raise instead. """ifextraisNone:returnNonetry:extra_parsed=json.loads(extra)ifnotisinstance(extra_parsed,dict):warnings.warn("Encountered JSON value in `extra` which does not parse as a dictionary in "f"connection {conn_id!r}. From Airflow 3.0, the `extra` field must contain a JSON ""representation of a Python dict.",RemovedInAirflow3Warning,stacklevel=3,)exceptjson.JSONDecodeError:warnings.warn(f"Encountered non-JSON in `extra` field for connection {conn_id!r}. Support for ""non-JSON `extra` will be removed in Airflow 3.0",RemovedInAirflow3Warning,stacklevel=2,)returnNone@reconstructor
[docs]defparse_from_uri(self,**uri):"""Use uri parameter in constructor, this method is deprecated."""warnings.warn("This method is deprecated. Please use uri parameter in constructor.",RemovedInAirflow3Warning,stacklevel=2,)self._parse_from_uri(**uri)
@staticmethoddef_normalize_conn_type(conn_type):ifconn_type=="postgresql":conn_type="postgres"elif"-"inconn_type:conn_type=conn_type.replace("-","_")returnconn_typedef_parse_from_uri(self,uri:str):schemes_count_in_uri=uri.count("://")ifschemes_count_in_uri>2:raiseAirflowException(f"Invalid connection string: {uri}.")host_with_protocol=schemes_count_in_uri==2uri_parts=urlsplit(uri)conn_type=uri_parts.schemeself.conn_type=self._normalize_conn_type(conn_type)rest_of_the_url=uri.replace(f"{conn_type}://",(""ifhost_with_protocolelse"//"))ifhost_with_protocol:uri_splits=rest_of_the_url.split("://",1)if"@"inuri_splits[0]or":"inuri_splits[0]:raiseAirflowException(f"Invalid connection string: {uri}.")uri_parts=urlsplit(rest_of_the_url)protocol=uri_parts.schemeifhost_with_protocolelseNonehost=_parse_netloc_to_hostname(uri_parts)self.host=self._create_host(protocol,host)quoted_schema=uri_parts.path[1:]self.schema=unquote(quoted_schema)ifquoted_schemaelsequoted_schemaself.login=unquote(uri_parts.username)ifuri_parts.usernameelseuri_parts.usernameself.password=unquote(uri_parts.password)ifuri_parts.passwordelseuri_parts.passwordself.port=uri_parts.portifuri_parts.query:query=dict(parse_qsl(uri_parts.query,keep_blank_values=True))ifself.EXTRA_KEYinquery:self.extra=query[self.EXTRA_KEY]else:self.extra=json.dumps(query)@staticmethoddef_create_host(protocol,host)->str|None:"""Return the connection host with the protocol."""ifnothost:returnhostifprotocol:returnf"{protocol}://{host}"returnhost
[docs]defget_uri(self)->str:"""Return connection in URI format."""ifself.conn_typeand"_"inself.conn_type:self.log.warning("Connection schemes (type: %s) shall not contain '_' according to RFC3986.",self.conn_type,)ifself.conn_type:uri=f"{self.conn_type.lower().replace('_','-')}://"else:uri="//"ifself.hostand"://"inself.host:protocol,host=self.host.split("://",1)else:protocol,host=None,self.hostifprotocol:uri+=f"{protocol}://"authority_block=""ifself.loginisnotNone:authority_block+=quote(self.login,safe="")ifself.passwordisnotNone:authority_block+=":"+quote(self.password,safe="")ifauthority_block>"":authority_block+="@"uri+=authority_blockhost_block=""ifhost:host_block+=quote(host,safe="")ifself.port:ifhost_block==""andauthority_block=="":host_block+=f"@:{self.port}"else:host_block+=f":{self.port}"ifself.schema:host_block+=f"/{quote(self.schema,safe='')}"uri+=host_blockifself.extra:try:query:str|None=urlencode(self.extra_dejson)exceptTypeError:query=Noneifqueryandself.extra_dejson==dict(parse_qsl(query,keep_blank_values=True)):uri+=("?"ifself.schemaelse"/?")+queryelse:uri+=("?"ifself.schemaelse"/?")+urlencode({self.EXTRA_KEY:self.extra})returnuri
[docs]defget_password(self)->str|None:"""Return encrypted password."""ifself._passwordandself.is_encrypted:fernet=get_fernet()ifnotfernet.is_encrypted:raiseAirflowException(f"Can't decrypt encrypted password for login={self.login} "f"FERNET_KEY configuration is missing")returnfernet.decrypt(bytes(self._password,"utf-8")).decode()else:returnself._password
[docs]defset_password(self,value:str|None):"""Encrypt password and set in object attribute."""ifvalue:fernet=get_fernet()self._password=fernet.encrypt(bytes(value,"utf-8")).decode()self.is_encrypted=fernet.is_encrypted
@declared_attr
[docs]defpassword(cls):"""Password. The value is decrypted/encrypted when reading/setting the value."""returnsynonym("_password",descriptor=property(cls.get_password,cls.set_password))
[docs]defget_extra(self)->str:"""Return encrypted extra-data."""ifself._extraandself.is_extra_encrypted:fernet=get_fernet()ifnotfernet.is_encrypted:raiseAirflowException(f"Can't decrypt `extra` params for login={self.login}, "f"FERNET_KEY configuration is missing")extra_val=fernet.decrypt(bytes(self._extra,"utf-8")).decode()else:extra_val=self._extraifextra_val:self._validate_extra(extra_val,self.conn_id)returnextra_val
[docs]defset_extra(self,value:str):"""Encrypt extra-data and save in object attribute to object."""ifvalue:self._validate_extra(value,self.conn_id)fernet=get_fernet()self._extra=fernet.encrypt(bytes(value,"utf-8")).decode()self.is_extra_encrypted=fernet.is_encryptedelse:self._extra=valueself.is_extra_encrypted=False
@declared_attr
[docs]defextra(cls):"""Extra data. The value is decrypted/encrypted when reading/setting the value."""returnsynonym("_extra",descriptor=property(cls.get_extra,cls.set_extra))
[docs]defrotate_fernet_key(self):"""Encrypts data with a new key. See: :ref:`security/fernet`."""fernet=get_fernet()ifself._passwordandself.is_encrypted:self._password=fernet.rotate(self._password.encode("utf-8")).decode()ifself._extraandself.is_extra_encrypted:self._extra=fernet.rotate(self._extra.encode("utf-8")).decode()
[docs]defget_hook(self,*,hook_params=None):"""Return hook based on conn_type."""fromairflow.providers_managerimportProvidersManagerhook=ProvidersManager().hooks.get(self.conn_type,None)ifhookisNone:raiseAirflowException(f'Unknown hook type "{self.conn_type}"')try:hook_class=import_string(hook.hook_class_name)exceptImportError:log.error("Could not import %s when discovering %s%s",hook.hook_class_name,hook.hook_name,hook.package_name,)raiseifhook_paramsisNone:hook_params={}returnhook_class(**{hook.connection_id_attribute_name:self.conn_id},**hook_params)
[docs]deflog_info(self):""" Read each field individually or use the default representation (`__repr__`). This method is deprecated. """warnings.warn("This method is deprecated. You can read each field individually or ""use the default representation (__repr__).",RemovedInAirflow3Warning,stacklevel=2,)return(f"id: {self.conn_id}. Host: {self.host}, Port: {self.port}, Schema: {self.schema}, "f"Login: {self.login}, Password: {'XXXXXXXX'ifself.passwordelseNone}, "f"extra: {'XXXXXXXX'ifself.extra_dejsonelseNone}")
[docs]defdebug_info(self):""" Read each field individually or use the default representation (`__repr__`). This method is deprecated. """warnings.warn("This method is deprecated. You can read each field individually or ""use the default representation (__repr__).",RemovedInAirflow3Warning,stacklevel=2,)return(f"id: {self.conn_id}. Host: {self.host}, Port: {self.port}, Schema: {self.schema}, "f"Login: {self.login}, Password: {'XXXXXXXX'ifself.passwordelseNone}, "f"extra: {self.extra_dejson}")
[docs]deftest_connection(self):"""Calls out get_hook method and executes test_connection method on that."""status,message=False,""try:hook=self.get_hook()ifgetattr(hook,"test_connection",False):status,message=hook.test_connection()else:message=(f"Hook {hook.__class__.__name__} doesn't implement or inherit test_connection method")exceptExceptionase:message=str(e)returnstatus,message
[docs]defget_extra_dejson(self,nested:bool=False)->dict:""" Deserialize extra property to JSON. :param nested: Determines whether nested structures are also deserialized into JSON (default False). """extra={}ifself.extra:try:ifnested:forkey,valueinjson.loads(self.extra).items():extra[key]=valueifisinstance(value,str):withsuppress(JSONDecodeError):extra[key]=json.loads(value)else:extra=json.loads(self.extra)exceptJSONDecodeError:self.log.exception("Failed parsing the json for conn_id %s",self.conn_id)# Mask sensitive keys from this listmask_secret(extra)returnextra
@property
[docs]defextra_dejson(self)->dict:"""Returns the extra property by deserializing json."""returnself.get_extra_dejson()
@classmethod
[docs]defget_connection_from_secrets(cls,conn_id:str)->Connection:""" Get connection by conn_id. :param conn_id: connection id :return: connection """# check cache first# enabled only if SecretCache.init() has been called firsttry:uri=SecretCache.get_connection_uri(conn_id)returnConnection(conn_id=conn_id,uri=uri)exceptSecretCache.NotPresentException:pass# continue business# iterate over backends if not in cache (or expired)forsecrets_backendinensure_secrets_loaded():try:conn=secrets_backend.get_connection(conn_id=conn_id)ifconn:SecretCache.save_connection_uri(conn_id,conn.get_uri())returnconnexceptException:log.exception("Unable to retrieve connection from secrets backend (%s). ""Checking subsequent secrets backend.",type(secrets_backend).__name__,)raiseAirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
defto_dict(self,*,prune_empty:bool=False,validate:bool=True)->dict[str,Any]:""" Convert Connection to json-serializable dictionary. :param prune_empty: Whether or not remove empty values. :param validate: Validate dictionary is JSON-serializable :meta private: """conn={"conn_id":self.conn_id,"conn_type":self.conn_type,"description":self.description,"host":self.host,"login":self.login,"password":self.password,"schema":self.schema,"port":self.port,}ifprune_empty:conn=prune_dict(val=conn,mode="strict")if(extra:=self.extra_dejson)ornotprune_empty:conn["extra"]=extraifvalidate:json.dumps(conn)returnconn@classmethod
[docs]deffrom_json(cls,value,conn_id=None)->Connection:kwargs=json.loads(value)extra=kwargs.pop("extra",None)ifextra:kwargs["extra"]=extraifisinstance(extra,str)elsejson.dumps(extra)conn_type=kwargs.pop("conn_type",None)ifconn_type:kwargs["conn_type"]=cls._normalize_conn_type(conn_type)port=kwargs.pop("port",None)ifport:try:kwargs["port"]=int(port)exceptValueError:raiseValueError(f"Expected integer value for `port`, but got {port!r} instead.")returnConnection(conn_id=conn_id,**kwargs)
[docs]defas_json(self)->str:"""Convert Connection to JSON-string object."""conn_repr=self.to_dict(prune_empty=True,validate=False)conn_repr.pop("conn_id",None)returnjson.dumps(conn_repr)