Source code for airflow.providers.amazon.aws.hooks.base_aws
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains Base AWS Hook... seealso:: For more information on how to use this hook, take a look at the guide: :ref:`howto/connection:AWSHook`"""importdatetimeimportjsonimportloggingimportwarningsfromfunctoolsimportwrapsfromtypingimportAny,Callable,Dict,Generic,Optional,Tuple,Type,TypeVar,Unionimportboto3importbotocoreimportbotocore.sessionimportrequestsimporttenacityfrombotocore.clientimportClientMetafrombotocore.configimportConfigfrombotocore.credentialsimportReadOnlyCredentialsfromdateutil.tzimporttzlocalfromslugifyimportslugifyfromairflow.compat.functoolsimportcached_propertyfromairflow.configurationimportconffromairflow.exceptionsimportAirflowException,AirflowNotFoundExceptionfromairflow.hooks.baseimportBaseHookfromairflow.models.connectionimportConnectionfromairflow.providers.amazon.aws.utils.connection_wrapperimportAwsConnectionWrapperfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secret
class BaseSessionFactory(LoggingMixin):
    """
    Base AWS Session Factory class to handle boto3 session creation.

    It can handle most of the AWS supported authentication methods.

    User can also derive from this class to have full control of boto3 session
    creation or to support custom federation.

    .. seealso::
        :ref:`howto/connection:aws:session-factory`

    :param conn: Airflow connection (or an already wrapped connection) to build the session from.
    :param region_name: Optional AWS region name forwarded to the connection wrapper.
    :param config: Optional botocore client configuration forwarded to the connection wrapper.
    """

    def __init__(
        self,
        conn: Optional[Union[Connection, AwsConnectionWrapper]],
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> None:
        super().__init__()
        self._conn = conn
        self._region_name = region_name
        self._config = config

    @cached_property
    def conn(self) -> AwsConnectionWrapper:
        """Cached AWS Connection Wrapper built from the constructor arguments."""
        # NOTE(review): every method of this class reads ``self.conn`` while ``__init__``
        # only stores ``self._conn``; this accessor bridges the two. It assumes
        # ``AwsConnectionWrapper`` accepts either a ``Connection`` or another wrapper
        # as ``conn`` - confirm against ``connection_wrapper``.
        return AwsConnectionWrapper(
            conn=self._conn,
            region_name=self._region_name,
            botocore_config=self._config,
        )

    @cached_property
    def basic_session(self) -> boto3.session.Session:
        """Cached property with basic boto3.session.Session."""
        return self._create_basic_session(session_kwargs=self.conn.session_kwargs)

    @property
    def extra_config(self) -> Dict[str, Any]:
        """AWS Connection extra config read-only property."""
        # NOTE(review): presumably the parsed "Extra" JSON of the connection; the SAML and
        # web-identity helpers below index into it - verify the wrapper exposes it.
        return self.conn.extra_config

    @property
    def region_name(self) -> Optional[str]:
        """AWS Region Name read-only property."""
        return self.conn.region_name

    @property
    def config(self) -> Optional[Config]:
        """Configuration for botocore client read-only property."""
        return self.conn.botocore_config

    @property
    def role_arn(self) -> Optional[str]:
        """Assume Role ARN from AWS Connection"""
        return self.conn.role_arn

    def create_session(self) -> boto3.session.Session:
        """
        Create boto3 Session from connection config.

        A falsy connection wrapper yields a plain session that relies on boto3's own
        credential lookup; a connection without ``role_arn`` yields the cached basic
        session; otherwise a session with assume-role credentials is built.
        """
        if not self.conn:
            self.log.info(
                "No connection ID provided. Fallback on boto3 credential strategy (region_name=%r). "
                "See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html",
                self.region_name,
            )
            return boto3.session.Session(region_name=self.region_name)
        elif not self.role_arn:
            return self.basic_session
        return self._create_session_with_assume_role(session_kwargs=self.conn.session_kwargs)

    def _create_basic_session(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
        """Create a boto3 session directly from ``session_kwargs``."""
        return boto3.session.Session(**session_kwargs)

    def _create_session_with_assume_role(self, session_kwargs: Dict[str, Any]) -> boto3.session.Session:
        """Create a boto3 session whose credentials are obtained (and refreshed) via assume role."""
        if self.conn.assume_role_method == 'assume_role_with_web_identity':
            # Deferred credentials have no initial credentials
            credential_fetcher = self._get_web_identity_credential_fetcher()
            credentials = botocore.credentials.DeferredRefreshableCredentials(
                method='assume-role-with-web-identity',
                refresh_using=credential_fetcher.fetch_credentials,
                time_fetcher=lambda: datetime.datetime.now(tz=tzlocal()),
            )
        else:
            # Refreshable credentials do have initial credentials
            credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
                metadata=self._refresh_credentials(),
                refresh_using=self._refresh_credentials,
                method="sts-assume-role",
            )

        session = botocore.session.get_session()
        # Inject the refreshable credentials into the botocore session.
        session._credentials = credentials
        region_name = self.basic_session.region_name
        session.set_config_variable("region", region_name)

        return boto3.session.Session(botocore_session=session, **session_kwargs)

    def _refresh_credentials(self) -> Dict[str, Any]:
        """
        Call STS and return fresh credentials.

        :return: dict with ``access_key``, ``secret_key``, ``token`` and ``expiry_time`` keys.
        :raises NotImplementedError: if the assume role method is neither ``assume_role``
            nor ``assume_role_with_saml``.
        :raises RuntimeError: if STS answers with a non-200 HTTP status code.
        """
        self.log.debug('Refreshing credentials')
        assume_role_method = self.conn.assume_role_method
        if assume_role_method not in ('assume_role', 'assume_role_with_saml'):
            raise NotImplementedError(f'assume_role_method={assume_role_method} not expected')

        sts_client = self.basic_session.client("sts", config=self.config)

        if assume_role_method == 'assume_role':
            sts_response = self._assume_role(sts_client=sts_client)
        else:
            sts_response = self._assume_role_with_saml(sts_client=sts_client)

        sts_response_http_status = sts_response['ResponseMetadata']['HTTPStatusCode']
        if sts_response_http_status != 200:
            raise RuntimeError(f'sts_response_http_status={sts_response_http_status}')

        sts_credentials = sts_response['Credentials']
        expiry_time = sts_credentials.get('Expiration').isoformat()
        self.log.debug('New credentials expiry_time: %s', expiry_time)
        return {
            "access_key": sts_credentials.get("AccessKeyId"),
            "secret_key": sts_credentials.get("SecretAccessKey"),
            "token": sts_credentials.get("SessionToken"),
            "expiry_time": expiry_time,
        }

    def _assume_role(self, sts_client: boto3.client) -> Dict:
        """Call ``sts_client.assume_role`` for the connection's role ARN."""
        kw = {
            # Default session name; an entry in ``assume_role_kwargs`` overrides it.
            "RoleSessionName": self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}"),
            **self.conn.assume_role_kwargs,
            # Placed last so that ``assume_role_kwargs`` cannot override the role ARN.
            "RoleArn": self.role_arn,
        }
        return sts_client.assume_role(**kw)

    def _assume_role_with_saml(self, sts_client: boto3.client) -> Dict[str, Any]:
        """
        Fetch a SAML assertion from the IDP and call ``sts_client.assume_role_with_saml``.

        :raises NotImplementedError: if ``idp_auth_method`` is not ``http_spegno_auth``.
        """
        saml_config = self.extra_config['assume_role_with_saml']
        principal_arn = saml_config['principal_arn']

        idp_auth_method = saml_config['idp_auth_method']
        if idp_auth_method == 'http_spegno_auth':
            saml_assertion = self._fetch_saml_assertion_using_http_spegno_auth(saml_config)
        else:
            raise NotImplementedError(
                f'idp_auth_method={idp_auth_method} in Connection {self.conn.conn_id} Extra.'
                'Currently only "http_spegno_auth" is supported, and must be specified.'
            )

        self.log.debug("Doing sts_client.assume_role_with_saml to role_arn=%s", self.role_arn)
        return sts_client.assume_role_with_saml(
            RoleArn=self.role_arn,
            PrincipalArn=principal_arn,
            SAMLAssertion=saml_assertion,
            **self.conn.assume_role_kwargs,
        )

    def _get_idp_response(
        self, saml_config: Dict[str, Any], auth: requests.auth.AuthBase
    ) -> requests.models.Response:
        """
        GET ``saml_config["idp_url"]`` with the given auth and return the response.

        Optional ``idp_request_retry_kwargs`` configure a retry strategy and optional
        ``idp_request_kwargs`` are forwarded to the GET call. HTTP error statuses raise
        through ``raise_for_status``.
        """
        idp_url = saml_config["idp_url"]
        self.log.debug("idp_url= %s", idp_url)

        session = requests.Session()

        # Configurable Retry when querying the IDP endpoint
        if "idp_request_retry_kwargs" in saml_config:
            idp_request_retry_kwargs = saml_config["idp_request_retry_kwargs"]
            self.log.info("idp_request_retry_kwargs= %s", idp_request_retry_kwargs)
            from requests.adapters import HTTPAdapter
            from requests.packages.urllib3.util.retry import Retry

            retry_strategy = Retry(**idp_request_retry_kwargs)
            adapter = HTTPAdapter(max_retries=retry_strategy)
            session.mount("https://", adapter)
            session.mount("http://", adapter)

        idp_request_kwargs = saml_config.get("idp_request_kwargs", {})

        idp_response = session.get(idp_url, auth=auth, **idp_request_kwargs)
        idp_response.raise_for_status()

        return idp_response

    def _fetch_saml_assertion_using_http_spegno_auth(self, saml_config: Dict[str, Any]) -> str:
        """
        Query the IDP with SPNEGO auth and extract the SAML assertion from its response.

        :param saml_config: the ``assume_role_with_saml`` section of the connection extra.
        :return: the single value selected by ``saml_config['saml_response_xpath']``.
        :raises NotImplementedError: if ``mutual_authentication`` has an unsupported value.
        :raises ValueError: if the xpath selects nothing, or selects more than one node.
        """
        # requests_gssapi will need paramiko > 2.6 since you'll need
        # 'gssapi' not 'python-gssapi' from PyPi.
        # https://github.com/paramiko/paramiko/pull/1311
        import requests_gssapi
        from lxml import etree

        auth = requests_gssapi.HTTPSPNEGOAuth()
        if 'mutual_authentication' in saml_config:
            mutual_auth = saml_config['mutual_authentication']
            if mutual_auth not in ('REQUIRED', 'OPTIONAL', 'DISABLED'):
                raise NotImplementedError(
                    f'mutual_authentication={mutual_auth} in Connection {self.conn.conn_id} Extra.'
                    'Currently "REQUIRED", "OPTIONAL" and "DISABLED" are supported.'
                    '(Exclude this setting will default to HTTPSPNEGOAuth() ).'
                )
            # The accepted values are the names of the requests_gssapi constants.
            auth = requests_gssapi.HTTPSPNEGOAuth(getattr(requests_gssapi, mutual_auth))

        # Query the IDP
        idp_response = self._get_idp_response(saml_config, auth=auth)

        # Assist with debugging. Note: contains sensitive info!
        xpath = saml_config['saml_response_xpath']
        log_idp_response = 'log_idp_response' in saml_config and saml_config['log_idp_response']
        if log_idp_response:
            self.log.warning(
                'The IDP response contains sensitive information, but log_idp_response is ON (%s).',
                log_idp_response,
            )
            self.log.debug('idp_response.content= %s', idp_response.content)
            self.log.debug('xpath= %s', xpath)

        # Extract SAML Assertion from the returned HTML / XML
        xml = etree.fromstring(idp_response.content)
        saml_assertion = xml.xpath(xpath)
        if isinstance(saml_assertion, list):
            if len(saml_assertion) > 1:
                # Several matches are ambiguous; returning the list would hand STS a
                # non-string SAMLAssertion.
                raise ValueError(
                    f'Invalid SAML Assertion: xpath matched {len(saml_assertion)} nodes, expected exactly one'
                )
            if len(saml_assertion) == 1:
                saml_assertion = saml_assertion[0]
        if not saml_assertion:
            raise ValueError('Invalid SAML Assertion')
        return saml_assertion

    def _get_web_identity_credential_fetcher(
        self,
    ) -> botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher:
        """
        Build the botocore credential fetcher used for ``assume_role_with_web_identity``.

        :raises AirflowException: if the configured federation is not ``google``.
        """
        base_session = self.basic_session._session or botocore.session.get_session()
        client_creator = base_session.create_client
        federation = self.extra_config.get('assume_role_with_web_identity_federation')
        if federation != 'google':
            raise AirflowException(
                f'Unsupported federation: {federation}. Currently "google" only are supported.'
            )
        web_identity_token_loader = self._get_google_identity_token_loader()
        return botocore.credentials.AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=client_creator,
            web_identity_token_loader=web_identity_token_loader,
            role_arn=self.role_arn,
            extra_args=self.conn.assume_role_kwargs,
        )

    def _get_google_identity_token_loader(self):
        """Return a zero-argument callable that yields a (refreshed when invalid) Google ID token."""
        from google.auth.transport import requests as requests_transport

        from airflow.providers.google.common.utils.id_token_credentials import (
            get_default_id_token_credentials,
        )

        audience = self.extra_config.get('assume_role_with_web_identity_federation_audience')

        google_id_token_credentials = get_default_id_token_credentials(target_audience=audience)

        def web_identity_token_loader():
            if not google_id_token_credentials.valid:
                request_adapter = requests_transport.Request()
                google_id_token_credentials.refresh(request=request_adapter)
            return google_id_token_credentials.token

        return web_identity_token_loader

    def _strip_invalid_session_name_characters(self, role_session_name: str) -> str:
        """Slugify ``role_session_name`` using the pattern ``[^\\w+=,.@-]+`` as the disallowed set."""
        return slugify(role_session_name, regex_pattern=r'[^\w+=,.@-]+')

    def _get_region_name(self) -> Optional[str]:
        """Deprecated alias of the ``region_name`` property; emits a ``DeprecationWarning``."""
        warnings.warn(
            "`BaseSessionFactory._get_region_name` method deprecated and will be removed "
            "in a future releases. Please use `BaseSessionFactory.region_name` property instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.region_name

    def _read_role_arn_from_extra_config(self) -> Optional[str]:
        """Deprecated alias of the ``role_arn`` property; emits a ``DeprecationWarning``."""
        warnings.warn(
            "`BaseSessionFactory._read_role_arn_from_extra_config` method deprecated and will be removed "
            "in a future releases. Please use `BaseSessionFactory.role_arn` property instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.role_arn

    def _read_credentials_from_connection(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Deprecated accessor for the connection's access key id and secret access key.

        Emits a ``DeprecationWarning`` and returns ``(aws_access_key_id, aws_secret_access_key)``.
        """
        warnings.warn(
            "`BaseSessionFactory._read_credentials_from_connection` method deprecated and will be removed "
            "in a future releases. Please use `BaseSessionFactory.conn.aws_access_key_id` and "
            "`BaseSessionFactory.conn.aws_secret_access_key` properties instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.conn.aws_access_key_id, self.conn.aws_secret_access_key
class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
    """
    Interact with AWS.

    This class is a thin wrapper around the boto3 python library.

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
    :param config: Configuration for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    # NOTE(review): hook registration attributes and the constructor are not present in the
    # text this block was restored from, although every method reads the attributes set
    # below. Values follow the usual Airflow AWS hook conventions - confirm before merging.
    conn_name_attr = 'aws_conn_id'
    default_conn_name = 'aws_default'
    conn_type = 'aws'
    hook_name = 'Amazon Web Services'

    def __init__(
        self,
        aws_conn_id: Optional[str] = default_conn_name,
        verify: Optional[Union[bool, str]] = None,
        region_name: Optional[str] = None,
        client_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> None:
        super().__init__()
        self.aws_conn_id = aws_conn_id
        self.client_type = client_type
        self.resource_type = resource_type

        self._region_name = region_name
        self._config = config
        self._verify = verify

    @cached_property
    def conn_config(self) -> AwsConnectionWrapper:
        """Get the Airflow Connection object and wrap it in helper (cached)."""
        connection = None
        if self.aws_conn_id:
            try:
                connection = self.get_connection(self.aws_conn_id)
            except AirflowNotFoundException:
                # A missing connection is tolerated for now: warn and continue with an
                # empty connection.
                warnings.warn(
                    f"Unable to find AWS Connection ID '{self.aws_conn_id}', switching to empty. "
                    "This behaviour is deprecated and will be removed in a future releases. "
                    "Please provide existed AWS connection ID or if required boto3 credential strategy "
                    "explicit set AWS Connection ID to None.",
                    DeprecationWarning,
                    stacklevel=2,
                )

        return AwsConnectionWrapper(
            conn=connection,
            region_name=self._region_name,
            botocore_config=self._config,
            verify=self._verify,
        )

    @property
    def region_name(self) -> Optional[str]:
        """AWS Region Name read-only property."""
        return self.conn_config.region_name

    @property
    def config(self) -> Optional[Config]:
        """Configuration for botocore client read-only property."""
        return self.conn_config.botocore_config

    @property
    def verify(self) -> Optional[Union[bool, str]]:
        """Verify or not SSL certificates boto3 client/resource read-only property."""
        return self.conn_config.verify

    def get_session(self, region_name: Optional[str] = None) -> boto3.session.Session:
        """Get the underlying boto3.session.Session(region_name=region_name)."""
        # NOTE(review): ``SessionFactory`` is not defined in this block; presumably a
        # module-level name bound to the result of ``resolve_session_factory()`` - confirm.
        return SessionFactory(
            conn=self.conn_config, region_name=region_name, config=self.config
        ).create_session()

    def get_client_type(
        self,
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> boto3.client:
        """
        Get the underlying boto3 client using boto3 session

        :param region_name: region name passed to :meth:`get_session`.
        :param config: botocore config; falls back to the hook's own config when None.
        """
        client_type = self.client_type

        # No AWS Operators use the config argument to this method.
        # Keep backward compatibility with other users who might use it
        if config is None:
            config = self.config

        session = self.get_session(region_name=region_name)
        return session.client(
            client_type,
            endpoint_url=self.conn_config.endpoint_url,
            config=config,
            verify=self.verify,
        )

    def get_resource_type(
        self,
        region_name: Optional[str] = None,
        config: Optional[Config] = None,
    ) -> boto3.resource:
        """
        Get the underlying boto3 resource using boto3 session

        :param region_name: region name passed to :meth:`get_session`.
        :param config: botocore config; falls back to the hook's own config when None.
        """
        resource_type = self.resource_type

        # No AWS Operators use the config argument to this method.
        # Keep backward compatibility with other users who might use it
        if config is None:
            config = self.config

        session = self.get_session(region_name=region_name)
        return session.resource(
            resource_type,
            endpoint_url=self.conn_config.endpoint_url,
            config=config,
            verify=self.verify,
        )

    @cached_property
    def conn(self) -> BaseAwsConnection:
        """
        Get the underlying boto3 client/resource (cached)

        :return: boto3.client or boto3.resource
        :rtype: Union[boto3.client, boto3.resource]
        :raises ValueError: unless exactly one of ``client_type`` / ``resource_type`` is set.
        """
        # XOR of the "is unset" flags: true only when exactly one of the two is provided.
        if not ((not self.client_type) ^ (not self.resource_type)):
            raise ValueError(
                f"Either client_type={self.client_type!r} or "
                f"resource_type={self.resource_type!r} must be provided, not both."
            )
        elif self.client_type:
            return self.get_client_type(region_name=self.region_name)
        else:
            return self.get_resource_type(region_name=self.region_name)

    @cached_property
    def conn_client_meta(self) -> ClientMeta:
        """Get botocore client metadata from Hook connection (cached)."""
        conn = self.conn
        if isinstance(conn, botocore.client.BaseClient):
            return conn.meta
        # Otherwise ``conn`` is a resource: reach the metadata through its client.
        return conn.meta.client.meta

    @property
    def conn_region_name(self) -> str:
        """Get actual AWS Region Name from Hook connection (cached)."""
        return self.conn_client_meta.region_name

    @property
    def conn_partition(self) -> str:
        """Get associated AWS Region Partition from Hook connection (cached)."""
        return self.conn_client_meta.partition

    def get_conn(self) -> BaseAwsConnection:
        """
        Get the underlying boto3 client/resource (cached)

        Implemented so that caching works as intended. It exists for compatibility
        with subclasses that rely on a super().get_conn() method.

        :return: boto3.client or boto3.resource
        :rtype: Union[boto3.client, boto3.resource]
        """
        # Compat shim
        return self.conn

    def get_credentials(self, region_name: Optional[str] = None) -> ReadOnlyCredentials:
        """
        Get the underlying `botocore.Credentials` object.

        This contains the following authentication attributes: access_key, secret_key and token.
        By use this method also secret_key and token will mask in tasks logs.
        """
        # Credentials are refreshable, so accessing your access key and
        # secret key separately can lead to a race condition.
        # See https://stackoverflow.com/a/36291428/8283373
        creds = self.get_session(region_name=region_name).get_credentials().get_frozen_credentials()
        mask_secret(creds.secret_key)
        if creds.token:
            mask_secret(creds.token)
        return creds

    def expand_role(self, role: str, region_name: Optional[str] = None) -> str:
        """
        If the IAM role is a role name, get the Amazon Resource Name (ARN) for the role.
        If IAM role is already an IAM role ARN, no change is made.

        :param role: IAM role name or ARN
        :param region_name: Optional region name to get credentials for
        :return: IAM role ARN
        """
        # A "/" in the value is taken to mean it is already an ARN.
        if "/" in role:
            return role

        session = self.get_session(region_name=region_name)
        _client = session.client(
            'iam',
            endpoint_url=self.conn_config.endpoint_url,
            config=self.config,
            verify=self.verify,
        )
        return _client.get_role(RoleName=role)["Role"]["Arn"]

    @staticmethod
    def retry(should_retry: Callable[[Exception], bool]):
        """
        A decorator that provides a mechanism to repeat requests in response to exceeding a temporary quota
        limit.

        The decorated method is retried only when the instance has a ``retry_args``
        attribute that is not None; its ``multiplier``, ``min``, ``max`` and
        ``stop_after_delay`` entries configure the tenacity wait/stop strategy.

        :param should_retry: predicate deciding whether a raised exception triggers a retry.
        """

        def retry_decorator(fun: Callable):
            @wraps(fun)
            def decorator_f(self, *args, **kwargs):
                retry_args = getattr(self, 'retry_args', None)
                if retry_args is None:
                    return fun(self, *args, **kwargs)
                multiplier = retry_args.get('multiplier', 1)
                min_limit = retry_args.get('min', 1)
                max_limit = retry_args.get('max', 1)
                stop_after_delay = retry_args.get('stop_after_delay', 10)
                tenacity_before_logger = tenacity.before_log(self.log, logging.INFO) if self.log else None
                tenacity_after_logger = tenacity.after_log(self.log, logging.INFO) if self.log else None
                default_kwargs = {
                    'wait': tenacity.wait_exponential(multiplier=multiplier, max=max_limit, min=min_limit),
                    'retry': tenacity.retry_if_exception(should_retry),
                    'stop': tenacity.stop_after_delay(stop_after_delay),
                    'before': tenacity_before_logger,
                    'after': tenacity_after_logger,
                }
                return tenacity.retry(**default_kwargs)(fun)(self, *args, **kwargs)

            return decorator_f

        return retry_decorator

    def _get_credentials(self, region_name: Optional[str]) -> Tuple[boto3.session.Session, Optional[str]]:
        """Deprecated: return ``(session, endpoint_url)``; emits a ``DeprecationWarning``."""
        warnings.warn(
            "`AwsGenericHook._get_credentials` method deprecated and will be removed in a future releases. "
            "Please use `AwsGenericHook.get_session` method and "
            "`AwsGenericHook.conn_config.endpoint_url` property instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        return self.get_session(region_name=region_name), self.conn_config.endpoint_url

    @staticmethod
    def get_ui_field_behaviour() -> Dict[str, Any]:
        """Returns custom UI field behaviour for AWS Connection."""
        return {
            "hidden_fields": ["host", "schema", "port"],
            "relabeling": {
                "login": "AWS Access Key ID",
                "password": "AWS Secret Access Key",
            },
            "placeholders": {
                "login": "AKIAIOSFODNN7EXAMPLE",
                "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                "extra": json.dumps(
                    {
                        "region_name": "us-east-1",
                        "session_kwargs": {"profile_name": "default"},
                        "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}},
                        "role_arn": "arn:aws:iam::123456789098:role/role-name",
                        "assume_role_method": "assume_role",
                        "assume_role_kwargs": {"RoleSessionName": "airflow"},
                        "aws_session_token": "AQoDYXdzEJr...EXAMPLETOKEN",
                        "endpoint_url": "http://localhost:4566",
                    },
                    indent=2,
                ),
            },
        }

    def test_connection(self) -> Tuple[bool, str]:
        """
        Tests the AWS connection by call AWS STS (Security Token Service) GetCallerIdentity API.

        .. seealso::
            https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html

        :return: ``(True, <identity as JSON>)`` on HTTP 200, otherwise ``(False, <details>)``.
        """
        # Temporarily switch the hook to an STS client; restored in ``finally``.
        orig_client_type, self.client_type = self.client_type, 'sts'
        try:
            res = self.get_client_type().get_caller_identity()
            metadata = res.pop("ResponseMetadata", {})
            if metadata.get("HTTPStatusCode") == 200:
                return True, json.dumps(res)
            else:
                try:
                    return False, json.dumps(metadata)
                except TypeError:
                    # Metadata that is not JSON serializable is reported as plain text.
                    return False, str(metadata)

        except Exception as e:
            # Deliberately broad: any failure is reported as a failed connection test.
            return False, str(e)

        finally:
            self.client_type = orig_client_type
class AwsBaseHook(AwsGenericHook[Union[boto3.client, boto3.resource]]):
    """
    Interact with AWS.

    This class is a thin wrapper around the boto3 python library
    with basic conn annotation.

    .. seealso::
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`
    """
def resolve_session_factory() -> Type[BaseSessionFactory]:
    """
    Resolves custom SessionFactory class

    Reads the ``session_factory`` option of the ``aws`` configuration section.

    :return: the configured class, or ``BaseSessionFactory`` when the option is unset.
    :raises TypeError: if the configured class is not a subclass of ``BaseSessionFactory``.
    """
    clazz = conf.getimport("aws", "session_factory", fallback=None)
    if not clazz:
        return BaseSessionFactory
    if not issubclass(clazz, BaseSessionFactory):
        raise TypeError(
            f"Your custom AWS SessionFactory class `{clazz.__name__}` is not a subclass "
            f"of `{BaseSessionFactory.__name__}`."
        )
    return clazz
def _parse_s3_config(
    config_file_name: str, config_format: Optional[str] = "boto", profile: Optional[str] = None
):
    """
    For compatibility with airflow.contrib.hooks.aws_hook

    Thin pass-through to ``connection_wrapper._parse_s3_config``; all three arguments
    are forwarded unchanged and its result is returned as-is.
    """
    # Imported at call time and shadowing this function's own name on purpose:
    # the call below resolves to the ``connection_wrapper`` implementation.
    from airflow.providers.amazon.aws.utils.connection_wrapper import _parse_s3_config

    return _parse_s3_config(
        config_file_name=config_file_name,
        config_format=config_format,
        profile=profile,
    )