## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportloggingimportwarningsfromjsonimportJSONDecodeErrorfromurllib.parseimportparse_qsl,quote,unquote,urlencode,urlsplitfromsqlalchemyimportBoolean,Column,Integer,String,Textfromsqlalchemy.ext.declarativeimportdeclared_attrfromsqlalchemy.ormimportreconstructor,synonymfromairflow.configurationimportensure_secrets_loadedfromairflow.exceptionsimportAirflowException,AirflowNotFoundException,RemovedInAirflow3Warningfromairflow.models.baseimportID_LEN,Basefromairflow.models.cryptoimportget_fernetfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.module_loadingimportimport_string
[docs]defparse_netloc_to_hostname(*args,**kwargs):"""This method is deprecated."""warnings.warn("This method is deprecated.",RemovedInAirflow3Warning)return_parse_netloc_to_hostname(*args,**kwargs)
# Python automatically converts all letters to lowercase in hostname# See: https://issues.apache.org/jira/browse/AIRFLOW-3615def_parse_netloc_to_hostname(uri_parts):"""Parse a URI string to get correct Hostname."""hostname=unquote(uri_parts.hostnameor"")if"/"inhostname:hostname=uri_parts.netlocif"@"inhostname:hostname=hostname.rsplit("@",1)[1]if":"inhostname:hostname=hostname.split(":",1)[0]hostname=unquote(hostname)returnhostname
[docs]classConnection(Base,LoggingMixin):""" Placeholder to store information about different database instances connection information. The idea here is that scripts use references to database instances (conn_id) instead of hard coding hostname, logins and passwords when using operators or hooks. .. seealso:: For more information on how to use this class, see: :doc:`/howto/connection` :param conn_id: The connection ID. :param conn_type: The connection type. :param description: The connection description. :param host: The host. :param login: The login. :param password: The password. :param schema: The schema. :param port: The port number. :param extra: Extra metadata. Non-standard data such as private/SSH keys can be saved here. JSON encoded object. :param uri: URI address describing connection parameters. """
_extra=Column("extra",Text())def__init__(self,conn_id:str|None=None,conn_type:str|None=None,description:str|None=None,host:str|None=None,login:str|None=None,password:str|None=None,schema:str|None=None,port:int|None=None,extra:str|dict|None=None,uri:str|None=None,):super().__init__()self.conn_id=conn_idself.description=descriptionifextraandnotisinstance(extra,str):extra=json.dumps(extra)ifuriand(conn_typeorhostorloginorpasswordorschemaorportorextra):raiseAirflowException("You must create an object using the URI or individual values ""(conn_type, host, login, password, schema, port or extra).""You can't mix these two ways to create this object.")ifuri:self._parse_from_uri(uri)else:self.conn_type=conn_typeself.host=hostself.login=loginself.password=passwordself.schema=schemaself.port=portself.extra=extraifself.extra:self._validate_extra(self.extra,self.conn_id)ifself.password:mask_secret(self.password)@staticmethoddef_validate_extra(extra,conn_id)->None:""" Here we verify that ``extra`` is a JSON-encoded Python dict. From Airflow 3.0, we should no longer suppress these errors but raise instead. """ifextraisNone:returnNonetry:extra_parsed=json.loads(extra)ifnotisinstance(extra_parsed,dict):warnings.warn("Encountered JSON value in `extra` which does not parse as a dictionary in "f"connection {conn_id!r}. From Airflow 3.0, the `extra` field must contain a JSON ""representation of a Python dict.",RemovedInAirflow3Warning,stacklevel=3,)exceptjson.JSONDecodeError:warnings.warn(f"Encountered non-JSON in `extra` field for connection {conn_id!r}. Support for ""non-JSON `extra` will be removed in Airflow 3.0",RemovedInAirflow3Warning,stacklevel=2,)returnNone@reconstructor
[docs]defparse_from_uri(self,**uri):"""This method is deprecated. Please use uri parameter in constructor."""warnings.warn("This method is deprecated. Please use uri parameter in constructor.",RemovedInAirflow3Warning,)self._parse_from_uri(**uri)
[docs]defget_uri(self)->str:"""Return connection in URI format"""ifself.conn_typeand"_"inself.conn_type:self.log.warning("Connection schemes (type: %s) shall not contain '_' according to RFC3986.",self.conn_type,)ifself.conn_type:uri=f"{self.conn_type.lower().replace('_','-')}://"else:uri="//"authority_block=""ifself.loginisnotNone:authority_block+=quote(self.login,safe="")ifself.passwordisnotNone:authority_block+=":"+quote(self.password,safe="")ifauthority_block>"":authority_block+="@"uri+=authority_blockhost_block=""ifself.host:host_block+=quote(self.host,safe="")ifself.port:ifhost_block==""andauthority_block=="":host_block+=f"@:{self.port}"else:host_block+=f":{self.port}"ifself.schema:host_block+=f"/{quote(self.schema,safe='')}"uri+=host_blockifself.extra:try:query:str|None=urlencode(self.extra_dejson)exceptTypeError:query=Noneifqueryandself.extra_dejson==dict(parse_qsl(query,keep_blank_values=True)):uri+=("?"ifself.schemaelse"/?")+queryelse:uri+=("?"ifself.schemaelse"/?")+urlencode({self.EXTRA_KEY:self.extra})returnuri
[docs]defget_password(self)->str|None:"""Return encrypted password."""ifself._passwordandself.is_encrypted:fernet=get_fernet()ifnotfernet.is_encrypted:raiseAirflowException(f"Can't decrypt encrypted password for login={self.login} "f"FERNET_KEY configuration is missing")returnfernet.decrypt(bytes(self._password,"utf-8")).decode()else:returnself._password
[docs]defset_password(self,value:str|None):"""Encrypt password and set in object attribute."""ifvalue:fernet=get_fernet()self._password=fernet.encrypt(bytes(value,"utf-8")).decode()self.is_encrypted=fernet.is_encrypted
@declared_attr
[docs]defpassword(cls):"""Password. The value is decrypted/encrypted when reading/setting the value."""returnsynonym("_password",descriptor=property(cls.get_password,cls.set_password))
[docs]defget_extra(self)->str:"""Return encrypted extra-data."""ifself._extraandself.is_extra_encrypted:fernet=get_fernet()ifnotfernet.is_encrypted:raiseAirflowException(f"Can't decrypt `extra` params for login={self.login}, "f"FERNET_KEY configuration is missing")extra_val=fernet.decrypt(bytes(self._extra,"utf-8")).decode()else:extra_val=self._extraifextra_val:self._validate_extra(extra_val,self.conn_id)returnextra_val
[docs]defset_extra(self,value:str):"""Encrypt extra-data and save in object attribute to object."""ifvalue:self._validate_extra(value,self.conn_id)fernet=get_fernet()self._extra=fernet.encrypt(bytes(value,"utf-8")).decode()self.is_extra_encrypted=fernet.is_encryptedelse:self._extra=valueself.is_extra_encrypted=False
@declared_attr
[docs]defextra(cls):"""Extra data. The value is decrypted/encrypted when reading/setting the value."""returnsynonym("_extra",descriptor=property(cls.get_extra,cls.set_extra))
[docs]defrotate_fernet_key(self):"""Encrypts data with a new key. See: :ref:`security/fernet`"""fernet=get_fernet()ifself._passwordandself.is_encrypted:self._password=fernet.rotate(self._password.encode("utf-8")).decode()ifself._extraandself.is_extra_encrypted:self._extra=fernet.rotate(self._extra.encode("utf-8")).decode()
[docs]defget_hook(self,*,hook_params=None):"""Return hook based on conn_type"""fromairflow.providers_managerimportProvidersManagerhook=ProvidersManager().hooks.get(self.conn_type,None)ifhookisNone:raiseAirflowException(f'Unknown hook type "{self.conn_type}"')try:hook_class=import_string(hook.hook_class_name)exceptImportError:warnings.warn("Could not import %s when discovering %s%s",hook.hook_class_name,hook.hook_name,hook.package_name,)raiseifhook_paramsisNone:hook_params={}returnhook_class(**{hook.connection_id_attribute_name:self.conn_id},**hook_params)
[docs]deflog_info(self):""" This method is deprecated. You can read each field individually or use the default representation (`__repr__`). """warnings.warn("This method is deprecated. You can read each field individually or ""use the default representation (__repr__).",RemovedInAirflow3Warning,stacklevel=2,)return(f"id: {self.conn_id}. Host: {self.host}, Port: {self.port}, Schema: {self.schema}, "
[docs]defdebug_info(self):""" This method is deprecated. You can read each field individually or use the default representation (`__repr__`). """warnings.warn("This method is deprecated. You can read each field individually or ""use the default representation (__repr__).",RemovedInAirflow3Warning,stacklevel=2,)return(f"id: {self.conn_id}. Host: {self.host}, Port: {self.port}, Schema: {self.schema}, "
[docs]deftest_connection(self):"""Calls out get_hook method and executes test_connection method on that."""status,message=False,""try:hook=self.get_hook()ifgetattr(hook,"test_connection",False):status,message=hook.test_connection()else:message=(f"Hook {hook.__class__.__name__} doesn't implement or inherit test_connection method")exceptExceptionase:message=str(e)returnstatus,message
@property
[docs]defextra_dejson(self)->dict:"""Returns the extra property by deserializing json."""obj={}ifself.extra:try:obj=json.loads(self.extra)exceptJSONDecodeError:self.log.exception("Failed parsing the json for conn_id %s",self.conn_id)# Mask sensitive keys from this listmask_secret(obj)returnobj
@classmethod
[docs]defget_connection_from_secrets(cls,conn_id:str)->Connection:""" Get connection by conn_id. :param conn_id: connection id :return: connection """forsecrets_backendinensure_secrets_loaded():try:conn=secrets_backend.get_connection(conn_id=conn_id)ifconn:returnconnexceptException:log.exception("Unable to retrieve connection from secrets backend (%s). ""Checking subsequent secrets backend.",type(secrets_backend).__name__,)raiseAirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
@classmethod
[docs]deffrom_json(cls,value,conn_id=None)->Connection:kwargs=json.loads(value)extra=kwargs.pop("extra",None)ifextra:kwargs["extra"]=extraifisinstance(extra,str)elsejson.dumps(extra)conn_type=kwargs.pop("conn_type",None)ifconn_type:kwargs["conn_type"]=cls._normalize_conn_type(conn_type)port=kwargs.pop("port",None)ifport:try:kwargs["port"]=int(port)exceptValueError:raiseValueError(f"Expected integer value for `port`, but got {port!r} instead.")returnConnection(conn_id=conn_id,**kwargs)