Source code for airflow.providers.amazon.aws.auth_manager.aws_auth_manager
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportargparsefromcollectionsimportdefaultdictfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Container,Sequence,castfromflaskimportsession,url_forfromairflow.cli.cli_configimportCLICommand,DefaultHelpParser,GroupCommandfromairflow.exceptionsimportAirflowOptionalProviderFeatureExceptionfromairflow.providers.amazon.aws.auth_manager.avp.entitiesimportAvpEntitiesfromairflow.providers.amazon.aws.auth_manager.avp.facadeimport(AwsAuthManagerAmazonVerifiedPermissionsFacade,IsAuthorizedRequest,)fromairflow.providers.amazon.aws.auth_manager.cli.definitionimport(AWS_AUTH_MANAGER_COMMANDS,)fromairflow.providers.amazon.aws.auth_manager.security_manager.aws_security_manager_overrideimport(AwsSecurityManagerOverride,)fromairflow.providers.amazon.aws.auth_manager.views.authimportAwsAuthManagerAuthenticationViewstry:fromairflow.auth.managers.base_auth_managerimportBaseAuthManager,ResourceMethodfromairflow.auth.managers.models.resource_detailsimport(AccessView,ConnectionDetails,DagAccessEntity,DagDetails,PoolDetails,VariableDetails,)exceptImportError:raiseAirflowOptionalProviderFeatureException("Failed to import BaseUser. This feature is only available in Airflow versions >= 2.8.0")ifTYPE_CHECKING:fromflask_appbuilder.menuimportMenuItemfromairflow.auth.managers.models.base_userimportBaseUserfromairflow.auth.managers.models.batch_apisimport(IsAuthorizedConnectionRequest,IsAuthorizedDagRequest,IsAuthorizedPoolRequest,IsAuthorizedVariableRequest,)fromairflow.auth.managers.models.resource_detailsimportAssetDetails,ConfigurationDetailsfromairflow.providers.amazon.aws.auth_manager.userimportAwsAuthManagerUserfromairflow.www.extensions.init_appbuilderimportAirflowAppBuilder
[docs]classAwsAuthManager(BaseAuthManager):""" AWS auth manager. Leverages AWS services such as Amazon Identity Center and Amazon Verified Permissions to perform authentication and authorization in Airflow. :param appbuilder: the flask app builder """def__init__(self,appbuilder:AirflowAppBuilder)->None:frompackaging.versionimportVersionfromairflow.versionimportversion# TODO: remove this if block when min_airflow_version is set to higher than 2.9.0ifVersion(version)<Version("2.9"):raiseAirflowOptionalProviderFeatureException("``AwsAuthManager`` is compatible with Airflow versions >= 2.9.")super().__init__(appbuilder)self._check_avp_schema_version()@cached_property
[docs]defbatch_is_authorized_connection(self,requests:Sequence[IsAuthorizedConnectionRequest],)->bool:""" Batch version of ``is_authorized_connection``. :param requests: a list of requests containing the parameters for ``is_authorized_connection`` """facade_requests:Sequence[IsAuthorizedRequest]=[{"method":request["method"],"entity_type":AvpEntities.CONNECTION,"entity_id":cast(ConnectionDetails,request["details"]).conn_idifrequest.get("details")elseNone,}forrequestinrequests]returnself.avp_facade.batch_is_authorized(requests=facade_requests,user=self.get_user())
[docs]defbatch_is_authorized_dag(self,requests:Sequence[IsAuthorizedDagRequest],)->bool:""" Batch version of ``is_authorized_dag``. :param requests: a list of requests containing the parameters for ``is_authorized_dag`` """facade_requests:Sequence[IsAuthorizedRequest]=[{"method":request["method"],"entity_type":AvpEntities.DAG,"entity_id":cast(DagDetails,request["details"]).idifrequest.get("details")elseNone,"context":{"dag_entity":{"string":cast(DagAccessEntity,request["access_entity"]).value,},}ifrequest.get("access_entity")elseNone,}forrequestinrequests]returnself.avp_facade.batch_is_authorized(requests=facade_requests,user=self.get_user())
[docs]defbatch_is_authorized_pool(self,requests:Sequence[IsAuthorizedPoolRequest],)->bool:""" Batch version of ``is_authorized_pool``. :param requests: a list of requests containing the parameters for ``is_authorized_pool`` """facade_requests:Sequence[IsAuthorizedRequest]=[{"method":request["method"],"entity_type":AvpEntities.POOL,"entity_id":cast(PoolDetails,request["details"]).nameifrequest.get("details")elseNone,}forrequestinrequests]returnself.avp_facade.batch_is_authorized(requests=facade_requests,user=self.get_user())
[docs]defbatch_is_authorized_variable(self,requests:Sequence[IsAuthorizedVariableRequest],)->bool:""" Batch version of ``is_authorized_variable``. :param requests: a list of requests containing the parameters for ``is_authorized_variable`` """facade_requests:Sequence[IsAuthorizedRequest]=[{"method":request["method"],"entity_type":AvpEntities.VARIABLE,"entity_id":cast(VariableDetails,request["details"]).keyifrequest.get("details")elseNone,}forrequestinrequests]returnself.avp_facade.batch_is_authorized(requests=facade_requests,user=self.get_user())
[docs]deffilter_permitted_dag_ids(self,*,dag_ids:set[str],methods:Container[ResourceMethod]|None=None,user=None,):""" Filter readable or writable DAGs for user. :param dag_ids: the list of DAG ids :param methods: whether filter readable or writable :param user: the current user """ifnotmethods:methods=["PUT","GET"]ifnotuser:user=self.get_user()requests:dict[str,dict[ResourceMethod,IsAuthorizedRequest]]=defaultdict(dict)requests_list:list[IsAuthorizedRequest]=[]fordag_idindag_ids:formethodin["GET","PUT"]:ifmethodinmethods:request:IsAuthorizedRequest={"method":cast(ResourceMethod,method),"entity_type":AvpEntities.DAG,"entity_id":dag_id,}requests[dag_id][cast(ResourceMethod,method)]=requestrequests_list.append(request)batch_is_authorized_results=self.avp_facade.get_batch_is_authorized_results(requests=requests_list,user=user)def_has_access_to_dag(request:IsAuthorizedRequest):result=self.avp_facade.get_batch_is_authorized_single_result(batch_is_authorized_results=batch_is_authorized_results,request=request,user=user)returnresult["decision"]=="ALLOW"return{dag_idfordag_idindag_idsif("GET"inmethodsand_has_access_to_dag(requests[dag_id]["GET"])or"PUT"inmethodsand_has_access_to_dag(requests[dag_id]["PUT"]))}
[docs]deffilter_permitted_menu_items(self,menu_items:list[MenuItem])->list[MenuItem]:""" Filter menu items based on user permissions. :param menu_items: list of all menu items """user=self.get_user()ifnotuser:return[]requests:dict[str,IsAuthorizedRequest]={}formenu_iteminmenu_items:ifmenu_item.childs:forchildinmenu_item.childs:requests[child.name]=self._get_menu_item_request(child.name)else:requests[menu_item.name]=self._get_menu_item_request(menu_item.name)batch_is_authorized_results=self.avp_facade.get_batch_is_authorized_results(requests=list(requests.values()),user=user)def_has_access_to_menu_item(request:IsAuthorizedRequest):result=self.avp_facade.get_batch_is_authorized_single_result(batch_is_authorized_results=batch_is_authorized_results,request=request,user=user)returnresult["decision"]=="ALLOW"accessible_items=[]formenu_iteminmenu_items:ifmenu_item.childs:accessible_children=[]forchildinmenu_item.childs:if_has_access_to_menu_item(requests[child.name]):accessible_children.append(child)menu_item.childs=accessible_children# Display the menu if the user has access to at least one sub itemiflen(accessible_children)>0:accessible_items.append(menu_item)elif_has_access_to_menu_item(requests[menu_item.name]):accessible_items.append(menu_item)returnaccessible_items
[docs]defget_cli_commands()->list[CLICommand]:"""Vends CLI commands to be included in Airflow CLI."""return[GroupCommand(name="aws-auth-manager",help="Manage resources used by AWS auth manager",subcommands=AWS_AUTH_MANAGER_COMMANDS,),]
@staticmethoddef_get_menu_item_request(resource_name:str)->IsAuthorizedRequest:return{"method":"MENU","entity_type":AvpEntities.MENU,"entity_id":resource_name,}def_check_avp_schema_version(self):ifnotself.avp_facade.is_policy_store_schema_up_to_date():self.log.warning("The Amazon Verified Permissions policy store schema is different from the latest version ""(https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/auth_manager/avp/schema.json). ""Please update it to its latest version. ""See doc: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/auth-manager/setup/amazon-verified-permissions.html#update-the-policy-store-schema.")
[docs]defget_parser()->argparse.ArgumentParser:"""Generate documentation; used by Sphinx argparse."""fromairflow.cli.cli_parserimportAirflowHelpFormatter,_add_commandparser=DefaultHelpParser(prog="airflow",formatter_class=AirflowHelpFormatter)subparsers=parser.add_subparsers(dest="subcommand",metavar="GROUP_OR_COMMAND")forgroup_commandinAwsAuthManager.get_cli_commands():_add_command(subparsers,group_command)returnparser