## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importjsonimportloggingimportwarningsfromjsonimportJSONDecodeErrorfromtypingimportDict,Optional,Unionfromurllib.parseimportparse_qsl,quote,unquote,urlencode,urlparsefromsqlalchemyimportBoolean,Column,Integer,String,Textfromsqlalchemy.ext.declarativeimportdeclared_attrfromsqlalchemy.ormimportreconstructor,synonymfromairflow.configurationimportensure_secrets_loadedfromairflow.exceptionsimportAirflowException,AirflowNotFoundExceptionfromairflow.models.baseimportID_LEN,Basefromairflow.models.cryptoimportget_fernetfromairflow.providers_managerimportProvidersManagerfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.module_loadingimportimport_string
def parse_netloc_to_hostname(*args, **kwargs):
    """
    Deprecated public alias of :func:`_parse_netloc_to_hostname`.

    Emits a ``DeprecationWarning`` and forwards every positional and keyword
    argument unchanged to the private helper.

    :return: whatever :func:`_parse_netloc_to_hostname` returns for the arguments.
    """
    # stacklevel=2 attributes the warning to the caller's line instead of this
    # wrapper, so users can actually locate the deprecated call site.
    warnings.warn("This method is deprecated.", DeprecationWarning, stacklevel=2)
    return _parse_netloc_to_hostname(*args, **kwargs)


# Python automatically converts all letters to lowercase in hostname
# See: https://issues.apache.org/jira/browse/AIRFLOW-3615
def _parse_netloc_to_hostname(uri_parts):
    """
    Parse a URI string to get correct Hostname.

    :param uri_parts: object exposing ``hostname`` and ``netloc`` attributes,
        e.g. the result of ``urllib.parse.urlparse``.
    :return: the percent-decoded hostname; an empty string when
        ``uri_parts.hostname`` is falsy.
    """
    hostname = unquote(uri_parts.hostname or '')
    if '/' not in hostname:
        return hostname

    # A decoded hostname containing "/" is re-derived from the raw netloc so
    # that its original letter case survives (``hostname`` is lower-cased).
    raw_host = uri_parts.netloc
    if "@" in raw_host:
        # Drop the "login:password@" prefix.
        raw_host = raw_host.rsplit("@", 1)[1]
    if ":" in raw_host:
        # Drop the ":port" suffix.
        raw_host = raw_host.split(":", 1)[0]
    return unquote(raw_host)
class Connection(Base, LoggingMixin):
    """
    Placeholder to store information about different database instances
    connection information. The idea here is that scripts use references to
    database instances (conn_id) instead of hard coding hostname, logins and
    passwords when using operators or hooks.

    .. seealso::
        For more information on how to use this class, see: :doc:`/howto/connection`

    :param conn_id: The connection ID.
    :type conn_id: str
    :param conn_type: The connection type.
    :type conn_type: str
    :param description: The connection description.
    :type description: str
    :param host: The host.
    :type host: str
    :param login: The login.
    :type login: str
    :param password: The password.
    :type password: str
    :param schema: The schema.
    :type schema: str
    :param port: The port number.
    :type port: int
    :param extra: Extra metadata. Non-standard data such as private/SSH keys can be saved here. JSON
        encoded object.
    :type extra: str
    :param uri: URI address describing connection parameters.
    :type uri: str
    """

    # NOTE(review): only the ``extra`` column is visible in this extract. The
    # methods below also use ``conn_id``, ``conn_type``, ``host``, ``login``,
    # ``schema``, ``port``, ``description``, ``_password``, ``is_encrypted``,
    # ``is_extra_encrypted``, ``EXTRA_KEY`` and ``_parse_from_uri``, which are
    # presumably declared on this class elsewhere - confirm before relying on
    # this block in isolation.
    _extra = Column('extra', Text())

    def __init__(
        self,
        conn_id: Optional[str] = None,
        conn_type: Optional[str] = None,
        description: Optional[str] = None,
        host: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        schema: Optional[str] = None,
        port: Optional[int] = None,
        extra: Optional[Union[str, dict]] = None,
        uri: Optional[str] = None,
    ):
        """
        Build a connection either from a URI or from individual field values.

        :raises AirflowException: if ``uri`` is combined with any of
            ``conn_type``, ``host``, ``login``, ``password``, ``schema``,
            ``port`` or ``extra``.
        """
        super().__init__()
        self.conn_id = conn_id
        self.description = description

        # A non-string ``extra`` (e.g. a dict) is serialised to JSON text.
        if extra and not isinstance(extra, str):
            extra = json.dumps(extra)

        if uri and (conn_type or host or login or password or schema or port or extra):
            raise AirflowException(
                "You must create an object using the URI or individual values "
                "(conn_type, host, login, password, schema, port or extra)."
                "You can't mix these two ways to create this object."
            )

        if uri:
            self._parse_from_uri(uri)
        else:
            self.conn_type = conn_type
            self.host = host
            self.login = login
            self.password = password
            self.schema = schema
            self.port = port
            self.extra = extra

        # Register the password with the secrets masker so it is redacted in logs.
        if self.password:
            mask_secret(self.password)

    # NOTE(review): the extract keeps a bare ``@reconstructor`` here but not the
    # method it decorates. Left as-is it would bind to ``parse_from_uri``, which
    # cannot be called without arguments. The hook below mirrors the masking
    # step at the end of ``__init__``; its name and body are a reconstruction -
    # confirm against the original file.
    @reconstructor
    def on_db_load(self):
        """Register the password with the secrets masker after a database load."""
        if self.password:
            mask_secret(self.password)

    def parse_from_uri(self, **uri):
        """This method is deprecated. Please use uri parameter in constructor."""
        # stacklevel=2 points the warning at the caller, matching log_info/debug_info.
        warnings.warn(
            "This method is deprecated. Please use uri parameter in constructor.",
            DeprecationWarning,
            stacklevel=2,
        )
        self._parse_from_uri(**uri)

    def get_uri(self) -> str:
        """
        Return connection in URI format.

        The scheme is ``conn_type`` lower-cased with ``_`` replaced by ``-``.
        Login, password, host and schema are percent-encoded with no safe
        characters. ``extra`` is emitted as individual query parameters when it
        round-trips through ``urlencode``/``parse_qsl`` unchanged; otherwise the
        raw ``extra`` string is sent under the ``EXTRA_KEY`` parameter.
        """
        uri = f"{str(self.conn_type).lower().replace('_', '-')}://"

        authority_block = ''
        if self.login is not None:
            authority_block += quote(self.login, safe='')
        if self.password is not None:
            authority_block += ':' + quote(self.password, safe='')
        if authority_block > '':
            authority_block += '@'
            uri += authority_block

        host_block = ''
        if self.host:
            host_block += quote(self.host, safe='')
        if self.port:
            if host_block > '':
                host_block += f':{self.port}'
            else:
                # No host: the port is written as "@:<port>".
                host_block += f'@:{self.port}'
        if self.schema:
            host_block += f"/{quote(self.schema, safe='')}"
        uri += host_block

        if self.extra:
            # Read the property once: each access re-parses the JSON and, on
            # malformed input, logs the failure again.
            extra_dict = self.extra_dejson
            try:
                query = urlencode(extra_dict)
            except TypeError:
                query = None
            if query and extra_dict == dict(parse_qsl(query, keep_blank_values=True)):
                uri += '?' + query
            else:
                uri += '?' + urlencode({self.EXTRA_KEY: self.extra})

        return uri

    def get_password(self) -> Optional[str]:
        """
        Return the password, decrypted when it is stored encrypted.

        :raises AirflowException: if the stored value is flagged as encrypted
            but the fernet returned by ``get_fernet()`` reports
            ``is_encrypted`` as false.
        """
        if self._password and self.is_encrypted:
            fernet = get_fernet()
            if not fernet.is_encrypted:
                raise AirflowException(
                    f"Can't decrypt encrypted password for login={self.login}, "
                    "FERNET_KEY configuration is missing"
                )
            return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
        return self._password

    def set_password(self, value: Optional[str]):
        """
        Encrypt password and set in object attribute.

        A falsy ``value`` (``None`` or ``''``) leaves the stored password and
        its encryption flag untouched.
        """
        if value:
            fernet = get_fernet()
            self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
            self.is_encrypted = fernet.is_encrypted

    @declared_attr
    def password(cls):
        """Password. The value is decrypted/encrypted when reading/setting the value."""
        return synonym('_password', descriptor=property(cls.get_password, cls.set_password))

    def get_extra(self) -> Optional[str]:
        """
        Return the extra data string, decrypted when it is stored encrypted.

        :raises AirflowException: if the stored value is flagged as encrypted
            but the fernet returned by ``get_fernet()`` reports
            ``is_encrypted`` as false.
        """
        if self._extra and self.is_extra_encrypted:
            fernet = get_fernet()
            if not fernet.is_encrypted:
                raise AirflowException(
                    f"Can't decrypt `extra` params for login={self.login}, "
                    "FERNET_KEY configuration is missing"
                )
            return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
        return self._extra

    def set_extra(self, value: str):
        """
        Encrypt extra-data and save in object attribute to object.

        A falsy ``value`` is stored as-is and the encryption flag is cleared.
        """
        if value:
            fernet = get_fernet()
            self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
            self.is_extra_encrypted = fernet.is_encrypted
        else:
            self._extra = value
            self.is_extra_encrypted = False

    @declared_attr
    def extra(cls):
        """Extra data. The value is decrypted/encrypted when reading/setting the value."""
        return synonym('_extra', descriptor=property(cls.get_extra, cls.set_extra))

    def rotate_fernet_key(self):
        """Encrypts data with a new key. See: :ref:`security/fernet`"""
        fernet = get_fernet()
        # Only values currently flagged as encrypted are rotated.
        if self._password and self.is_encrypted:
            self._password = fernet.rotate(self._password.encode('utf-8')).decode()
        if self._extra and self.is_extra_encrypted:
            self._extra = fernet.rotate(self._extra.encode('utf-8')).decode()

    def get_hook(self):
        """
        Return hook based on conn_type.

        :raises AirflowException: if ``conn_type`` has no entry in
            ``ProvidersManager().hooks``.
        :raises ImportError: re-raised, after a warning, when the hook class
            cannot be imported.
        """
        (
            hook_class_name,
            conn_id_param,
            package_name,
            hook_name,
            _connection_type,
        ) = ProvidersManager().hooks.get(self.conn_type, (None, None, None, None, None))

        if not hook_class_name:
            raise AirflowException(f'Unknown hook type "{self.conn_type}"')
        try:
            hook_class = import_string(hook_class_name)
        except ImportError:
            # warnings.warn() does not do %-interpolation: its second positional
            # argument is the warning *category*, so extra strings raise
            # TypeError. Format the message before passing it.
            warnings.warn(
                f"Could not import {hook_class_name} when discovering {hook_name} {package_name}"
            )
            raise
        return hook_class(**{conn_id_param: self.conn_id})

    def log_info(self):
        """
        This method is deprecated. You can read each field individually or use the
        default representation (`__repr__`).

        :return: one-line summary with password and extra replaced by a
            placeholder when set.
        """
        warnings.warn(
            "This method is deprecated. You can read each field individually or "
            "use the default representation (__repr__).",
            DeprecationWarning,
            stacklevel=2,
        )
        return "id: {}. Host: {}, Port: {}, Schema: {}, Login: {}, Password: {}, extra: {}".format(
            self.conn_id,
            self.host,
            self.port,
            self.schema,
            self.login,
            "XXXXXXXX" if self.password else None,
            "XXXXXXXX" if self.extra_dejson else None,
        )

    def debug_info(self):
        """
        This method is deprecated. You can read each field individually or use the
        default representation (`__repr__`).

        :return: one-line summary with the password replaced by a placeholder
            when set; the parsed extra dict is included verbatim.
        """
        warnings.warn(
            "This method is deprecated. You can read each field individually or "
            "use the default representation (__repr__).",
            DeprecationWarning,
            stacklevel=2,
        )
        return "id: {}. Host: {}, Port: {}, Schema: {}, Login: {}, Password: {}, extra: {}".format(
            self.conn_id,
            self.host,
            self.port,
            self.schema,
            self.login,
            "XXXXXXXX" if self.password else None,
            self.extra_dejson,
        )

    def test_connection(self):
        """
        Calls out get_hook method and executes test_connection method on that.

        :return: ``(status, message)``. Any exception raised while obtaining or
            running the hook is caught and reported as ``(False, str(exc))``.
        """
        status, message = False, ''
        try:
            hook = self.get_hook()
            if getattr(hook, 'test_connection', False):
                status, message = hook.test_connection()
            else:
                message = (
                    f"Hook {hook.__class__.__name__} doesn't implement or inherit test_connection method"
                )
        except Exception as e:
            message = str(e)

        return status, message

    @property
    def extra_dejson(self) -> Dict:
        """
        Returns the extra property by deserializing json.

        An empty dict is returned when ``extra`` is empty or is not valid JSON
        (the parse failure is logged).
        """
        obj = {}
        if self.extra:
            try:
                obj = json.loads(self.extra)
            except JSONDecodeError:
                self.log.exception("Failed parsing the json for conn_id %s", self.conn_id)

            # Mask sensitive keys from this list
            mask_secret(obj)

        return obj

    @classmethod
    def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
        """
        Get connection by conn_id.

        Backends from ``ensure_secrets_loaded()`` are tried in order; a backend
        that raises is logged and skipped.

        :param conn_id: connection id
        :return: connection
        :raises AirflowNotFoundException: if no backend returns a connection.
        """
        for secrets_backend in ensure_secrets_loaded():
            try:
                conn = secrets_backend.get_connection(conn_id=conn_id)
                if conn:
                    return conn
            except Exception:  # pylint: disable=broad-except
                # Resolve the logger here rather than relying on a module-level
                # ``log`` name, which is not defined in the visible source.
                logging.getLogger(__name__).exception(
                    'Unable to retrieve connection from secrets backend (%s). '
                    'Checking subsequent secrets backend.',
                    type(secrets_backend).__name__,
                )

        raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")