Source code for airflow.providers.amazon.aws.auth_manager.aws_auth_manager
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import argparse
from functools import cached_property
from typing import TYPE_CHECKING, Sequence, cast

from flask import session, url_for

from airflow.cli.cli_config import CLICommand, DefaultHelpParser, GroupCommand
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.auth_manager.avp.facade import (
    AwsAuthManagerAmazonVerifiedPermissionsFacade,
    IsAuthorizedRequest,
)
from airflow.providers.amazon.aws.auth_manager.cli.definition import (
    AWS_AUTH_MANAGER_COMMANDS,
)
from airflow.providers.amazon.aws.auth_manager.constants import (
    CONF_ENABLE_KEY,
    CONF_SECTION_NAME,
)
from airflow.providers.amazon.aws.auth_manager.security_manager.aws_security_manager_override import (
    AwsSecurityManagerOverride,
)
from airflow.security.permissions import (
    RESOURCE_AUDIT_LOG,
    RESOURCE_CLUSTER_ACTIVITY,
    RESOURCE_CONFIG,
    RESOURCE_CONNECTION,
    RESOURCE_DAG,
    RESOURCE_DAG_CODE,
    RESOURCE_DAG_DEPENDENCIES,
    RESOURCE_DAG_RUN,
    RESOURCE_DATASET,
    RESOURCE_DOCS,
    RESOURCE_JOB,
    RESOURCE_PLUGIN,
    RESOURCE_POOL,
    RESOURCE_PROVIDER,
    RESOURCE_SLA_MISS,
    RESOURCE_TASK_INSTANCE,
    RESOURCE_TASK_RESCHEDULE,
    RESOURCE_TRIGGER,
    RESOURCE_VARIABLE,
    RESOURCE_XCOM,
)

try:
    from airflow.auth.managers.base_auth_manager import BaseAuthManager, ResourceMethod
    from airflow.auth.managers.models.resource_details import (
        AccessView,
        ConnectionDetails,
        DagAccessEntity,
        DagDetails,
        PoolDetails,
        VariableDetails,
    )
except ImportError:
    raise AirflowOptionalProviderFeatureException(
        "Failed to import BaseUser. This feature is only available in Airflow versions >= 2.8.0"
    )

if TYPE_CHECKING:
    from flask_appbuilder.menu import MenuItem

    from airflow.auth.managers.models.base_user import BaseUser
    from airflow.auth.managers.models.batch_apis import (
        IsAuthorizedConnectionRequest,
        IsAuthorizedDagRequest,
        IsAuthorizedPoolRequest,
        IsAuthorizedVariableRequest,
    )
    from airflow.auth.managers.models.resource_details import (
        ConfigurationDetails,
        DatasetDetails,
    )
    from airflow.providers.amazon.aws.auth_manager.user import AwsAuthManagerUser
    from airflow.www.extensions.init_appbuilder import AirflowAppBuilder


def _entity_request(entity_type: AvpEntities) -> IsAuthorizedRequest:
    """Build a ``GET`` request that targets an entity type as a whole."""
    return {"method": "GET", "entity_type": entity_type}


def _view_request(view: AccessView) -> IsAuthorizedRequest:
    """Build a ``GET`` request on the VIEW entity type for one specific view."""
    return {"method": "GET", "entity_type": AvpEntities.VIEW, "entity_id": view.value}


def _dag_entity_request(access_entity: DagAccessEntity) -> IsAuthorizedRequest:
    """Build a ``GET`` request on the DAG entity type scoped to one DAG sub-entity."""
    return {
        "method": "GET",
        "entity_type": AvpEntities.DAG,
        "context": {"dag_entity": {"string": access_entity.value}},
    }


# Lookup table from a FAB resource name (as carried by menu items) to the
# read ("GET") authorization request that decides whether the item is shown.
_MENU_ITEM_REQUESTS: dict[str, IsAuthorizedRequest] = {
    RESOURCE_AUDIT_LOG: _dag_entity_request(DagAccessEntity.AUDIT_LOG),
    RESOURCE_CLUSTER_ACTIVITY: _view_request(AccessView.CLUSTER_ACTIVITY),
    RESOURCE_CONFIG: _entity_request(AvpEntities.CONFIGURATION),
    RESOURCE_CONNECTION: _entity_request(AvpEntities.CONNECTION),
    RESOURCE_DAG: _entity_request(AvpEntities.DAG),
    RESOURCE_DAG_CODE: _dag_entity_request(DagAccessEntity.CODE),
    RESOURCE_DAG_DEPENDENCIES: _dag_entity_request(DagAccessEntity.DEPENDENCIES),
    RESOURCE_DAG_RUN: _dag_entity_request(DagAccessEntity.RUN),
    RESOURCE_DATASET: _entity_request(AvpEntities.DATASET),
    RESOURCE_DOCS: _view_request(AccessView.DOCS),
    RESOURCE_PLUGIN: _view_request(AccessView.PLUGINS),
    RESOURCE_JOB: _view_request(AccessView.JOBS),
    RESOURCE_POOL: _entity_request(AvpEntities.POOL),
    RESOURCE_PROVIDER: _view_request(AccessView.PROVIDERS),
    RESOURCE_SLA_MISS: _dag_entity_request(DagAccessEntity.SLA_MISS),
    RESOURCE_TASK_INSTANCE: _dag_entity_request(DagAccessEntity.TASK_INSTANCE),
    RESOURCE_TASK_RESCHEDULE: _dag_entity_request(DagAccessEntity.TASK_RESCHEDULE),
    RESOURCE_TRIGGER: _view_request(AccessView.TRIGGERS),
    RESOURCE_VARIABLE: _entity_request(AvpEntities.VARIABLE),
    RESOURCE_XCOM: _dag_entity_request(DagAccessEntity.XCOM),
}
[docs]classAwsAuthManager(BaseAuthManager):""" AWS auth manager. Leverages AWS services such as Amazon Identity Center and Amazon Verified Permissions to perform authentication and authorization in Airflow. :param appbuilder: the flask app builder """def__init__(self,appbuilder:AirflowAppBuilder)->None:super().__init__(appbuilder)enable=conf.getboolean(CONF_SECTION_NAME,CONF_ENABLE_KEY)ifnotenable:raiseNotImplementedError("The AWS auth manager is currently being built. It is not finalized. It is not intended to be used yet.")@cached_property
def batch_is_authorized_connection(
    self,
    requests: Sequence[IsAuthorizedConnectionRequest],
) -> bool:
    """
    Batch version of ``is_authorized_connection``.

    Each incoming request is translated into a facade request on the CONNECTION entity type
    and the whole list is submitted in a single ``batch_is_authorized`` call.

    :param requests: a list of requests containing the parameters for ``is_authorized_connection``
    """
    avp_requests: list[IsAuthorizedRequest] = []
    for item in requests:
        method = item["method"]
        details = item.get("details")
        avp_requests.append(
            {
                "method": method,
                "entity_type": AvpEntities.CONNECTION,
                # No details means the request is not tied to one specific connection.
                "entity_id": cast(ConnectionDetails, details).conn_id if details else None,
            }
        )
    return self.avp_facade.batch_is_authorized(requests=avp_requests, user=self.get_user())
def batch_is_authorized_dag(
    self,
    requests: Sequence[IsAuthorizedDagRequest],
) -> bool:
    """
    Batch version of ``is_authorized_dag``.

    Each incoming request is translated into a facade request on the DAG entity type
    and the whole list is submitted in a single ``batch_is_authorized`` call.

    :param requests: a list of requests containing the parameters for ``is_authorized_dag``
    """
    avp_requests: list[IsAuthorizedRequest] = []
    for item in requests:
        method = item["method"]
        details = item.get("details")
        access_entity = item.get("access_entity")
        if access_entity:
            context = {"dag_entity": {"string": cast(DagAccessEntity, access_entity).value}}
        else:
            # The "context" key is always present; it is None when no sub-entity is given.
            context = None
        avp_requests.append(
            {
                "method": method,
                "entity_type": AvpEntities.DAG,
                "entity_id": cast(DagDetails, details).id if details else None,
                "context": context,
            }
        )
    return self.avp_facade.batch_is_authorized(requests=avp_requests, user=self.get_user())
def batch_is_authorized_pool(
    self,
    requests: Sequence[IsAuthorizedPoolRequest],
) -> bool:
    """
    Batch version of ``is_authorized_pool``.

    Each incoming request is translated into a facade request on the POOL entity type
    and the whole list is submitted in a single ``batch_is_authorized`` call.

    :param requests: a list of requests containing the parameters for ``is_authorized_pool``
    """
    avp_requests: list[IsAuthorizedRequest] = []
    for item in requests:
        method = item["method"]
        details = item.get("details")
        avp_requests.append(
            {
                "method": method,
                "entity_type": AvpEntities.POOL,
                # No details means the request is not tied to one specific pool.
                "entity_id": cast(PoolDetails, details).name if details else None,
            }
        )
    return self.avp_facade.batch_is_authorized(requests=avp_requests, user=self.get_user())
def batch_is_authorized_variable(
    self,
    requests: Sequence[IsAuthorizedVariableRequest],
) -> bool:
    """
    Batch version of ``is_authorized_variable``.

    Each incoming request is translated into a facade request on the VARIABLE entity type
    and the whole list is submitted in a single ``batch_is_authorized`` call.

    :param requests: a list of requests containing the parameters for ``is_authorized_variable``
    """
    avp_requests: list[IsAuthorizedRequest] = []
    for item in requests:
        method = item["method"]
        details = item.get("details")
        avp_requests.append(
            {
                "method": method,
                "entity_type": AvpEntities.VARIABLE,
                # No details means the request is not tied to one specific variable.
                "entity_id": cast(VariableDetails, details).key if details else None,
            }
        )
    return self.avp_facade.batch_is_authorized(requests=avp_requests, user=self.get_user())
def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
    """
    Filter menu items based on user permissions.

    Items with children are kept only if at least one child is accessible, and their
    ``childs`` attribute is replaced in place by the accessible subset. Items without
    children are kept only if they are accessible themselves. An empty list is returned
    when there is no current user.

    :param menu_items: list of all menu items
    """
    current_user = self.get_user()
    if not current_user:
        return []

    # Collect one authorization request per leaf entry (children, or the item itself
    # when it has none), keyed by the entry name.
    requests_by_name: dict[str, IsAuthorizedRequest] = {}
    for item in menu_items:
        leaves = item.childs if item.childs else [item]
        for leaf in leaves:
            requests_by_name[leaf.name] = self._get_menu_item_request(leaf.name)

    # Resolve every request in a single batch call.
    results = self.avp_facade.get_batch_is_authorized_results(
        requests=list(requests_by_name.values()), user=current_user
    )

    def _allowed(name):
        return self._has_access_to_menu_item(results, requests_by_name[name], current_user)

    visible = []
    for item in menu_items:
        if not item.childs:
            if _allowed(item.name):
                visible.append(item)
            continue

        permitted_children = [child for child in item.childs if _allowed(child.name)]
        item.childs = permitted_children
        # Display the menu if the user has access to at least one sub item
        if permitted_children:
            visible.append(item)
    return visible
def get_cli_commands() -> list[CLICommand]:
    """Vends CLI commands to be included in Airflow CLI."""
    # A single command group wraps all of the AWS auth manager sub-commands.
    aws_group = GroupCommand(
        name="aws-auth-manager",
        help="Manage resources used by AWS auth manager",
        subcommands=AWS_AUTH_MANAGER_COMMANDS,
    )
    return [aws_group]
@staticmethod
def _get_menu_item_request(fab_resource_name: str) -> IsAuthorizedRequest:
    """
    Look up the authorization request associated with a FAB resource name.

    :param fab_resource_name: the resource name to look up in ``_MENU_ITEM_REQUESTS``
    :raises AirflowException: if no (non-empty) request is registered for the name
    """
    known_request = _MENU_ITEM_REQUESTS.get(fab_resource_name)
    if not known_request:
        raise AirflowException(f"Unknown resource name {fab_resource_name}")
    return known_request


def _has_access_to_menu_item(
    self, batch_is_authorized_results: list[dict], request: IsAuthorizedRequest, user: AwsAuthManagerUser
):
    """
    Tell whether the batch results grant ``user`` the given ``request``.

    :param batch_is_authorized_results: results previously returned by the batch authorization call
    :param request: the request whose individual result is looked up
    :param user: the user the request was evaluated for
    :return: ``True`` when the matching result's decision is ``"ALLOW"``
    """
    single_result = self.avp_facade.get_batch_is_authorized_single_result(
        batch_is_authorized_results=batch_is_authorized_results,
        request=request,
        user=user,
    )
    decision = single_result["decision"]
    return decision == "ALLOW"
def get_parser() -> argparse.ArgumentParser:
    """
    Generate documentation; used by Sphinx argparse.

    Builds an ``airflow`` parser and registers every command returned by
    ``AwsAuthManager.get_cli_commands()`` on it.
    """
    # Imported locally, as in the original implementation.
    from airflow.cli.cli_parser import AirflowHelpFormatter, _add_command

    doc_parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
    command_groups = doc_parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
    for command in AwsAuthManager.get_cli_commands():
        _add_command(command_groups, command)
    return doc_parser