# Source code for airflow.providers.amazon.aws.hooks.base_aws
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Base AWS Hook.

.. seealso::
    For more information on how to use this hook, take a look at the guide:
    :ref:`howto/connection:AWSHook`
"""

import configparser
import datetime
import logging
from functools import wraps
from typing import Any, Callable, Dict, Optional, Tuple, Union

import boto3
import botocore
import botocore.session
import requests
import tenacity
from botocore.config import Config
from botocore.credentials import ReadOnlyCredentials
from slugify import slugify

try:
    from functools import cached_property
except ImportError:
    from cached_property import cached_property
from dateutil.tz import tzlocal

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.utils.log.logging_mixin import LoggingMixin


class _SessionFactory(LoggingMixin):
    """
    Build a boto3 session from the settings stored on an Airflow connection.

    If no role ARN can be derived from the connection extras, a plain session
    built from the connection's static credentials is returned. Otherwise the
    returned session uses credentials obtained from an STS ``assume_role*`` call.

    :param conn: Airflow connection holding credentials (login/password) and extras.
    :param region_name: region requested by the caller; when ``None`` the
        ``region_name`` connection extra is used if present.
    :param config: botocore client config used when creating the STS client
        (may be ``None``).
    """

    def __init__(self, conn: Connection, region_name: Optional[str], config: Optional[Config]) -> None:
        super().__init__()
        self.conn = conn
        self.region_name = region_name
        self.config = config
        self.extra_config = self.conn.extra_dejson
        # Both are populated by create_session().
        self.basic_session = None
        self.role_arn = None

    def create_session(self) -> boto3.session.Session:
        """Create AWS session."""
        session_kwargs = {}
        if "session_kwargs" in self.extra_config:
            self.log.info(
                "Retrieving session_kwargs from Connection.extra_config['session_kwargs']: %s",
                self.extra_config["session_kwargs"],
            )
            session_kwargs = self.extra_config["session_kwargs"]
        self.basic_session = self._create_basic_session(session_kwargs=session_kwargs)
        self.role_arn = self._read_role_arn_from_extra_config()
        # If role_arn was specified then STS + assume_role
        if self.role_arn is None:
            return self.basic_session

        return self._create_session_with_assume_role(session_kwargs=session_kwargs)

    def _create_basic_session(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
        """
        Create a plain boto3 session from the connection's static credentials.

        The region is ``self.region_name`` or, when that is ``None``, the
        ``region_name`` connection extra. ``aws_session_token`` is read from the extras.
        """
        aws_access_key_id, aws_secret_access_key = self._read_credentials_from_connection()
        aws_session_token = self.extra_config.get("aws_session_token")
        region_name = self.region_name
        if self.region_name is None and 'region_name' in self.extra_config:
            self.log.info("Retrieving region_name from Connection.extra_config['region_name']")
            region_name = self.extra_config["region_name"]
        self.log.info(
            "Creating session with aws_access_key_id=%s region_name=%s",
            aws_access_key_id,
            region_name,
        )

        return boto3.session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
            aws_session_token=aws_session_token,
            **session_kwargs,
        )

    def _create_session_with_assume_role(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
        """
        Create a session whose credentials are obtained from STS for ``self.role_arn``.

        ``assume_role`` / ``assume_role_with_saml`` use refreshable credentials that
        are fetched immediately; ``assume_role_with_web_identity`` uses deferred
        credentials fetched on first use. The region is copied from the basic session.

        :raises NotImplementedError: if the ``assume_role_method`` extra is not supported.
        """
        assume_role_method = self.extra_config.get('assume_role_method', 'assume_role')
        self.log.info("assume_role_method=%s", assume_role_method)
        supported_methods = ['assume_role', 'assume_role_with_saml', 'assume_role_with_web_identity']
        if assume_role_method not in supported_methods:
            raise NotImplementedError(
                f'assume_role_method={assume_role_method} in Connection {self.conn.conn_id} Extra. '
                f'Currently {supported_methods} are supported. '
                '(Exclude this setting will default to "assume_role").'
            )
        if assume_role_method == 'assume_role_with_web_identity':
            # Deferred credentials have no initial credentials
            credential_fetcher = self._get_web_identity_credential_fetcher()
            credentials = botocore.credentials.DeferredRefreshableCredentials(
                method='assume-role-with-web-identity',
                refresh_using=credential_fetcher.fetch_credentials,
                time_fetcher=lambda: datetime.datetime.now(tz=tzlocal()),
            )
        else:
            # Refreshable credentials do have initial credentials
            credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
                metadata=self._refresh_credentials(),
                refresh_using=self._refresh_credentials,
                method="sts-assume-role",
            )
        session = botocore.session.get_session()
        session._credentials = credentials  # pylint: disable=protected-access
        region_name = self.basic_session.region_name
        session.set_config_variable("region", region_name)
        return boto3.session.Session(botocore_session=session, **session_kwargs)

    def _refresh_credentials(self) -> Dict[str, Any]:
        """
        Call STS with the basic session and return credentials in the metadata
        format used by botocore ``RefreshableCredentials``
        (``access_key``, ``secret_key``, ``token``, ``expiry_time``).

        :raises NotImplementedError: for a method other than ``assume_role`` /
            ``assume_role_with_saml``.
        :raises Exception: if STS answers with a non-200 HTTP status.
        """
        self.log.info('Refreshing credentials')
        assume_role_method = self.extra_config.get('assume_role_method', 'assume_role')
        sts_session = self.basic_session
        if assume_role_method == 'assume_role':
            sts_client = sts_session.client("sts", config=self.config)
            sts_response = self._assume_role(sts_client=sts_client)
        elif assume_role_method == 'assume_role_with_saml':
            sts_client = sts_session.client("sts", config=self.config)
            sts_response = self._assume_role_with_saml(sts_client=sts_client)
        else:
            raise NotImplementedError(f'assume_role_method={assume_role_method} not expected')
        sts_response_http_status = sts_response['ResponseMetadata']['HTTPStatusCode']
        if sts_response_http_status != 200:
            # Deliberately a plain Exception: AwsBaseHook._get_credentials catches
            # AirflowException and would silently fall back to default boto3 credentials.
            raise Exception(f'sts_response_http_status={sts_response_http_status}')
        credentials = sts_response['Credentials']
        expiry_time = credentials.get('Expiration').isoformat()
        self.log.info('New credentials expiry_time:%s', expiry_time)
        return {
            "access_key": credentials.get("AccessKeyId"),
            "secret_key": credentials.get("SecretAccessKey"),
            "token": credentials.get("SessionToken"),
            "expiry_time": expiry_time,
        }

    def _read_role_arn_from_extra_config(self) -> Optional[str]:
        """
        Return the ``role_arn`` extra, or build one from the ``aws_account_id`` and
        ``aws_iam_role`` extras when ``role_arn`` is absent; ``None`` if neither applies.
        """
        aws_account_id = self.extra_config.get("aws_account_id")
        aws_iam_role = self.extra_config.get("aws_iam_role")
        role_arn = self.extra_config.get("role_arn")
        if role_arn is None and aws_account_id is not None and aws_iam_role is not None:
            self.log.info("Constructing role_arn from aws_account_id and aws_iam_role")
            role_arn = f"arn:aws:iam::{aws_account_id}:role/{aws_iam_role}"
        self.log.info("role_arn is %s", role_arn)
        return role_arn

    def _read_credentials_from_connection(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Return ``(access_key_id, secret_access_key)`` from, in order of precedence:
        the connection login/password, the ``aws_access_key_id`` /
        ``aws_secret_access_key`` extras, or the file named by the ``s3_config_file``
        extra. Returns ``(None, None)`` when none of these is set.
        """
        aws_access_key_id = None
        aws_secret_access_key = None
        if self.conn.login:
            aws_access_key_id = self.conn.login
            aws_secret_access_key = self.conn.password
            self.log.info("Credentials retrieved from login")
        elif "aws_access_key_id" in self.extra_config and "aws_secret_access_key" in self.extra_config:
            aws_access_key_id = self.extra_config["aws_access_key_id"]
            aws_secret_access_key = self.extra_config["aws_secret_access_key"]
            self.log.info("Credentials retrieved from extra_config")
        elif "s3_config_file" in self.extra_config:
            aws_access_key_id, aws_secret_access_key = _parse_s3_config(
                self.extra_config["s3_config_file"],
                self.extra_config.get("s3_config_format"),
                self.extra_config.get("profile"),
            )
            self.log.info("Credentials retrieved from extra_config['s3_config_file']")
        else:
            self.log.info("No credentials retrieved from Connection")
        return aws_access_key_id, aws_secret_access_key

    def _strip_invalid_session_name_characters(self, role_session_name: str) -> str:
        """Replace characters not allowed in an STS RoleSessionName (keeps ``\\w+=,.@-``)."""
        return slugify(role_session_name, regex_pattern=r'[^\w+=,.@-]+')

    def _assume_role(self, sts_client: boto3.client) -> Dict:
        """Call ``sts_client.assume_role`` for ``self.role_arn`` using the connection extras."""
        # Copy so that injecting ExternalId does not mutate the connection's extra config.
        assume_role_kwargs = dict(self.extra_config.get("assume_role_kwargs", {}))
        if "external_id" in self.extra_config:  # Backwards compatibility
            assume_role_kwargs["ExternalId"] = self.extra_config.get("external_id")
        role_session_name = self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}")
        self.log.info(
            "Doing sts_client.assume_role to role_arn=%s (role_session_name=%s)",
            self.role_arn,
            role_session_name,
        )
        return sts_client.assume_role(
            RoleArn=self.role_arn, RoleSessionName=role_session_name, **assume_role_kwargs
        )

    def _assume_role_with_saml(self, sts_client: boto3.client) -> Dict[str, Any]:
        """
        Call ``sts_client.assume_role_with_saml`` using the ``assume_role_with_saml`` extra.

        :raises NotImplementedError: if ``idp_auth_method`` is not ``http_spegno_auth``.
        """
        saml_config = self.extra_config['assume_role_with_saml']
        principal_arn = saml_config['principal_arn']

        idp_auth_method = saml_config['idp_auth_method']
        if idp_auth_method == 'http_spegno_auth':
            saml_assertion = self._fetch_saml_assertion_using_http_spegno_auth(saml_config)
        else:
            raise NotImplementedError(
                f'idp_auth_method={idp_auth_method} in Connection {self.conn.conn_id} Extra. '
                'Currently only "http_spegno_auth" is supported, and must be specified.'
            )

        self.log.info("Doing sts_client.assume_role_with_saml to role_arn=%s", self.role_arn)

        assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
        return sts_client.assume_role_with_saml(
            RoleArn=self.role_arn,
            PrincipalArn=principal_arn,
            SAMLAssertion=saml_assertion,
            **assume_role_kwargs,
        )

    def _get_idp_response(
        self, saml_config: Dict[str, Any], auth: requests.auth.AuthBase
    ) -> requests.models.Response:
        """
        GET ``saml_config["idp_url"]`` with ``auth`` and return the response.

        Optional ``idp_request_retry_kwargs`` configure a urllib3 ``Retry`` strategy and
        optional ``idp_request_kwargs`` are passed to ``session.get``.

        :raises requests.HTTPError: for an error HTTP status (via ``raise_for_status``).
        """
        idp_url = saml_config["idp_url"]
        self.log.info("idp_url= %s", idp_url)

        session = requests.Session()

        # Configurable Retry when querying the IDP endpoint
        if "idp_request_retry_kwargs" in saml_config:
            idp_request_retry_kwargs = saml_config["idp_request_retry_kwargs"]
            self.log.info("idp_request_retry_kwargs= %s", idp_request_retry_kwargs)
            from requests.adapters import HTTPAdapter
            from requests.packages.urllib3.util.retry import Retry

            retry_strategy = Retry(**idp_request_retry_kwargs)
            adapter = HTTPAdapter(max_retries=retry_strategy)
            session.mount("https://", adapter)
            session.mount("http://", adapter)

        idp_request_kwargs = {}
        if "idp_request_kwargs" in saml_config:
            idp_request_kwargs = saml_config["idp_request_kwargs"]

        idp_response = session.get(idp_url, auth=auth, **idp_request_kwargs)
        idp_response.raise_for_status()

        return idp_response

    def _fetch_saml_assertion_using_http_spegno_auth(self, saml_config: Dict[str, Any]) -> str:
        """
        Query the IdP with SPNEGO (Kerberos) auth and extract the SAML assertion via
        the ``saml_response_xpath`` setting.

        :raises NotImplementedError: for an unsupported ``mutual_authentication`` value.
        :raises ValueError: if the XPath yields an empty result.
        """
        # requests_gssapi will need paramiko > 2.6 since you'll need
        # 'gssapi' not 'python-gssapi' from PyPi.
        # https://github.com/paramiko/paramiko/pull/1311
        import requests_gssapi
        from lxml import etree

        auth = requests_gssapi.HTTPSPNEGOAuth()
        if 'mutual_authentication' in saml_config:
            mutual_auth = saml_config['mutual_authentication']
            if mutual_auth == 'REQUIRED':
                auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.REQUIRED)
            elif mutual_auth == 'OPTIONAL':
                auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.OPTIONAL)
            elif mutual_auth == 'DISABLED':
                auth = requests_gssapi.HTTPSPNEGOAuth(requests_gssapi.DISABLED)
            else:
                raise NotImplementedError(
                    f'mutual_authentication={mutual_auth} in Connection {self.conn.conn_id} Extra. '
                    'Currently "REQUIRED", "OPTIONAL" and "DISABLED" are supported. '
                    '(Exclude this setting will default to HTTPSPNEGOAuth() ).'
                )

        # Query the IDP
        idp_response = self._get_idp_response(saml_config, auth=auth)
        # Assist with debugging. Note: contains sensitive info!
        xpath = saml_config['saml_response_xpath']
        log_idp_response = 'log_idp_response' in saml_config and saml_config['log_idp_response']
        if log_idp_response:
            self.log.warning(
                'The IDP response contains sensitive information, but log_idp_response is ON (%s).',
                log_idp_response,
            )
            self.log.info('idp_response.content= %s', idp_response.content)
            self.log.info('xpath= %s', xpath)
        # Extract SAML Assertion from the returned HTML / XML
        xml = etree.fromstring(idp_response.content)
        saml_assertion = xml.xpath(xpath)
        # A single-element XPath result is unwrapped; other results are returned as-is.
        if isinstance(saml_assertion, list):
            if len(saml_assertion) == 1:
                saml_assertion = saml_assertion[0]
        if not saml_assertion:
            raise ValueError('Invalid SAML Assertion')
        return saml_assertion

    def _get_web_identity_credential_fetcher(
        self,
    ) -> botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher:
        """
        Build the botocore fetcher used for ``assume_role_with_web_identity``.

        :raises AirflowException: if the ``assume_role_with_web_identity_federation``
            extra is not ``google``.
        """
        base_session = self.basic_session._session or botocore.session.get_session()
        client_creator = base_session.create_client
        federation = self.extra_config.get('assume_role_with_web_identity_federation')
        if federation == 'google':
            web_identity_token_loader = self._get_google_identity_token_loader()
        else:
            raise AirflowException(
                f'Unsupported federation: {federation}. Currently only "google" is supported.'
            )
        assume_role_kwargs = self.extra_config.get("assume_role_kwargs", {})
        return botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=client_creator,
            web_identity_token_loader=web_identity_token_loader,
            role_arn=self.role_arn,
            extra_args=assume_role_kwargs,
        )

    def _get_google_identity_token_loader(self):
        """
        Return a zero-argument callable producing a Google ID token for the
        ``assume_role_with_web_identity_federation_audience`` extra, refreshing the
        token first when it is not valid.
        """
        from google.auth.transport import requests as requests_transport

        from airflow.providers.google.common.utils.id_token_credentials import (
            get_default_id_token_credentials,
        )

        audience = self.extra_config.get('assume_role_with_web_identity_federation_audience')

        google_id_token_credentials = get_default_id_token_credentials(target_audience=audience)

        def web_identity_token_loader():
            if not google_id_token_credentials.valid:
                request_adapter = requests_transport.Request()
                google_id_token_credentials.refresh(request=request_adapter)
            return google_id_token_credentials.token

        return web_identity_token_loader
class AwsBaseHook(BaseHook):
    """
    Interact with AWS.
    This class is a thin wrapper around the boto3 python library.

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :type aws_conn_id: str
    :param verify: Whether or not to verify SSL certificates.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :type verify: Union[bool, str, None]
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :type region_name: Optional[str]
    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
    :type client_type: Optional[str]
    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
    :type resource_type: Optional[str]
    :param config: Configuration for botocore client.
        (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)
    :type config: Optional[botocore.client.Config]
    :raises AirflowException: if neither ``client_type`` nor ``resource_type`` is given.
    """

    # Hook metadata. ``default_conn_name`` must be defined in the class body
    # before ``__init__``, because it is the default value of ``aws_conn_id``.
    conn_name_attr = 'aws_conn_id'
    default_conn_name = 'aws_default'
    conn_type = 'aws'
    hook_name = 'Amazon Web Services'

    def __init__(
        self,
        aws_conn_id: Optional[str] = default_conn_name,
        verify: Union[bool, str, None] = None,
        region_name: Optional[str] = None,
        client_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> None:
        super().__init__()
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.client_type = client_type
        self.resource_type = resource_type
        self.region_name = region_name
        self.config = config

        if not (self.client_type or self.resource_type):
            raise AirflowException('Either client_type or resource_type must be provided.')

    def _get_credentials(self, region_name: Optional[str]) -> Tuple[boto3.session.Session, Optional[str]]:
        """
        Return a boto3 session and the endpoint URL (the connection's ``host`` extra, or None).

        Without ``aws_conn_id`` a default boto3 session is returned. If loading the
        connection raises ``AirflowException``, a warning is logged and a default boto3
        session is returned instead. A ``config_kwargs`` extra replaces ``self.config``.
        """
        if not self.aws_conn_id:
            session = boto3.session.Session(region_name=region_name)
            return session, None

        self.log.info("Airflow Connection: aws_conn_id=%s", self.aws_conn_id)

        try:
            # Fetch the Airflow connection object
            connection_object = self.get_connection(self.aws_conn_id)
            extra_config = connection_object.extra_dejson
            endpoint_url = extra_config.get("host")

            # https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html#botocore.config.Config
            if "config_kwargs" in extra_config:
                self.log.info(
                    "Retrieving config_kwargs from Connection.extra_config['config_kwargs']: %s",
                    extra_config["config_kwargs"],
                )
                self.config = Config(**extra_config["config_kwargs"])

            session = _SessionFactory(
                conn=connection_object, region_name=region_name, config=self.config
            ).create_session()

            return session, endpoint_url

        except AirflowException:
            self.log.warning("Unable to use Airflow Connection for credentials.")
            self.log.info("Fallback on boto3 credential strategy")
            # http://boto3.readthedocs.io/en/latest/guide/configuration.html

        self.log.info(
            "Creating session using boto3 credential strategy region_name=%s",
            region_name,
        )
        session = boto3.session.Session(region_name=region_name)
        return session, None

    def get_client_type(
        self,
        client_type: str,
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> boto3.client:
        """Get the underlying boto3 client using boto3 session"""
        session, endpoint_url = self._get_credentials(region_name)

        # No AWS Operators use the config argument to this method.
        # Keep backward compatibility with other users who might use it
        if config is None:
            config = self.config

        return session.client(client_type, endpoint_url=endpoint_url, config=config, verify=self.verify)

    def get_resource_type(
        self,
        resource_type: str,
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> boto3.resource:
        """Get the underlying boto3 resource using boto3 session"""
        session, endpoint_url = self._get_credentials(region_name)

        # No AWS Operators use the config argument to this method.
        # Keep backward compatibility with other users who might use it
        if config is None:
            config = self.config

        return session.resource(resource_type, endpoint_url=endpoint_url, config=config, verify=self.verify)

    @cached_property
    def conn(self) -> Union[boto3.client, boto3.resource]:
        """
        Get the underlying boto3 client/resource (cached)

        :return: boto3.client or boto3.resource
        :rtype: Union[boto3.client, boto3.resource]
        """
        if self.client_type:
            return self.get_client_type(self.client_type, region_name=self.region_name)
        elif self.resource_type:
            return self.get_resource_type(self.resource_type, region_name=self.region_name)
        else:
            # Rare possibility - subclasses have not specified a client_type or resource_type
            raise NotImplementedError('Could not get boto3 connection!')

    def get_conn(self) -> Union[boto3.client, boto3.resource]:
        """
        Get the underlying boto3 client/resource (cached)

        Implemented so that caching works as intended. It exists for compatibility
        with subclasses that rely on a super().get_conn() method.

        :return: boto3.client or boto3.resource
        :rtype: Union[boto3.client, boto3.resource]
        """
        # Compat shim
        return self.conn

    def get_session(self, region_name: Optional[str] = None) -> boto3.session.Session:
        """Get the underlying boto3.session."""
        session, _ = self._get_credentials(region_name)
        return session

    def get_credentials(self, region_name: Optional[str] = None) -> ReadOnlyCredentials:
        """
        Get the underlying `botocore.Credentials` object.

        This contains the following authentication attributes: access_key, secret_key and token.
        """
        session, _ = self._get_credentials(region_name)
        # Credentials are refreshable, so accessing your access key and
        # secret key separately can lead to a race condition.
        # See https://stackoverflow.com/a/36291428/8283373
        return session.get_credentials().get_frozen_credentials()

    def expand_role(self, role: str) -> str:
        """
        If the IAM role is a role name, get the Amazon Resource Name (ARN) for the role.
        If IAM role is already an IAM role ARN, no change is made.

        :param role: IAM role name or ARN
        :return: IAM role ARN
        """
        if "/" in role:
            return role
        else:
            return self.get_client_type("iam").get_role(RoleName=role)["Role"]["Arn"]

    @staticmethod
    def retry(should_retry: Callable[[Exception], bool]):
        """
        A decorator that provides a mechanism to repeat requests in response to exceeding
        a temporary quota limit.

        Retries only happen when the decorated method's instance has a ``retry_args``
        attribute that is not ``None``; it is read as a dict with optional keys
        ``multiplier`` (default 1), ``min`` (default 1), ``max`` (default 1) and
        ``stop_after_delay`` (default 10, passed to ``tenacity.stop_after_delay``).
        Otherwise the method is called once, without retries.

        :param should_retry: predicate deciding whether a raised exception is retried.
        """

        def retry_decorator(fun: Callable):
            @wraps(fun)
            def decorator_f(self, *args, **kwargs):
                retry_args = getattr(self, 'retry_args', None)
                if retry_args is None:
                    return fun(self, *args, **kwargs)
                multiplier = retry_args.get('multiplier', 1)
                min_limit = retry_args.get('min', 1)
                max_limit = retry_args.get('max', 1)
                stop_after_delay = retry_args.get('stop_after_delay', 10)
                retry_kwargs = {
                    'wait': tenacity.wait_exponential(multiplier=multiplier, max=max_limit, min=min_limit),
                    'retry': tenacity.retry_if_exception(should_retry),
                    'stop': tenacity.stop_after_delay(stop_after_delay),
                }
                # Only install logging hooks when a logger exists: tenacity calls these
                # unconditionally, so passing None would fail at call time.
                if self.log:
                    retry_kwargs['before'] = tenacity.before_log(self.log, logging.DEBUG)
                    # after_log reports the finished attempt (before_log would log
                    # a misleading "Starting call" message after each attempt).
                    retry_kwargs['after'] = tenacity.after_log(self.log, logging.DEBUG)
                return tenacity.retry(**retry_kwargs)(fun)(self, *args, **kwargs)

            return decorator_f

        return retry_decorator
def _parse_s3_config(
    config_file_name: str, config_format: Optional[str] = "boto", profile: Optional[str] = None
) -> Tuple[Optional[str], Optional[str]]:
    """
    Parses a config file for s3 credentials. Can currently
    parse boto, s3cmd.conf and AWS SDK config formats

    :param config_file_name: path to the config file
    :type config_file_name: str
    :param config_format: config type. One of "boto", "s3cmd" or "aws".
        Defaults to "boto"; ``None`` is treated as "boto" and the value is
        compared case-insensitively. Any other value uses s3cmd-style option
        names in the ``default`` section.
    :type config_format: str
    :param profile: profile name in AWS type config file (for "boto" the
        ``profile <name>`` section is used when present, else ``Credentials``)
    :type profile: str
    :return: ``(access_key, secret_key)`` read from the selected section
    :raises AirflowException: if the file cannot be read or the selected
        section does not exist
    :raises configparser.Error: if an expected option is missing from the section
    """
    config = configparser.ConfigParser()
    if not config.read(config_file_name):  # pragma: no cover
        raise AirflowException(f"Couldn't read {config_file_name}")
    sections = config.sections()

    # Setting option names depending on file format
    conf_format = "boto" if config_format is None else config_format.lower()

    if conf_format == "boto":  # pragma: no cover
        boto_profile_section = f"profile {profile}"
        if profile is not None and boto_profile_section in sections:
            cred_section = boto_profile_section
        else:
            cred_section = "Credentials"
    elif conf_format == "aws" and profile is not None:
        cred_section = profile
    else:
        cred_section = "default"

    # Option names
    if conf_format in ("boto", "aws"):  # pragma: no cover
        key_id_option = "aws_access_key_id"
        secret_key_option = "aws_secret_access_key"
        # security_token_option = 'aws_security_token'
    else:
        key_id_option = "access_key"
        secret_key_option = "secret_key"

    # Actual Parsing
    if cred_section not in sections:
        raise AirflowException(
            f"This config file format is not recognized: section {cred_section!r} "
            f"not found in {config_file_name}"
        )

    try:
        access_key = config.get(cred_section, key_id_option)
        secret_key = config.get(cred_section, secret_key_option)
    except configparser.Error:
        # e.g. NoOptionError / InterpolationError; logged for context, then propagated.
        logging.warning("Option Error in parsing s3 config file")
        raise
    return access_key, secret_key