Source code for airflow.providers.amazon.aws.hooks.base_aws
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains Base AWS Hook... seealso:: For more information on how to use this hook, take a look at the guide: :ref:`howto/connection:AWSHook`"""importconfigparserimportdatetimeimportloggingimportsysimportwarningsfromfunctoolsimportwrapsfromtypingimportAny,Callable,Dict,Optional,Tuple,Type,Unionimportboto3importbotocoreimportbotocore.sessionimportrequestsimporttenacityfrombotocore.configimportConfigfrombotocore.credentialsimportReadOnlyCredentialsfromslugifyimportslugifyifsys.version_info>=(3,8):fromfunctoolsimportcached_propertyelse:fromcached_propertyimportcached_propertyfromdateutil.tzimporttzlocalfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookfromairflow.models.connectionimportConnectionfromairflow.utils.log.logging_mixinimportLoggingMixin
class BaseSessionFactory(LoggingMixin):
    """
    Base AWS Session Factory class to handle boto3 session creation.

    It can handle most of the AWS supported authentication methods.

    User can also derive from this class to have full control of boto3 session
    creation or to support custom federation.

    .. seealso::
        :ref:`howto/connection:aws:session-factory`
    """

    def __init__(self, conn: Connection, region_name: Optional[str], config: Config) -> None:
        """
        :param conn: Airflow connection whose login/password/extra drive session creation.
        :param region_name: explicit region; when None, ``region_name`` from the connection extra is used.
        :param config: botocore client config passed to the STS client used for role assumption.
        """
        super().__init__()
        self.conn = conn
        self.region_name = region_name
        self.config = config
        # Parsed once; every helper below reads settings from this dict.
        self.extra_config = self.conn.extra_dejson

        # Both are populated by create_session().
        self.basic_session: Optional[boto3.session.Session] = None
        self.role_arn: Optional[str] = None

    def create_session(self) -> boto3.session.Session:
        """
        Create AWS session.

        A basic session is always built from the connection credentials. When a role ARN can be
        derived from the connection extra, a second session backed by assumed-role credentials is
        returned instead.
        """
        session_kwargs = {}
        if "session_kwargs" in self.extra_config:
            self.log.info(
                "Retrieving session_kwargs from Connection.extra_config['session_kwargs']: %s",
                self.extra_config["session_kwargs"],
            )
            session_kwargs = self.extra_config["session_kwargs"]
        self.basic_session = self._create_basic_session(session_kwargs=session_kwargs)
        self.role_arn = self._read_role_arn_from_extra_config()
        # If role_arn was specified then STS + assume_role
        if self.role_arn is None:
            return self.basic_session

        return self._create_session_with_assume_role(session_kwargs=session_kwargs)

    def _get_region_name(self) -> Optional[str]:
        """Return the explicit region, falling back to ``region_name`` from the connection extra."""
        region_name = self.region_name
        if self.region_name is None and 'region_name' in self.extra_config:
            self.log.info("Retrieving region_name from Connection.extra_config['region_name']")
            region_name = self.extra_config["region_name"]
        return region_name

    def _create_basic_session(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
        """Build a boto3 session from the connection credentials, session token and region."""
        aws_access_key_id, aws_secret_access_key = self._read_credentials_from_connection()
        aws_session_token = self.extra_config.get("aws_session_token")
        region_name = self._get_region_name()
        self.log.debug(
            "Creating session with aws_access_key_id=%s region_name=%s",
            aws_access_key_id,
            region_name,
        )

        return boto3.session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
            aws_session_token=aws_session_token,
            **session_kwargs,
        )

    def _create_session_with_assume_role(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
        """
        Build a boto3 session whose credentials come from assuming ``self.role_arn``.

        :raises NotImplementedError: if ``assume_role_method`` in the connection extra is not supported.
        :raises RuntimeError: if the basic session has not been created yet.
        """
        assume_role_method = self.extra_config.get('assume_role_method', 'assume_role')
        self.log.debug("assume_role_method=%s", assume_role_method)
        supported_methods = ['assume_role', 'assume_role_with_saml', 'assume_role_with_web_identity']
        if assume_role_method not in supported_methods:
            raise NotImplementedError(
                f'assume_role_method={assume_role_method} in Connection {self.conn.conn_id} Extra.'
                f'Currently {supported_methods} are supported.'
                '(Exclude this setting will default to "assume_role").'
            )
        if assume_role_method == 'assume_role_with_web_identity':
            # Deferred credentials have no initial credentials
            credential_fetcher = self._get_web_identity_credential_fetcher()
            credentials = botocore.credentials.DeferredRefreshableCredentials(
                method='assume-role-with-web-identity',
                refresh_using=credential_fetcher.fetch_credentials,
                time_fetcher=lambda: datetime.datetime.now(tz=tzlocal()),
            )
        else:
            # Refreshable credentials do have initial credentials
            credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
                metadata=self._refresh_credentials(),
                refresh_using=self._refresh_credentials,
                method="sts-assume-role",
            )

        session = botocore.session.get_session()
        session._credentials = credentials

        if self.basic_session is None:
            raise RuntimeError("The basic session should be created here!")

        # The assumed-role session reuses the region of the basic session.
        region_name = self.basic_session.region_name
        session.set_config_variable("region", region_name)

        return boto3.session.Session(botocore_session=session, **session_kwargs)

    def _refresh_credentials(self) -> Dict[str, Any]:
        """
        Call STS through the basic session and return fresh credentials metadata.

        :return: dict with ``access_key``, ``secret_key``, ``token`` and ``expiry_time`` (ISO format).
        :raises RuntimeError: if the basic session is missing or STS answers with a non-200 status.
        :raises NotImplementedError: if the configured method is neither ``assume_role`` nor
            ``assume_role_with_saml``.
        """
        self.log.debug('Refreshing credentials')
        assume_role_method = self.extra_config.get('assume_role_method', 'assume_role')
        sts_session = self.basic_session

        if sts_session is None:
            raise RuntimeError(
                "Session should be initialized when refresh credentials with assume_role is used!"
            )

        sts_client = sts_session.client("sts", config=self.config)

        if assume_role_method == 'assume_role':
            sts_response = self._assume_role(sts_client=sts_client)
        elif assume_role_method == 'assume_role_with_saml':
            sts_response = self._assume_role_with_saml(sts_client=sts_client)
        else:
            raise NotImplementedError(f'assume_role_method={assume_role_method} not expected')
        sts_response_http_status = sts_response['ResponseMetadata']['HTTPStatusCode']
        if sts_response_http_status != 200:
            raise RuntimeError(f'sts_response_http_status={sts_response_http_status}')
        credentials = sts_response['Credentials']
        expiry_time = credentials.get('Expiration').isoformat()
        self.log.debug('New credentials expiry_time: %s', expiry_time)
        credentials = {
            "access_key": credentials.get("AccessKeyId"),
            "secret_key": credentials.get("SecretAccessKey"),
            "token": credentials.get("SessionToken"),
            "expiry_time": expiry_time,
        }
        return credentials

    def _read_role_arn_from_extra_config(self) -> Optional[str]:
        """
        Return ``role_arn`` from the connection extra.

        When ``role_arn`` is absent but both ``aws_account_id`` and ``aws_iam_role`` are present,
        the ARN is constructed from them. Returns None when no role can be determined.
        """
        aws_account_id = self.extra_config.get("aws_account_id")
        aws_iam_role = self.extra_config.get("aws_iam_role")
        role_arn = self.extra_config.get("role_arn")
        if role_arn is None and aws_account_id is not None and aws_iam_role is not None:
            self.log.info("Constructing role_arn from aws_account_id and aws_iam_role")
            role_arn = f"arn:aws:iam::{aws_account_id}:role/{aws_iam_role}"
        self.log.debug("role_arn is %s", role_arn)
        return role_arn

    def _read_credentials_from_connection(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Return ``(access_key_id, secret_access_key)`` from the first source that provides them.

        Sources are checked in order: connection login/password, explicit keys in the extra,
        then the file named by ``s3_config_file`` in the extra. ``(None, None)`` if none applies.
        """
        aws_access_key_id = None
        aws_secret_access_key = None
        if self.conn.login:
            aws_access_key_id = self.conn.login
            aws_secret_access_key = self.conn.password
            self.log.info("Credentials retrieved from login")
        elif "aws_access_key_id" in self.extra_config and "aws_secret_access_key" in self.extra_config:
            aws_access_key_id = self.extra_config["aws_access_key_id"]
            aws_secret_access_key = self.extra_config["aws_secret_access_key"]
            self.log.info("Credentials retrieved from extra_config")
        elif "s3_config_file" in self.extra_config:
            aws_access_key_id, aws_secret_access_key = _parse_s3_config(
                self.extra_config["s3_config_file"],
                self.extra_config.get("s3_config_format"),
                self.extra_config.get("profile"),
            )
            self.log.info("Credentials retrieved from extra_config['s3_config_file']")
        return aws_access_key_id, aws_secret_access_key

    def _strip_invalid_session_name_characters(self, role_session_name: str) -> str:
        """Slugify the session name, keeping only word characters and ``+=,.@-``."""
        return slugify(role_session_name, regex_pattern=r'[^\w+=,.@-]+')

    def _assume_role(self, sts_client: boto3.client) -> Dict:
        """Call ``sts_client.assume_role`` for ``self.role_arn`` and return the raw response."""
        # Copy before adding ExternalId so the dict held in self.extra_config is not mutated.
        assume_role_kwargs = dict(self.extra_config.get("assume_role_kwargs") or {})
        if "external_id" in self.extra_config:  # Backwards compatibility
            assume_role_kwargs["ExternalId"] = self.extra_config.get("external_id")
        role_session_name = self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}")
        self.log.debug(
            "Doing sts_client.assume_role to role_arn=%s (role_session_name=%s)",
            self.role_arn,
            role_session_name,
        )
        return sts_client.assume_role(
            RoleArn=self.role_arn, RoleSessionName=role_session_name, **assume_role_kwargs
        )

    def _assume_role_with_saml(self, sts_client: boto3.client) -> Dict[str, Any]:
        """
        Fetch a SAML assertion from the IDP and call ``sts_client.assume_role_with_saml``.

        :raises NotImplementedError: if ``idp_auth_method`` is not ``http_spegno_auth``.
        """
        saml_config = self.extra_config['assume_role_with_saml']
        principal_arn = saml_config['principal_arn']

        idp_auth_method = saml_config['idp_auth_method']
        if idp_auth_method == 'http_spegno_auth':
            saml_assertion = self._fetch_saml_assertion_using_http_spegno_auth(saml_config)
        else:
            raise NotImplementedError(
                f'idp_auth_method={idp_auth_method} in Connection {self.conn.conn_id} Extra.'
                'Currently only "http_spegno_auth" is supported, and must be specified.'
            )

        self.log.debug("Doing sts_client.assume_role_with_saml to role_arn=%s", self.role_arn)
        assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
        return sts_client.assume_role_with_saml(
            RoleArn=self.role_arn,
            PrincipalArn=principal_arn,
            SAMLAssertion=saml_assertion,
            **assume_role_kwargs,
        )

    def _get_idp_response(
        self, saml_config: Dict[str, Any], auth: requests.auth.AuthBase
    ) -> requests.models.Response:
        """
        GET ``saml_config["idp_url"]`` with the given auth and return the response.

        Optional ``idp_request_retry_kwargs`` configure a urllib3 ``Retry`` strategy and optional
        ``idp_request_kwargs`` are forwarded to ``session.get``. HTTP error statuses raise via
        ``raise_for_status``.
        """
        idp_url = saml_config["idp_url"]
        self.log.debug("idp_url= %s", idp_url)

        session = requests.Session()

        # Configurable Retry when querying the IDP endpoint
        if "idp_request_retry_kwargs" in saml_config:
            idp_request_retry_kwargs = saml_config["idp_request_retry_kwargs"]
            self.log.info("idp_request_retry_kwargs= %s", idp_request_retry_kwargs)
            from requests.adapters import HTTPAdapter
            from requests.packages.urllib3.util.retry import Retry

            retry_strategy = Retry(**idp_request_retry_kwargs)
            adapter = HTTPAdapter(max_retries=retry_strategy)
            session.mount("https://", adapter)
            session.mount("http://", adapter)

        idp_request_kwargs = {}
        if "idp_request_kwargs" in saml_config:
            idp_request_kwargs = saml_config["idp_request_kwargs"]

        idp_response = session.get(idp_url, auth=auth, **idp_request_kwargs)
        idp_response.raise_for_status()

        return idp_response

    def _fetch_saml_assertion_using_http_spegno_auth(self, saml_config: Dict[str, Any]) -> str:
        """
        Query the IDP with SPNEGO auth and extract the SAML assertion via ``saml_response_xpath``.

        :raises NotImplementedError: if ``mutual_authentication`` has an unsupported value.
        :raises ValueError: if the xpath query yields an empty result.
        """
        # requests_gssapi will need paramiko > 2.6 since you'll need
        # 'gssapi' not 'python-gssapi' from PyPi.
        # https://github.com/paramiko/paramiko/pull/1311
        import requests_gssapi
        from lxml import etree

        auth = requests_gssapi.HTTPSPNEGOAuth()
        if 'mutual_authentication' in saml_config:
            mutual_auth = saml_config['mutual_authentication']
            if mutual_auth == 'REQUIRED':
                auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.REQUIRED)
            elif mutual_auth == 'OPTIONAL':
                auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.OPTIONAL)
            elif mutual_auth == 'DISABLED':
                auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.DISABLED)
            else:
                raise NotImplementedError(
                    f'mutual_authentication={mutual_auth} in Connection {self.conn.conn_id} Extra.'
                    'Currently "REQUIRED", "OPTIONAL" and "DISABLED" are supported.'
                    '(Exclude this setting will default to HTTPSPNEGOAuth() ).'
                )
        # Query the IDP
        idp_response = self._get_idp_response(saml_config, auth=auth)
        # Assist with debugging. Note: contains sensitive info!
        xpath = saml_config['saml_response_xpath']
        log_idp_response = 'log_idp_response' in saml_config and saml_config['log_idp_response']
        if log_idp_response:
            self.log.warning(
                'The IDP response contains sensitive information, but log_idp_response is ON (%s).',
                log_idp_response,
            )
            self.log.debug('idp_response.content= %s', idp_response.content)
            self.log.debug('xpath= %s', xpath)
        # Extract SAML Assertion from the returned HTML / XML
        xml = etree.fromstring(idp_response.content)
        saml_assertion = xml.xpath(xpath)
        # NOTE(review): a list with more than one match is returned as-is (not a str) — confirm
        # whether multiple matches should be rejected here.
        if isinstance(saml_assertion, list):
            if len(saml_assertion) == 1:
                saml_assertion = saml_assertion[0]
        if not saml_assertion:
            raise ValueError('Invalid SAML Assertion')
        return saml_assertion

    def _get_web_identity_credential_fetcher(
        self,
    ) -> botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher:
        """
        Build the botocore fetcher used for ``assume_role_with_web_identity``.

        :raises RuntimeError: if the basic session has not been created yet.
        :raises AirflowException: if the configured federation is not ``google``.
        """
        if self.basic_session is None:
            # RuntimeError (still an Exception) matches the other "session not ready" checks.
            raise RuntimeError("Session should be set where identity is fetched!")
        base_session = self.basic_session._session or botocore.session.get_session()
        client_creator = base_session.create_client
        federation = self.extra_config.get('assume_role_with_web_identity_federation')
        if federation == 'google':
            web_identity_token_loader = self._get_google_identity_token_loader()
        else:
            raise AirflowException(
                f'Unsupported federation: {federation}. Currently "google" only are supported.'
            )
        assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
        return botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=client_creator,
            web_identity_token_loader=web_identity_token_loader,
            role_arn=self.role_arn,
            extra_args=assume_role_kwargs,
        )

    def _get_google_identity_token_loader(self):
        """Return a zero-argument callable yielding a Google ID token, refreshing it when invalid."""
        from google.auth.transport import requests as requests_transport

        from airflow.providers.google.common.utils.id_token_credentials import (
            get_default_id_token_credentials,
        )

        audience = self.extra_config.get('assume_role_with_web_identity_federation_audience')

        google_id_token_credentials = get_default_id_token_credentials(target_audience=audience)

        def web_identity_token_loader():
            if not google_id_token_credentials.valid:
                request_adapter = requests_transport.Request()
                google_id_token_credentials.refresh(request=request_adapter)
            return google_id_token_credentials.token

        return web_identity_token_loader
class AwsBaseHook(BaseHook):
    """
    Interact with AWS.
    This class is a thin wrapper around the boto3 python library.

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param verify: Whether or not to verify SSL certificates.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
    :param config: Configuration for botocore client.
        (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)
    """

    # NOTE(review): these class attributes were absent from the extracted text although
    # ``__init__`` uses ``default_conn_name`` as a default value; restored with the values
    # used by the upstream Amazon provider — confirm against the real file.
    conn_name_attr = 'aws_conn_id'
    default_conn_name = 'aws_default'
    conn_type = 'aws'
    hook_name = 'Amazon Web Services'

    def __init__(
        self,
        aws_conn_id: Optional[str] = default_conn_name,
        verify: Union[bool, str, None] = None,
        region_name: Optional[str] = None,
        client_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> None:
        """
        Store the hook settings.

        :raises AirflowException: if neither ``client_type`` nor ``resource_type`` is given.
        """
        super().__init__()
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.client_type = client_type
        self.resource_type = resource_type
        self.region_name = region_name
        self.config = config

        if not (self.client_type or self.resource_type):
            raise AirflowException('Either client_type or resource_type must be provided.')

    def _get_credentials(self, region_name: Optional[str]) -> Tuple[boto3.session.Session, Optional[str]]:
        """
        Return ``(session, endpoint_url)`` for the configured connection.

        Without ``aws_conn_id``, or when using the Airflow connection raises ``AirflowException``,
        a plain boto3 session is returned with ``endpoint_url`` None. ``endpoint_url`` otherwise
        comes from ``host`` in the connection extra. ``config_kwargs`` in the extra, when present,
        replace ``self.config``.
        """
        if not self.aws_conn_id:
            session = boto3.session.Session(region_name=region_name)
            return session, None

        self.log.debug("Airflow Connection: aws_conn_id=%s", self.aws_conn_id)

        try:
            # Fetch the Airflow connection object
            connection_object = self.get_connection(self.aws_conn_id)
            extra_config = connection_object.extra_dejson
            endpoint_url = extra_config.get("host")

            # https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html#botocore.config.Config
            if "config_kwargs" in extra_config:
                self.log.debug(
                    "Retrieving config_kwargs from Connection.extra_config['config_kwargs']: %s",
                    extra_config["config_kwargs"],
                )
                self.config = Config(**extra_config["config_kwargs"])

            # NOTE(review): ``SessionFactory`` is not defined in the visible part of this module;
            # presumably bound at module level to the result of resolve_session_factory() — confirm.
            session = SessionFactory(
                conn=connection_object, region_name=region_name, config=self.config
            ).create_session()

            return session, endpoint_url

        except AirflowException:
            self.log.warning("Unable to use Airflow Connection for credentials.")
            self.log.debug("Fallback on boto3 credential strategy")
            # http://boto3.readthedocs.io/en/latest/guide/configuration.html

        self.log.debug(
            "Creating session using boto3 credential strategy region_name=%s",
            region_name,
        )
        session = boto3.session.Session(region_name=region_name)
        return session, None

    def get_client_type(
        self,
        client_type: Optional[str] = None,
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> boto3.client:
        """Get the underlying boto3 client using boto3 session"""
        session, endpoint_url = self._get_credentials(region_name=region_name)

        if client_type:
            warnings.warn(
                "client_type is deprecated. Set client_type from class attribute.",
                DeprecationWarning,
                stacklevel=2,
            )
        else:
            client_type = self.client_type

        # No AWS Operators use the config argument to this method.
        # Keep backward compatibility with other users who might use it
        if config is None:
            config = self.config

        return session.client(client_type, endpoint_url=endpoint_url, config=config, verify=self.verify)

    def get_resource_type(
        self,
        resource_type: Optional[str] = None,
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> boto3.resource:
        """Get the underlying boto3 resource using boto3 session"""
        session, endpoint_url = self._get_credentials(region_name=region_name)

        if resource_type:
            warnings.warn(
                "resource_type is deprecated. Set resource_type from class attribute.",
                DeprecationWarning,
                stacklevel=2,
            )
        else:
            resource_type = self.resource_type

        # No AWS Operators use the config argument to this method.
        # Keep backward compatibility with other users who might use it
        if config is None:
            config = self.config

        return session.resource(resource_type, endpoint_url=endpoint_url, config=config, verify=self.verify)

    @cached_property
    def conn(self) -> Union[boto3.client, boto3.resource]:
        """
        Get the underlying boto3 client/resource (cached)

        :return: boto3.client or boto3.resource
        :rtype: Union[boto3.client, boto3.resource]
        """
        if self.client_type:
            return self.get_client_type(region_name=self.region_name)
        elif self.resource_type:
            return self.get_resource_type(region_name=self.region_name)
        else:
            # Rare possibility - subclasses have not specified a client_type or resource_type
            raise NotImplementedError('Could not get boto3 connection!')

    def get_conn(self) -> Union[boto3.client, boto3.resource]:
        """
        Get the underlying boto3 client/resource (cached)

        Implemented so that caching works as intended. It exists for compatibility
        with subclasses that rely on a super().get_conn() method.

        :return: boto3.client or boto3.resource
        :rtype: Union[boto3.client, boto3.resource]
        """
        # Compat shim
        return self.conn

    def get_session(self, region_name: Optional[str] = None) -> boto3.session.Session:
        """Get the underlying boto3.session."""
        session, _ = self._get_credentials(region_name=region_name)
        return session

    def get_credentials(self, region_name: Optional[str] = None) -> ReadOnlyCredentials:
        """
        Get the underlying `botocore.Credentials` object.

        This contains the following authentication attributes: access_key, secret_key and token.
        """
        session, _ = self._get_credentials(region_name=region_name)
        # Credentials are refreshable, so accessing your access key and
        # secret key separately can lead to a race condition.
        # See https://stackoverflow.com/a/36291428/8283373
        return session.get_credentials().get_frozen_credentials()

    def expand_role(self, role: str, region_name: Optional[str] = None) -> str:
        """
        If the IAM role is a role name, get the Amazon Resource Name (ARN) for the role.
        If IAM role is already an IAM role ARN, no change is made.

        :param role: IAM role name or ARN
        :param region_name: Optional region name to get credentials for
        :return: IAM role ARN
        """
        # A "/" is taken to mean the value is already an ARN.
        if "/" in role:
            return role

        session, endpoint_url = self._get_credentials(region_name=region_name)
        _client = session.client('iam', endpoint_url=endpoint_url, config=self.config, verify=self.verify)
        return _client.get_role(RoleName=role)["Role"]["Arn"]

    @staticmethod
    def retry(should_retry: Callable[[Exception], bool]):
        """
        A decorator that provides a mechanism to repeat requests in response to exceeding a temporary quota
        limit.

        The decorated method is retried only when the instance has a ``retry_args`` attribute that
        is not None; its ``multiplier``, ``min``, ``max`` and ``stop_after_delay`` entries tune the
        exponential wait and the stop condition.

        :param should_retry: predicate deciding whether a raised exception triggers a retry.
        """

        def retry_decorator(fun: Callable):
            @wraps(fun)
            def decorator_f(self, *args, **kwargs):
                retry_args = getattr(self, 'retry_args', None)
                if retry_args is None:
                    return fun(self, *args, **kwargs)
                multiplier = retry_args.get('multiplier', 1)
                min_limit = retry_args.get('min', 1)
                max_limit = retry_args.get('max', 1)
                stop_after_delay = retry_args.get('stop_after_delay', 10)
                tenacity_before_logger = tenacity.before_log(self.log, logging.INFO) if self.log else None
                tenacity_after_logger = tenacity.after_log(self.log, logging.INFO) if self.log else None
                default_kwargs = {
                    'wait': tenacity.wait_exponential(multiplier=multiplier, max=max_limit, min=min_limit),
                    'retry': tenacity.retry_if_exception(should_retry),
                    'stop': tenacity.stop_after_delay(stop_after_delay),
                    'before': tenacity_before_logger,
                    'after': tenacity_after_logger,
                }
                return tenacity.retry(**default_kwargs)(fun)(self, *args, **kwargs)

            return decorator_f

        return retry_decorator
def _parse_s3_config(
    config_file_name: str, config_format: Optional[str] = "boto", profile: Optional[str] = None
) -> Tuple[Optional[str], Optional[str]]:
    """
    Parses a config file for s3 credentials. Can currently
    parse boto, s3cmd.conf and AWS SDK config formats

    :param config_file_name: path to the config file
    :param config_format: config type. One of "boto", "s3cmd" or "aws".
        Defaults to "boto"
    :param profile: profile name in AWS type config file
    :return: tuple of ``(access_key, secret_key)`` read from the selected section
    :raises AirflowException: if the file cannot be read or the expected section is missing
    :raises configparser.Error: if an expected option is missing from the section
    """
    config = configparser.ConfigParser()
    if config.read(config_file_name):  # pragma: no cover
        sections = config.sections()
    else:
        raise AirflowException(f"Couldn't read {config_file_name}")
    # Setting option names depending on file format
    if config_format is None:
        config_format = "boto"
    conf_format = config_format.lower()
    if conf_format == "boto":  # pragma: no cover
        if profile is not None and "profile " + profile in sections:
            cred_section = "profile " + profile
        else:
            cred_section = "Credentials"
    elif conf_format == "aws" and profile is not None:
        cred_section = profile
    else:
        cred_section = "default"
    # Option names
    if conf_format in ("boto", "aws"):  # pragma: no cover
        key_id_option = "aws_access_key_id"
        secret_key_option = "aws_secret_access_key"
    else:
        key_id_option = "access_key"
        secret_key_option = "secret_key"
    # Actual Parsing
    if cred_section not in sections:
        raise AirflowException("This config file format is not recognized")

    try:
        access_key = config.get(cred_section, key_id_option)
        secret_key = config.get(cred_section, secret_key_option)
    except configparser.Error:
        # Only configparser failures (missing option, bad interpolation) are expected here;
        # log for context and let the original error propagate.
        logging.warning("Option Error in parsing s3 config file")
        raise
    return access_key, secret_key
def resolve_session_factory() -> Type[BaseSessionFactory]:
    """
    Resolves custom SessionFactory class

    Reads the ``session_factory`` option of the ``aws`` config section. When it is unset,
    ``BaseSessionFactory`` is returned.

    :raises TypeError: if the configured class is not a subclass of ``BaseSessionFactory``.
    """
    factory_cls = conf.getimport("aws", "session_factory", fallback=None)
    if factory_cls:
        if issubclass(factory_cls, BaseSessionFactory):
            return factory_cls
        raise TypeError(
            f"Your custom AWS SessionFactory class `{factory_cls.__name__}` is not a subclass "
            f"of `{BaseSessionFactory.__name__}`."
        )
    # Nothing configured: use the built-in factory.
    return BaseSessionFactory