Source code for airflow.providers.amazon.aws.utils.connection_wrapper
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportwarningsfromcopyimportdeepcopyfromdataclassesimportMISSING,InitVar,dataclass,field,fieldsfromtypingimportTYPE_CHECKING,Anyfrombotocore.configimportConfigfromairflow.compat.functoolsimportcached_propertyfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.utilsimporttrim_none_valuesfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.typesimportNOTSET,ArgNotSetifTYPE_CHECKING:fromairflow.models.connectionimportConnection# Avoid circular imports.@dataclassclass_ConnectionMetadata:"""Connection metadata data-class. This class implements main :ref:`~airflow.models.connection.Connection` attributes and use in AwsConnectionWrapper for avoid circular imports. Only for internal usage, this class might change or removed in the future. """conn_id:str|None=Noneconn_type:str|None=Nonedescription:str|None=Nonehost:str|None=Nonelogin:str|None=Nonepassword:str|None=Noneschema:str|None=Noneport:int|None=Noneextra:str|dict|None=None@propertydefextra_dejson(self):ifnotself.extra:return{}extra=deepcopy(self.extra)ifisinstance(extra,str):try:extra=json.loads(extra)exceptjson.JSONDecodeErroraserr:raiseAirflowException(f"'extra' expected valid JSON-Object string. Original error:\n * {err}")fromNoneifnotisinstance(extra,dict):raiseTypeError(f"Expected JSON-Object or dict, got {type(extra).__name__}.")returnextra@dataclass
[docs]classAwsConnectionWrapper(LoggingMixin):""" AWS Connection Wrapper class helper. Use for validate and resolve AWS Connection parameters. ``conn`` reference to Airflow Connection object or AwsConnectionWrapper if it set to ``None`` than default values would use. The precedence rules for ``region_name`` 1. Explicit set (in Hook) ``region_name``. 2. Airflow Connection Extra 'region_name'. The precedence rules for ``botocore_config`` 1. Explicit set (in Hook) ``botocore_config``. 2. Construct from Airflow Connection Extra 'botocore_kwargs'. 3. The wrapper's default value """
[docs]def__post_init__(self,conn:Connection):ifisinstance(conn,type(self)):# For every field with init=False we copy reference value from original wrapper# For every field with init=True we use init values if it not equal default# We can't use ``dataclasses.replace`` in classmethod because# we limited by InitVar arguments since it not stored in object,# and also we do not want to run __post_init__ method again which print all logs/warnings again.forflinfields(conn):value=getattr(conn,fl.name)ifnotfl.init:setattr(self,fl.name,value)else:iffl.defaultisnotMISSING:default=fl.defaulteliffl.default_factoryisnotMISSING:default=fl.default_factory()# zero-argument callableelse:continue# Value mandatory, skiporig_value=getattr(self,fl.name)iforig_value==default:# Only replace value if it not equal default valuesetattr(self,fl.name,value)returnelifnotconn:return# Assign attributes from AWS Connectionself.conn_id=conn.conn_idself.conn_type=conn.conn_typeor"aws"self.login=conn.loginself.password=conn.passwordself.schema=conn.schemaorNoneself.extra_config=deepcopy(conn.extra_dejson)ifself.conn_type.lower()=="s3":warnings.warn(f"{self.conn_repr} has connection type 's3', ""which has been replaced by connection type 'aws'. ""Please update your connection to have `conn_type='aws'`.",DeprecationWarning,stacklevel=2,)elifself.conn_type!="aws":warnings.warn(f"{self.conn_repr} expected connection type 'aws', got {self.conn_type!r}. ""This connection might not work correctly. ""Please use Amazon Web Services Connection type.",UserWarning,stacklevel=2,)extra=deepcopy(conn.extra_dejson)session_kwargs=extra.get("session_kwargs",{})ifsession_kwargs:warnings.warn("'session_kwargs' in extra config is deprecated and will be removed in a future releases. "f"Please specify arguments passed to boto3 Session directly in {self.conn_repr} extra.",DeprecationWarning,stacklevel=2,)# Retrieve initial connection credentialsinit_credentials=self._get_credentials(**extra)self.aws_access_key_id,self.aws_secret_access_key,self.aws_session_token=init_credentialsifnotself.region_name:if"region_name"inextra:self.region_name=extra["region_name"]self.log.debug("Retrieving region_name=%s from %s extra.",self.region_name,self.conn_repr)elif"region_name"insession_kwargs:self.region_name=session_kwargs["region_name"]self.log.debug("Retrieving region_name=%s from %s extra['session_kwargs'].",self.region_name,self.conn_repr,)ifself.verifyisNoneand"verify"inextra:self.verify=extra["verify"]self.log.debug("Retrieving verify=%s from %s extra.",self.verify,self.conn_repr)if"profile_name"inextra:self.profile_name=extra["profile_name"]self.log.debug("Retrieving profile_name=%s from %s extra.",self.profile_name,self.conn_repr)elif"profile_name"insession_kwargs:self.profile_name=session_kwargs["profile_name"]self.log.debug("Retrieving profile_name=%s from %s extra['session_kwargs'].",self.profile_name,self.conn_repr,)# Warn the user that an invalid parameter is being used which actually not related to 'profile_name'.# ToDo: Remove this check entirely as soon as drop support credentials from s3_config_fileif"profile"inextraand"s3_config_file"notinextraandnotself.profile_name:warnings.warn(f"Found 'profile' without specifying 's3_config_file' in {self.conn_repr} extra. ""If required profile from AWS Shared Credentials please "f"set 'profile_name' in {self.conn_repr} extra.",UserWarning,stacklevel=2,)config_kwargs=extra.get("config_kwargs")ifnotself.botocore_configandconfig_kwargs:# https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.htmlself.log.debug("Retrieving botocore config=%s from %s extra.",config_kwargs,self.conn_repr)self.botocore_config=Config(**config_kwargs)ifconn.host:warnings.warn(f"Host {conn.host} specified in the connection is not used."" Please, set it on extra['endpoint_url'] instead",DeprecationWarning,stacklevel=2,)self.endpoint_url=extra.get("host")ifself.endpoint_url:warnings.warn("extra['host'] is deprecated and will be removed in a future release."" Please set extra['endpoint_url'] instead",DeprecationWarning,stacklevel=2,)else:self.endpoint_url=extra.get("endpoint_url")# Retrieve Assume Role Configurationassume_role_configs=self._get_assume_role_configs(**extra)self.role_arn,self.assume_role_method,self.assume_role_kwargs=assume_role_configs
[docs]defextra_dejson(self):"""Compatibility with `airflow.models.Connection.extra_dejson` property."""returnself.extra_config
@property
[docs]defsession_kwargs(self)->dict[str,Any]:"""Additional kwargs passed to boto3.session.Session."""returntrim_none_values({"aws_access_key_id":self.aws_access_key_id,"aws_secret_access_key":self.aws_secret_access_key,"aws_session_token":self.aws_session_token,"region_name":self.region_name,"profile_name":self.profile_name,
def_get_credentials(self,*,aws_access_key_id:str|None=None,aws_secret_access_key:str|None=None,aws_session_token:str|None=None,# Deprecated Valuess3_config_file:str|None=None,s3_config_format:str|None=None,profile:str|None=None,session_kwargs:dict[str,Any]|None=None,**kwargs,)->tuple[str|None,str|None,str|None]:""" Get AWS credentials from connection login/password and extra. ``aws_access_key_id`` and ``aws_secret_access_key`` order 1. From Connection login and password 2. From Connection extra['aws_access_key_id'] and extra['aws_access_key_id'] 3. (deprecated) Form Connection extra['session_kwargs'] 4. (deprecated) From local credentials file Get ``aws_session_token`` from extra['aws_access_key_id'] """session_kwargs=session_kwargsor{}session_aws_access_key_id=session_kwargs.get("aws_access_key_id")session_aws_secret_access_key=session_kwargs.get("aws_secret_access_key")session_aws_session_token=session_kwargs.get("aws_session_token")ifself.loginandself.password:self.log.info("%s credentials retrieved from login and password.",self.conn_repr)aws_access_key_id,aws_secret_access_key=self.login,self.passwordelifaws_access_key_idandaws_secret_access_key:self.log.info("%s credentials retrieved from extra.",self.conn_repr)elifsession_aws_access_key_idandsession_aws_secret_access_key:aws_access_key_id=session_aws_access_key_idaws_secret_access_key=session_aws_secret_access_keyself.log.info("%s credentials retrieved from extra['session_kwargs'].",self.conn_repr)elifs3_config_file:aws_access_key_id,aws_secret_access_key=_parse_s3_config(s3_config_file,s3_config_format,profile,)self.log.info("%s credentials retrieved from extra['s3_config_file']",self.conn_repr)ifaws_session_token:self.log.info("%s session token retrieved from extra, please note you are responsible for renewing these.",self.conn_repr,)elifsession_aws_session_token:aws_session_token=session_aws_session_tokenself.log.info("%s session token retrieved from extra['session_kwargs'], ""please note you are responsible for renewing these.",self.conn_repr,)returnaws_access_key_id,aws_secret_access_key,aws_session_tokendef_get_assume_role_configs(self,*,role_arn:str|None=None,assume_role_method:str="assume_role",assume_role_kwargs:dict[str,Any]|None=None,# Deprecated Valuesaws_account_id:str|None=None,aws_iam_role:str|None=None,external_id:str|None=None,**kwargs,)->tuple[str|None,str|None,dict[Any,str]]:"""Get assume role configs from Connection extra."""ifrole_arn:self.log.debug("Retrieving role_arn=%r from %s extra.",role_arn,self.conn_repr)elifaws_account_idandaws_iam_role:warnings.warn("Constructing 'role_arn' from extra['aws_account_id'] and extra['aws_iam_role'] is deprecated"f" and will be removed in a future releases."f" Please set 'role_arn' in {self.conn_repr} extra.",DeprecationWarning,stacklevel=3,)role_arn=f"arn:aws:iam::{aws_account_id}:role/{aws_iam_role}"self.log.debug("Constructions role_arn=%r from %s extra['aws_account_id'] and extra['aws_iam_role'].",role_arn,self.conn_repr,)ifnotrole_arn:# There is no reason obtain `assume_role_method` and `assume_role_kwargs` if `role_arn` not set.returnNone,None,{}supported_methods=["assume_role","assume_role_with_saml","assume_role_with_web_identity"]ifassume_role_methodnotinsupported_methods:raiseNotImplementedError(f"Found assume_role_method={assume_role_method!r} in {self.conn_repr} extra."f" Currently {supported_methods} are supported."' (Exclude this setting will default to "assume_role").')self.log.debug("Retrieve assume_role_method=%r from %s.",assume_role_method,self.conn_repr)assume_role_kwargs=assume_role_kwargsor{}if"ExternalId"notinassume_role_kwargsandexternal_id:warnings.warn("'external_id' in extra config is deprecated and will be removed in a future releases. "f"Please set 'ExternalId' in 'assume_role_kwargs' in {self.conn_repr} extra.",DeprecationWarning,stacklevel=3,)assume_role_kwargs["ExternalId"]=external_idreturnrole_arn,assume_role_method,assume_role_kwargs
def_parse_s3_config(config_file_name:str,config_format:str|None="boto",profile:str|None=None)->tuple[str|None,str|None]:""" Parses a config file for s3 credentials. Can currently parse boto, s3cmd.conf and AWS SDK config formats :param config_file_name: path to the config file :param config_format: config type. One of "boto", "s3cmd" or "aws". Defaults to "boto" :param profile: profile name in AWS type config file """warnings.warn("Use local credentials file is never documented and well tested. ""Obtain credentials by this way deprecated and will be removed in a future releases.",DeprecationWarning,stacklevel=4,)importconfigparserconfig=configparser.ConfigParser()try:ifconfig.read(config_file_name):# pragma: no coversections=config.sections()else:raiseAirflowException(f"Couldn't read {config_file_name}")exceptExceptionase:raiseAirflowException("Exception when parsing %s: %s",config_file_name,e.__class__.__name__)# Setting option names depending on file formatifconfig_formatisNone:config_format="boto"conf_format=config_format.lower()ifconf_format=="boto":# pragma: no coverifprofileisnotNoneand"profile "+profileinsections:cred_section="profile "+profileelse:cred_section="Credentials"elifconf_format=="aws"andprofileisnotNone:cred_section=profileelse:cred_section="default"# Option namesifconf_formatin("boto","aws"):# pragma: no coverkey_id_option="aws_access_key_id"secret_key_option="aws_secret_access_key"else:key_id_option="access_key"secret_key_option="secret_key"# Actual Parsingifcred_sectionnotinsections:raiseAirflowException("This config file format is not recognized")else:try:access_key=config.get(cred_section,key_id_option)secret_key=config.get(cred_section,secret_key_option)mask_secret(secret_key)exceptException:raiseAirflowException("Option Error in parsing s3 config file")returnaccess_key,secret_key