Source code for airflow.providers.amazon.aws.utils.connection_wrapper
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importwarningsfromcopyimportdeepcopyfromdataclassesimportMISSING,InitVar,dataclass,field,fieldsfromtypingimportTYPE_CHECKING,Any,Dict,Optional,Tuple,Unionfrombotocore.configimportConfigfromairflow.compat.functoolsimportcached_propertyfromairflow.exceptionsimportAirflowExceptionfromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromairflow.models.connectionimportConnection@dataclass
class AwsConnectionWrapper(LoggingMixin):
    """
    AWS Connection Wrapper class helper.

    Used to validate and resolve AWS Connection parameters.

    ``conn`` is a reference to an Airflow Connection object or to another
    ``AwsConnectionWrapper``. If it is set to ``None`` then the wrapper's
    default values are used.

    The precedence rules for ``region_name``
        1. Explicit set (in Hook) ``region_name``.
        2. Airflow Connection Extra 'region_name'.

    The precedence rules for ``botocore_config``
        1. Explicit set (in Hook) ``botocore_config``.
        2. Construct from Airflow Connection Extra 'config_kwargs'.
        3. The wrapper's default value
    """
def __post_init__(self, conn: Optional[Union["Connection", "AwsConnectionWrapper"]]):
    """
    Populate the wrapper from ``conn`` once the dataclass ``__init__`` has run.

    :param conn: one of

        * another wrapper of the same type - its field values are copied over
          (see the inline comments for the exact rule) and nothing is logged;
        * a falsy value (e.g. ``None``) - the wrapper is left untouched;
        * an Airflow Connection - its id/type/login/password and a deep copy of
          ``extra_dejson`` are stored, then credentials, region, session kwargs,
          botocore config, endpoint URL and assume-role settings are resolved
          from the connection extra.
    """
    if isinstance(conn, type(self)):
        # For every field with init=False we copy the reference value from the original wrapper.
        # For every field with init=True we keep the value passed to __init__ unless it
        # still equals the field default, in which case the original wrapper's value wins.
        # We can't use ``dataclasses.replace`` in a classmethod because we are limited by
        # InitVar arguments (they are not stored on the object), and we also do not want to
        # run __post_init__ again, which would print all logs/warnings a second time.
        for fl in fields(conn):
            value = getattr(conn, fl.name)
            if not fl.init:
                setattr(self, fl.name, value)
                continue

            if fl.default is not MISSING:
                default = fl.default
            elif fl.default_factory is not MISSING:
                default = fl.default_factory()  # zero-argument callable
            else:
                continue  # Value mandatory, skip

            orig_value = getattr(self, fl.name)
            if orig_value == default:
                # Only take the other wrapper's value when ours is still the default.
                setattr(self, fl.name, value)
        return
    elif not conn:
        return

    # Working copy of the extra; ``extra_config`` below gets its own independent copy.
    extra = deepcopy(conn.extra_dejson)

    # Assign attributes from AWS Connection
    self.conn_id = conn.conn_id
    self.conn_type = conn.conn_type or "aws"
    self.login = conn.login
    self.password = conn.password
    self.extra_config = deepcopy(conn.extra_dejson)

    if self.conn_type != "aws":
        warnings.warn(
            f"{self.conn_repr} expected connection type 'aws', got {self.conn_type!r}.",
            UserWarning,
            stacklevel=2,
        )

    # Retrieve initial connection credentials
    init_credentials = self._get_credentials(**extra)
    self.aws_access_key_id, self.aws_secret_access_key, self.aws_session_token = init_credentials

    # An explicitly set region_name takes precedence over the one in extra.
    if not self.region_name and "region_name" in extra:
        self.region_name = extra["region_name"]
        self.log.info("Retrieving region_name=%s from %s extra.", self.region_name, self.conn_repr)

    if "session_kwargs" in extra:
        self.session_kwargs = extra["session_kwargs"]
        self.log.info("Retrieving session_kwargs=%s from %s extra.", self.session_kwargs, self.conn_repr)

    # Warn the user that an invalid parameter is being used which actually not related to 'profile_name'.
    if "profile" in extra and "s3_config_file" not in extra:
        if "profile_name" not in self.session_kwargs:
            warnings.warn(
                f"Found 'profile' without specifying 's3_config_file' in {self.conn_repr} extra. "
                "If required profile from AWS Shared Credentials please "
                f"set 'profile_name' in {self.conn_repr} extra['session_kwargs'].",
                UserWarning,
                stacklevel=2,
            )

    # An explicitly set botocore_config takes precedence over extra['config_kwargs'].
    config_kwargs = extra.get("config_kwargs")
    if not self.botocore_config and config_kwargs:
        # https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
        self.log.info("Retrieving botocore config=%s from %s extra.", config_kwargs, self.conn_repr)
        self.botocore_config = Config(**config_kwargs)

    if conn.host:
        warnings.warn(
            f"Host {conn.host} specified in the connection is not used."
            " Please, set it on extra['endpoint_url'] instead",
            DeprecationWarning,
            stacklevel=2,
        )

    # Deprecated extra['host'] wins over extra['endpoint_url'] when both are present.
    self.endpoint_url = extra.get("host")
    if self.endpoint_url:
        warnings.warn(
            "extra['host'] is deprecated and will be removed in a future release."
            " Please set extra['endpoint_url'] instead",
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        self.endpoint_url = extra.get("endpoint_url")

    # Retrieve Assume Role Configuration
    assume_role_configs = self._get_assume_role_configs(**extra)
    self.role_arn, self.assume_role_method, self.assume_role_kwargs = assume_role_configs
@property
def extra_dejson(self):
    """
    Compatibility with `airflow.models.Connection.extra_dejson` property.

    :return: the object stored in ``self.extra_config`` (returned as-is, not copied).
    """
    return self.extra_config
def _get_credentials(
    self,
    *,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    # Deprecated Values
    s3_config_file: Optional[str] = None,
    s3_config_format: Optional[str] = None,
    profile: Optional[str] = None,
    **kwargs,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """
    Get AWS credentials from connection login/password and extra.

    ``aws_access_key_id`` and ``aws_secret_access_key`` order

    1. From Connection login and password
    2. From Connection extra['aws_access_key_id'] and extra['aws_secret_access_key']
    3. (deprecated) From local credentials file

    Get ``aws_session_token`` from extra['aws_session_token']

    :param aws_access_key_id: access key id taken from the connection extra
    :param aws_secret_access_key: secret access key taken from the connection extra
    :param aws_session_token: session token taken from the connection extra
    :param s3_config_file: (deprecated) path to a local credentials file
    :param s3_config_format: (deprecated) format of ``s3_config_file``
    :param profile: (deprecated) profile name inside ``s3_config_file``
    :param kwargs: any other extra keys; accepted so the whole extra can be splatted in, ignored here
    :return: tuple of (access key id, secret access key, session token)
    """
    if self.login and self.password:
        self.log.info("%s credentials retrieved from login and password.", self.conn_repr)
        aws_access_key_id, aws_secret_access_key = self.login, self.password
    elif aws_access_key_id and aws_secret_access_key:
        self.log.info("%s credentials retrieved from extra.", self.conn_repr)
    elif s3_config_file:
        aws_access_key_id, aws_secret_access_key = _parse_s3_config(
            s3_config_file,
            s3_config_format,
            profile,
        )
        self.log.info("%s credentials retrieved from extra['s3_config_file']", self.conn_repr)

    if aws_session_token:
        self.log.info(
            "%s session token retrieved from extra, please note you are responsible for renewing these.",
            self.conn_repr,
        )

    return aws_access_key_id, aws_secret_access_key, aws_session_token


def _get_assume_role_configs(
    self,
    *,
    role_arn: Optional[str] = None,
    assume_role_method: str = "assume_role",
    assume_role_kwargs: Optional[Dict[str, Any]] = None,
    # Deprecated Values
    aws_account_id: Optional[str] = None,
    aws_iam_role: Optional[str] = None,
    external_id: Optional[str] = None,
    **kwargs,
) -> Tuple[Optional[str], Optional[str], Dict[str, Any]]:
    """
    Get assume role configs from Connection extra.

    :param role_arn: ARN of the role to assume
    :param assume_role_method: one of 'assume_role', 'assume_role_with_saml' or
        'assume_role_with_web_identity'
    :param assume_role_kwargs: additional keyword arguments for the assume-role call
    :param aws_account_id: (deprecated) combined with ``aws_iam_role`` to build ``role_arn``
    :param aws_iam_role: (deprecated) combined with ``aws_account_id`` to build ``role_arn``
    :param external_id: (deprecated) copied to ``assume_role_kwargs['ExternalId']`` when that
        key is not already present
    :param kwargs: any other extra keys; accepted so the whole extra can be splatted in, ignored here
    :return: tuple of (role_arn, assume_role_method, assume_role_kwargs), or
        ``(None, None, {})`` when no role ARN could be determined
    :raises NotImplementedError: if ``assume_role_method`` is not a supported method
    """
    if role_arn:
        self.log.info("Retrieving role_arn=%r from %s extra.", role_arn, self.conn_repr)
    elif aws_account_id and aws_iam_role:
        warnings.warn(
            "Constructing 'role_arn' from extra['aws_account_id'] and extra['aws_iam_role'] is deprecated"
            f" and will be removed in a future releases."
            f" Please set 'role_arn' in {self.conn_repr} extra.",
            DeprecationWarning,
            stacklevel=3,
        )
        role_arn = f"arn:aws:iam::{aws_account_id}:role/{aws_iam_role}"
        self.log.info(
            "Constructions role_arn=%r from %s extra['aws_account_id'] and extra['aws_iam_role'].",
            role_arn,
            self.conn_repr,
        )

    if not role_arn:
        # There is no reason obtain `assume_role_method` and `assume_role_kwargs` if `role_arn` not set.
        return None, None, {}

    supported_methods = ['assume_role', 'assume_role_with_saml', 'assume_role_with_web_identity']
    if assume_role_method not in supported_methods:
        raise NotImplementedError(
            f'Found assume_role_method={assume_role_method!r} in {self.conn_repr} extra.'
            f' Currently {supported_methods} are supported.'
            ' (Exclude this setting will default to "assume_role").'
        )
    self.log.info("Retrieve assume_role_method=%r from %s.", assume_role_method, self.conn_repr)

    # Work on a copy so that injecting 'ExternalId' below never mutates the caller's dict.
    assume_role_kwargs = dict(assume_role_kwargs or {})
    if "ExternalId" not in assume_role_kwargs and external_id:
        warnings.warn(
            "'external_id' in extra config is deprecated and will be removed in a future releases. "
            f"Please set 'ExternalId' in 'assume_role_kwargs' in {self.conn_repr} extra.",
            DeprecationWarning,
            stacklevel=3,
        )
        assume_role_kwargs["ExternalId"] = external_id

    return role_arn, assume_role_method, assume_role_kwargs
def _parse_s3_config(
    config_file_name: str, config_format: Optional[str] = "boto", profile: Optional[str] = None
) -> Tuple[Optional[str], Optional[str]]:
    """
    Parses a config file for s3 credentials. Can currently
    parse boto, s3cmd.conf and AWS SDK config formats

    :param config_file_name: path to the config file
    :param config_format: config type. One of "boto", "s3cmd" or "aws".
        Defaults to "boto"; ``None`` is treated as "boto" and the value is
        compared case-insensitively.
    :param profile: profile name in AWS type config file
    :return: tuple of (access key, secret key) read from the selected section
    :raises AirflowException: if the file cannot be read, the expected section
        is missing, or the key options cannot be read from that section
    """
    warnings.warn(
        "Use local credentials file is never documented and well tested. "
        "Obtain credentials by this way deprecated and will be removed in a future releases.",
        DeprecationWarning,
        stacklevel=4,
    )

    import configparser

    config = configparser.ConfigParser()
    # ``read`` returns the list of files it managed to parse; empty means nothing was read.
    if not config.read(config_file_name):
        raise AirflowException(f"Couldn't read {config_file_name}")
    sections = config.sections()

    # Setting option names depending on file format
    if config_format is None:
        config_format = "boto"
    conf_format = config_format.lower()
    if conf_format == "boto":  # pragma: no cover
        # boto keeps named profiles under "profile <name>", otherwise "Credentials".
        if profile is not None and "profile " + profile in sections:
            cred_section = "profile " + profile
        else:
            cred_section = "Credentials"
    elif conf_format == "aws" and profile is not None:
        cred_section = profile
    else:
        cred_section = "default"

    # Option names
    if conf_format in ("boto", "aws"):  # pragma: no cover
        key_id_option = "aws_access_key_id"
        secret_key_option = "aws_secret_access_key"
    else:
        key_id_option = "access_key"
        secret_key_option = "secret_key"

    # Actual Parsing
    if cred_section not in sections:
        raise AirflowException("This config file format is not recognized")
    try:
        access_key = config.get(cred_section, key_id_option)
        secret_key = config.get(cred_section, secret_key_option)
    except Exception as err:
        # Chain the original error so the real cause (e.g. a missing option) stays visible.
        raise AirflowException("Option Error in parsing s3 config file") from err
    return access_key, secret_key