Source code for airflow.providers.amazon.aws.utils.connection_wrapper
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
import warnings
from copy import deepcopy
from dataclasses import MISSING, InitVar, dataclass, field, fields
from functools import cached_property
from typing import TYPE_CHECKING, Any

from botocore import UNSIGNED
from botocore.config import Config

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.utils import trim_none_values
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
    from airflow.models.connection import Connection  # Avoid circular imports.


@dataclass
class _ConnectionMetadata:
    """
    Connection metadata data-class.

    This class implements the main :class:`~airflow.models.connection.Connection` attributes
    and is used by ``AwsConnectionWrapper`` to avoid circular imports.

    Only for internal usage, this class might be changed or removed in the future.
    """

    conn_id: str | None = None
    conn_type: str | None = None
    description: str | None = None
    host: str | None = None
    login: str | None = None
    password: str | None = None
    schema: str | None = None
    port: int | None = None
    extra: str | dict | None = None

    @property
    def extra_dejson(self) -> dict:
        """
        Return ``extra`` as a dictionary.

        A falsy ``extra`` yields an empty dict, a string is parsed as JSON, and any other value is
        deep-copied so callers never receive the stored object itself.

        :raises AirflowException: if ``extra`` is a string that is not valid JSON.
        :raises TypeError: if the (parsed) value is not a dict.
        """
        if not self.extra:
            return {}
        extra = deepcopy(self.extra)
        if isinstance(extra, str):
            try:
                extra = json.loads(extra)
            except json.JSONDecodeError as err:
                # ``from None`` hides the decoder traceback; its message is embedded in the text instead.
                raise AirflowException(
                    f"'extra' expected valid JSON-Object string. Original error:\n * {err}"
                ) from None
        if not isinstance(extra, dict):
            raise TypeError(f"Expected JSON-Object or dict, got {type(extra).__name__}.")
        return extra


@dataclass
class AwsConnectionWrapper(LoggingMixin):
    """
    AWS Connection Wrapper class helper.

    Used to validate and resolve AWS Connection parameters.

    ``conn`` references an Airflow Connection object or another AwsConnectionWrapper;
    if it is set to ``None`` then default values are used.

    The precedence rules for ``region_name``
        1. Explicit set (in Hook) ``region_name``.
        2. Airflow Connection Extra 'region_name'.

    The precedence rules for ``botocore_config``
        1. Explicit set (in Hook) ``botocore_config``.
        2. Construct from Airflow Connection Extra 'config_kwargs'.
        3. The wrapper's default value
    """
def get_service_config(self, service_name: str) -> dict[str, Any]:
    """
    Look up the configuration dictionary stored for one AWS service.

    :param service_name: Name of botocore/boto3 service.
    :return: The value stored under ``service_name`` in ``service_config``,
        or an empty dict when there is no entry for that service.
    """
    per_service = self.service_config
    return per_service.get(service_name, {})
def get_service_endpoint_url(
    self, service_name: str, *, sts_connection_assume: bool = False, sts_test_connection: bool = False
) -> str | None:
    """
    Resolve the endpoint URL for a botocore/boto3 service client.

    An ``endpoint_url`` found in the service's own config always wins. Otherwise the
    connection-wide ``endpoint_url`` is used, except for an STS client requested with one of
    the two STS flags, which never falls back to the connection-wide value.

    :param service_name: Name of botocore/boto3 service.
    :param sts_connection_assume: The STS client is being created to assume a role.
    :param sts_test_connection: The STS client is being created to test the connection.
    :return: The resolved endpoint URL, or ``None`` when nothing applies.
    :raises AirflowException: if ``service_name`` is ``"sts"`` and both STS flags are set.
    """
    fallback_endpoint_url = self.endpoint_url

    if service_name == "sts" and (sts_connection_assume or sts_test_connection):
        if sts_connection_assume and sts_test_connection:
            raise AirflowException(
                "Can't resolve STS endpoint when both "
                "`sts_connection_assume` and `sts_test_connection` set to True."
            )
        # Historically the STS client is special-cased:
        # 1. For assume role and for test connection the global endpoint_url is never used.
        # 2. For STS used as a regular service the global endpoint_url applies as for any service.
        fallback_endpoint_url = None

    service_config = self.get_service_config(service_name=service_name)
    return service_config.get("endpoint_url", fallback_endpoint_url)
def __post_init__(self, conn: Connection | AwsConnectionWrapper | _ConnectionMetadata | None) -> None:
    """
    Populate the wrapper from ``conn`` after instantiation.

    :param conn: Where the settings come from. An object of the wrapper's own type is copied
        field by field, a falsy value leaves the wrapper untouched, and anything else is read
        as a connection through its ``conn_id``, ``conn_type``, ``login``, ``password``,
        ``schema`` and ``extra_dejson`` attributes.
    """
    if isinstance(conn, type(self)):
        # Clone an existing wrapper by hand rather than with ``dataclasses.replace``: InitVar
        # arguments are not stored on the instance, and running ``__post_init__`` a second time
        # would emit every log record and warning again.
        for spec in fields(conn):
            name = spec.name
            source_value = getattr(conn, name)
            if not spec.init:
                # Non-init fields are always taken over from the original wrapper.
                setattr(self, name, source_value)
                continue

            if spec.default is not MISSING:
                fallback = spec.default
            elif spec.default_factory is not MISSING:
                fallback = spec.default_factory()  # zero-argument callable
            else:
                continue  # mandatory field without a default, nothing to compare against

            # Only overwrite values that are still at their default; explicit init values win.
            if getattr(self, name) == fallback:
                setattr(self, name, source_value)
        return

    if not conn:
        return

    if TYPE_CHECKING:
        assert isinstance(conn, (Connection, _ConnectionMetadata))

    # Plain attributes taken straight from the connection.
    self.conn_id = conn.conn_id
    self.conn_type = conn.conn_type or "aws"
    self.login = conn.login
    self.password = conn.password
    self.schema = conn.schema or None
    self.extra_config = deepcopy(conn.extra_dejson)

    if self.conn_type != "aws":
        warnings.warn(
            f"{self.conn_repr} expected connection type 'aws', got {self.conn_type!r}. "
            "This connection might not work correctly. "
            "Please use Amazon Web Services Connection type.",
            UserWarning,
            stacklevel=2,
        )

    # Work on a separate copy so ``extra_config`` is not affected by the edits made below.
    extra = deepcopy(conn.extra_dejson)
    self.service_config = extra.get("service_config", {})

    # Initial connection credentials.
    (
        self.aws_access_key_id,
        self.aws_secret_access_key,
        self.aws_session_token,
    ) = self._get_credentials(**extra)

    # An already set ``region_name`` / ``verify`` takes precedence over the connection extra.
    if not self.region_name and "region_name" in extra:
        self.region_name = extra["region_name"]
        self.log.debug("Retrieving region_name=%s from %s extra.", self.region_name, self.conn_repr)

    if self.verify is None and "verify" in extra:
        self.verify = extra["verify"]
        self.log.debug("Retrieving verify=%s from %s extra.", self.verify, self.conn_repr)

    if "profile_name" in extra:
        self.profile_name = extra["profile_name"]
        self.log.debug("Retrieving profile_name=%s from %s extra.", self.profile_name, self.conn_repr)

    # Warn the user that an invalid parameter is being used which actually not related to 'profile_name'.
    # ToDo: Remove this check entirely as soon as drop support credentials from s3_config_file
    if "profile" in extra and "s3_config_file" not in extra and not self.profile_name:
        warnings.warn(
            f"Found 'profile' without specifying 's3_config_file' in {self.conn_repr} extra. "
            "If required profile from AWS Shared Credentials please "
            f"set 'profile_name' in {self.conn_repr} extra.",
            UserWarning,
            stacklevel=2,
        )

    botocore_kwargs = extra.get("config_kwargs")
    if not self.botocore_config and botocore_kwargs:
        # https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
        self.log.debug("Retrieving botocore config=%s from %s extra.", botocore_kwargs, self.conn_repr)
        if botocore_kwargs.get("signature_version") == "unsigned":
            # The string marker from the extra is swapped for botocore's UNSIGNED object.
            botocore_kwargs["signature_version"] = UNSIGNED
        self.botocore_config = Config(**botocore_kwargs)

    self.endpoint_url = extra.get("endpoint_url")

    # Assume Role configuration.
    (
        self.role_arn,
        self.assume_role_method,
        self.assume_role_kwargs,
    ) = self._get_assume_role_configs(**extra)
def extra_dejson(self):
    """
    Compatibility with `airflow.models.Connection.extra_dejson` property.

    :return: The object held in ``extra_config``; no copy is made here.
    """
    # NOTE(review): the docstring speaks of a *property*, yet no decorator is visible on this
    # definition -- confirm whether ``@property`` precedes it in the complete file.
    stored_extra = self.extra_config
    return stored_extra
@property
def session_kwargs(self) -> dict[str, Any]:
    """
    Additional kwargs passed to boto3.session.Session.

    The five session settings are collected from the wrapper attributes of the same name and
    handed to ``trim_none_values`` before being returned.
    """
    session_fields = (
        "aws_access_key_id",
        "aws_secret_access_key",
        "aws_session_token",
        "region_name",
        "profile_name",
    )
    # Each keyword is named exactly like the attribute that supplies its value.
    collected = {key: getattr(self, key) for key in session_fields}
    return trim_none_values(collected)
def __bool__(self):
    """
    Return the truth value of the AwsConnectionWrapper instance.

    The wrapper is falsy only while ``conn_id`` is still the ``NOTSET`` object (identity check).
    """
    conn_id_missing = self.conn_id is NOTSET
    return not conn_id_missing
def _get_credentials(
    self,
    *,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    session_kwargs: dict[str, Any] | None = None,
    **kwargs,
) -> tuple[str | None, str | None, str | None]:
    """
    Get AWS credentials from connection login/password and extra.

    ``aws_access_key_id`` and ``aws_secret_access_key`` are taken as a pair, in this order:

    1. From Connection login and password
    2. From Connection ``extra['aws_access_key_id']`` and ``extra['aws_secret_access_key']``
    3. (deprecated) From Connection ``extra['session_kwargs']``

    ``aws_session_token`` is resolved independently of the key pair:

    1. From Connection ``extra['aws_session_token']``
    2. (deprecated) From Connection ``extra['session_kwargs']['aws_session_token']``

    :param aws_access_key_id: Value of ``extra['aws_access_key_id']``.
    :param aws_secret_access_key: Value of ``extra['aws_secret_access_key']``.
    :param aws_session_token: Value of ``extra['aws_session_token']``.
    :param session_kwargs: Value of ``extra['session_kwargs']``.
    :param kwargs: Remaining connection extra keys; accepted so the whole extra can be
        unpacked into the call, and ignored here.
    :return: Tuple of access key id, secret access key and session token; an element is
        ``None`` when no source provides it.
    """
    session_kwargs = session_kwargs or {}
    session_aws_access_key_id = session_kwargs.get("aws_access_key_id")
    session_aws_secret_access_key = session_kwargs.get("aws_secret_access_key")
    session_aws_session_token = session_kwargs.get("aws_session_token")

    # A source is only used when it supplies both halves of the key pair.
    if self.login and self.password:
        self.log.info("%s credentials retrieved from login and password.", self.conn_repr)
        aws_access_key_id, aws_secret_access_key = self.login, self.password
    elif aws_access_key_id and aws_secret_access_key:
        self.log.info("%s credentials retrieved from extra.", self.conn_repr)
    elif session_aws_access_key_id and session_aws_secret_access_key:
        aws_access_key_id = session_aws_access_key_id
        aws_secret_access_key = session_aws_secret_access_key
        self.log.info("%s credentials retrieved from extra['session_kwargs'].", self.conn_repr)

    if aws_session_token:
        self.log.info(
            "%s session token retrieved from extra, please note you are responsible for renewing these.",
            self.conn_repr,
        )
    elif session_aws_session_token:
        aws_session_token = session_aws_session_token
        self.log.info(
            "%s session token retrieved from extra['session_kwargs'], "
            "please note you are responsible for renewing these.",
            self.conn_repr,
        )

    return aws_access_key_id, aws_secret_access_key, aws_session_token


def _get_assume_role_configs(
    self,
    *,
    role_arn: str | None = None,
    assume_role_method: str = "assume_role",
    assume_role_kwargs: dict[str, Any] | None = None,
    **kwargs,
) -> tuple[str | None, str | None, dict[str, Any]]:
    """
    Get assume role configs from Connection extra.

    :param role_arn: Value of ``extra['role_arn']``.
    :param assume_role_method: Value of ``extra['assume_role_method']``.
    :param assume_role_kwargs: Value of ``extra['assume_role_kwargs']``.
    :param kwargs: Remaining connection extra keys; accepted so the whole extra can be
        unpacked into the call, and ignored here.
    :return: Tuple of role ARN, assume-role method and assume-role kwargs, or
        ``(None, None, {})`` when ``role_arn`` is not set.
    :raises NotImplementedError: if ``role_arn`` is set and ``assume_role_method`` is not one
        of the supported methods.
    """
    if role_arn:
        self.log.debug("Retrieving role_arn=%r from %s extra.", role_arn, self.conn_repr)
    else:
        # There is no reason obtain `assume_role_method` and `assume_role_kwargs` if `role_arn` not set.
        return None, None, {}

    # Kept as a list: its repr is part of the error message below.
    supported_methods = ["assume_role", "assume_role_with_saml", "assume_role_with_web_identity"]
    if assume_role_method not in supported_methods:
        raise NotImplementedError(
            f"Found assume_role_method={assume_role_method!r} in {self.conn_repr} extra."
            f" Currently {supported_methods} are supported."
            ' (Exclude this setting will default to "assume_role").'
        )
    self.log.debug("Retrieve assume_role_method=%r from %s.", assume_role_method, self.conn_repr)

    assume_role_kwargs = assume_role_kwargs or {}
    return role_arn, assume_role_method, assume_role_kwargs