Source code for airflow.providers.google.cloud.utils.external_token_supplier
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportabcimporttimefromfunctoolsimportwrapsfromtypingimportTYPE_CHECKING,Anyimportrequestsfromgoogle.auth.exceptionsimportRefreshErrorfromgoogle.auth.identity_poolimportSubjectTokenSupplierifTYPE_CHECKING:fromgoogle.auth.external_accountimportSupplierContextfromgoogle.auth.transportimportRequestfromairflow.utils.log.logging_mixinimportLoggingMixin
[docs]defcache_token_decorator(get_subject_token_method):""" Cache calls to ``SubjectTokenSupplier`` instances' ``get_token_supplier`` methods. Different instances of a same SubjectTokenSupplier class with the same attributes share the OIDC token cache. :param get_subject_token_method: A method that returns both a token and an integer specifying the time in seconds until the token expires See also: https://googleapis.dev/python/google-auth/latest/reference/google.auth.identity_pool.html#google.auth.identity_pool.SubjectTokenSupplier.get_subject_token """cache={}@wraps(get_subject_token_method)defwrapper(supplier_instance:CacheTokenSupplier,*args,**kwargs)->str:""" Obeys the interface set by ``SubjectTokenSupplier`` for ``get_subject_token`` methods. :param supplier_instance: the SubjectTokenSupplier instance whose get_subject_token method is being decorated :return: The token string """nonlocalcachecache_key=supplier_instance.get_subject_key()token:dict[str,str|float]={}ifcache_keynotincacheorcache[cache_key]["expiration_time"]<time.monotonic():supplier_instance.log.info("OIDC token missing or expired")try:access_token,expires_in=get_subject_token_method(supplier_instance,*args,**kwargs)ifnotisinstance(expires_in,int)ornotisinstance(access_token,str):raiseRefreshError# assume error if strange values are providedexceptRefreshError:supplier_instance.log.error("Failed retrieving new OIDC Token from IdP")raiseexpiration_time=time.monotonic()+float(expires_in)token["access_token"]=access_tokentoken["expiration_time"]=expiration_timecache[cache_key]=tokensupplier_instance.log.info("New OIDC token retrieved, expires in %s seconds.",expires_in)returncache[cache_key]["access_token"]returnwrapper
[docs]classCacheTokenSupplier(LoggingMixin,SubjectTokenSupplier):""" A superclass for all Subject Token Supplier classes that wish to implement a caching mechanism. Child classes must implement the ``get_subject_key`` method to generate a string that serves as the cache key, ensuring that tokens are shared appropriately among instances. Methods: get_subject_key: Abstract method to be implemented by child classes. It should return a string that serves as the cache key. """def__init__(self):super().__init__()@abc.abstractmethod
[docs]classClientCredentialsGrantFlowTokenSupplier(CacheTokenSupplier):""" Class that retrieves an OIDC token from an external IdP using OAuth2.0 Client Credentials Grant flow. This class implements the ``SubjectTokenSupplier`` interface class used by ``google.auth.identity_pool.Credentials`` :params oidc_issuer_url: URL of the IdP that performs OAuth2.0 Client Credentials Grant flow and returns an OIDC token. :params client_id: Client ID of the application requesting the token :params client_secret: Client secret of the application requesting the token :params extra_params_kwargs: Extra parameters to be passed in the payload of the POST request to the `oidc_issuer_url` See also: https://googleapis.dev/python/google-auth/latest/reference/google.auth.identity_pool.html#google.auth.identity_pool.SubjectTokenSupplier """def__init__(self,oidc_issuer_url:str,client_id:str,client_secret:str,**extra_params_kwargs:Any,)->None:super().__init__()self.oidc_issuer_url=oidc_issuer_urlself.client_id=client_idself.client_secret=client_secretself.extra_params_kwargs=extra_params_kwargs@cache_token_decorator
[docs]defget_subject_token(self,context:SupplierContext,request:Request)->tuple[str,int]:"""Perform Client Credentials Grant flow with IdP and retrieves an OIDC token and expiration time."""self.log.info("Requesting new OIDC token from external IdP.")try:response=requests.post(self.oidc_issuer_url,data={"grant_type":"client_credentials","client_id":self.client_id,"client_secret":self.client_secret,**self.extra_params_kwargs,},)response.raise_for_status()exceptrequests.HTTPErrorase:raiseRefreshError(str(e))exceptrequests.ConnectionErrorase:raiseRefreshError(str(e))try:response_dict=response.json()exceptrequests.JSONDecodeError:raiseRefreshError(f"Didn't get a json response from {self.oidc_issuer_url}")# These fields are requiredif{"access_token","expires_in"}-set(response_dict.keys()):# TODO more information about the error can be provided in the exception by inspecting the responseraiseRefreshError(f"No access token returned from {self.oidc_issuer_url}")returnresponse_dict["access_token"],response_dict["expires_in"]
[docs]defget_subject_key(self)->str:""" Create a cache key using the OIDC issuer URL, client ID, client secret and additional parameters. Instances with the same credentials will share tokens. """cache_key=(self.oidc_issuer_url+self.client_id+self.client_secret+",".join(sorted(self.extra_params_kwargs)))returncache_key