Source code for airflow.providers.amazon.aws.auth_manager.avp.facade
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonfromfunctoolsimportcached_propertyfrompathlibimportPathfromtypingimportTYPE_CHECKING,Sequence,TypedDictfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.auth_manager.avp.entitiesimportAvpEntities,get_action_id,get_entity_typefromairflow.providers.amazon.aws.auth_manager.constantsimport(CONF_AVP_POLICY_STORE_ID_KEY,CONF_CONN_ID_KEY,CONF_REGION_NAME_KEY,CONF_SECTION_NAME,)fromairflow.providers.amazon.aws.hooks.verified_permissionsimportVerifiedPermissionsHookfromairflow.utils.helpersimportprune_dictfromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromairflow.auth.managers.base_auth_managerimportResourceMethodfromairflow.providers.amazon.aws.auth_manager.userimportAwsAuthManagerUser# Amazon Verified Permissions allows only up to 30 requests per batch_is_authorized call. See# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/verifiedpermissions/client/batch_is_authorized.html
[docs]classAwsAuthManagerAmazonVerifiedPermissionsFacade(LoggingMixin):""" Facade for Amazon Verified Permissions. Used as an intermediate layer between AWS auth manager and Amazon Verified Permissions. """@cached_property
[docs]defavp_policy_store_id(self):"""Get the Amazon Verified Permission policy store ID from config."""returnconf.get_mandatory_value(CONF_SECTION_NAME,CONF_AVP_POLICY_STORE_ID_KEY)
[docs]defis_authorized(self,*,method:ResourceMethod|str,entity_type:AvpEntities,user:AwsAuthManagerUser|None,entity_id:str|None=None,context:dict|None=None,)->bool:""" Make an authorization decision against Amazon Verified Permissions. Check whether the user has permissions to access given resource. :param method: the method to perform. The method can also be a string if the action has been defined in a plugin. In that case, the action can be anything (e.g. can_do). See https://github.com/apache/airflow/issues/39144 :param entity_type: the entity type the user accesses :param user: the user :param entity_id: the entity ID the user accesses. If not provided, all entities of the type will be considered. :param context: optional additional context to pass to Amazon Verified Permissions. """ifuserisNone:returnFalseentity_list=self._get_user_group_entities(user)self.log.debug("Making authorization request for user=%s, method=%s, entity_type=%s, entity_id=%s",user.get_id(),method,entity_type,entity_id,)request_params=prune_dict({"policyStoreId":self.avp_policy_store_id,"principal":{"entityType":get_entity_type(AvpEntities.USER),"entityId":user.get_id()},"action":{"actionType":get_entity_type(AvpEntities.ACTION),"actionId":get_action_id(entity_type,method),},"resource":{"entityType":get_entity_type(entity_type),"entityId":entity_idor"*"},"entities":{"entityList":entity_list},"context":self._build_context(context),})resp=self.avp_client.is_authorized(**request_params)self.log.debug("Authorization response: %s",resp)iflen(resp.get("errors",[]))>0:self.log.error("Error occurred while making an authorization decision. Errors: %s",resp["errors"])raiseAirflowException("Error occurred while making an authorization decision.")returnresp["decision"]=="ALLOW"
[docs]defget_batch_is_authorized_results(self,*,requests:Sequence[IsAuthorizedRequest],user:AwsAuthManagerUser,)->list[dict]:""" Make a batch authorization decision against Amazon Verified Permissions. Return a list of results for each request. :param requests: the list of requests containing the method, the entity_type and the entity ID :param user: the user """entity_list=self._get_user_group_entities(user)self.log.debug("Making batch authorization request for user=%s, requests=%s",user.get_id(),requests)avp_requests=[self._build_is_authorized_request_payload(request,user)forrequestinrequests]avp_requests_chunks=[avp_requests[i:i+NB_REQUESTS_PER_BATCH]foriinrange(0,len(avp_requests),NB_REQUESTS_PER_BATCH)]results=[]foravp_requestsinavp_requests_chunks:resp=self.avp_client.batch_is_authorized(policyStoreId=self.avp_policy_store_id,requests=avp_requests,entities={"entityList":entity_list},)self.log.debug("Authorization response: %s",resp)has_errors=any(len(result.get("errors",[]))>0forresultinresp["results"])ifhas_errors:self.log.error("Error occurred while making a batch authorization decision. Result: %s",resp["results"])raiseAirflowException("Error occurred while making a batch authorization decision.")results.extend(resp["results"])returnresults
[docs]defbatch_is_authorized(self,*,requests:Sequence[IsAuthorizedRequest],user:AwsAuthManagerUser|None,)->bool:""" Make a batch authorization decision against Amazon Verified Permissions. Check whether the user has permissions to access all resources. :param requests: the list of requests containing the method, the entity_type and the entity ID :param user: the user """ifuserisNone:returnFalseresults=self.get_batch_is_authorized_results(requests=requests,user=user)returnall(result["decision"]=="ALLOW"forresultinresults)
[docs]defget_batch_is_authorized_single_result(self,*,batch_is_authorized_results:list[dict],request:IsAuthorizedRequest,user:AwsAuthManagerUser,)->dict:""" Get a specific authorization result from the output of ``get_batch_is_authorized_results``. :param batch_is_authorized_results: the response from the ``batch_is_authorized`` API :param request: the request information. Used to find the result in the response. :param user: the user """request_payload=self._build_is_authorized_request_payload(request,user)forresultinbatch_is_authorized_results:ifresult["request"]==request_payload:returnresultself.log.error("Could not find the authorization result for request %s in results %s.",request_payload,batch_is_authorized_results,)raiseAirflowException("Could not find the authorization result.")
[docs]defis_policy_store_schema_up_to_date(self)->bool:"""Return whether the policy store schema equals the latest version of the schema."""resp=self.avp_client.get_schema(policyStoreId=self.avp_policy_store_id,)policy_store_schema=json.loads(resp["schema"])schema_path=Path(__file__).parents[0]/"schema.json"withopen(schema_path)asschema_file:latest_schema=json.load(schema_file)returnpolicy_store_schema==latest_schema
@staticmethoddef_get_user_group_entities(user:AwsAuthManagerUser)->list[dict]:user_entity={"identifier":{"entityType":get_entity_type(AvpEntities.USER),"entityId":user.get_id()},"parents":[{"entityType":get_entity_type(AvpEntities.GROUP),"entityId":group}forgroupinuser.get_groups()],}group_entities=[{"identifier":{"entityType":get_entity_type(AvpEntities.GROUP),"entityId":group}}forgroupinuser.get_groups()]return[user_entity,*group_entities]@staticmethoddef_build_context(context:dict|None)->dict|None:ifcontextisNoneorlen(context)==0:returnNonereturn{"contextMap":context,}def_build_is_authorized_request_payload(self,request:IsAuthorizedRequest,user:AwsAuthManagerUser):""" Build a payload of an individual authorization request that could be sent through the ``batch_is_authorized`` API. :param request: the request information :param user: the user """returnprune_dict({"principal":{"entityType":get_entity_type(AvpEntities.USER),"entityId":user.get_id()},"action":{"actionType":get_entity_type(AvpEntities.ACTION),"actionId":get_action_id(request["entity_type"],request["method"]),},"resource":{"entityType":get_entity_type(request["entity_type"]),"entityId":request.get("entity_id","*"),},"context":self._build_context(request.get("context")),})