#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import warnings
from typing import Dict, List, Optional, Sequence, Set, Tuple

from flask import current_app, g
from flask_appbuilder.security.sqla import models as sqla_models
from flask_appbuilder.security.sqla.models import Permission, PermissionView, Role, User, ViewMenu
from sqlalchemy import or_
from sqlalchemy.orm import joinedload

from airflow.exceptions import AirflowException
from airflow.models import DagBag, DagModel
from airflow.security import permissions
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.www.fab_security.sqla.manager import SecurityManager
from airflow.www.utils import CustomSQLAInterface
from airflow.www.views import (
    CustomPermissionModelView,
    CustomPermissionViewModelView,
    CustomResetMyPasswordView,
    CustomResetPasswordView,
    CustomRoleModelView,
    CustomUserDBModelView,
    CustomUserInfoEditView,
    CustomUserLDAPModelView,
    CustomUserOAuthModelView,
    CustomUserOIDModelView,
    CustomUserRemoteUserModelView,
    CustomUserStatsChartView,
    CustomViewMenuModelView,
)

# Names of the built-in roles; any other role is treated as a "custom" role
# by ``add_homepage_access_to_custom_roles``.
EXISTING_ROLES = {
    'Admin',
    'Viewer',
    'User',
    'Op',
    'Public',
}


class AirflowSecurityManager(SecurityManager, LoggingMixin):
    """Custom security manager, which introduces a permission model adapted to Airflow"""

    ###########################################################################
    #                               PERMISSIONS
    ###########################################################################

    # [START security_viewer_perms]
    VIEWER_PERMISSIONS = [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
    ]
    # [END security_viewer_perms]

    # [START security_user_perms]
    USER_PERMISSIONS = [
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
    ]
    # [END security_user_perms]

    # [START security_op_perms]
    OP_PERMISSIONS = [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_CONFIG),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_ADMIN_MENU),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_CONFIG),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_CONNECTION),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_POOL),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_VARIABLE),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_XCOM),
        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_CONNECTION),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_CONNECTION),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_CONNECTION),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_CONNECTION),
        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_POOL),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_POOL),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_POOL),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_POOL),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_PROVIDER),
        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_VARIABLE),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_VARIABLE),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_VARIABLE),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_VARIABLE),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_XCOM),
    ]
    # [END security_op_perms]

    ADMIN_PERMISSIONS = [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_RESCHEDULE),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_RESCHEDULE),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TRIGGER),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TRIGGER),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_PASSWORD),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_PASSWORD),
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_ROLE),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE),
    ]

    # global resource for dag-level access
    DAG_RESOURCES = {permissions.RESOURCE_DAG}
    DAG_ACTIONS = permissions.DAG_ACTIONS

    ###########################################################################
    #                     DEFAULT ROLE CONFIGURATIONS
    ###########################################################################

    # Each built-in role is a superset of the one before it.
    ROLE_CONFIGS = [
        {'role': 'Public', 'perms': []},
        {'role': 'Viewer', 'perms': VIEWER_PERMISSIONS},
        {
            'role': 'User',
            'perms': VIEWER_PERMISSIONS + USER_PERMISSIONS,
        },
        {
            'role': 'Op',
            'perms': VIEWER_PERMISSIONS + USER_PERMISSIONS + OP_PERMISSIONS,
        },
        {
            'role': 'Admin',
            'perms': VIEWER_PERMISSIONS + USER_PERMISSIONS + OP_PERMISSIONS + ADMIN_PERMISSIONS,
        },
    ]

    permissionmodelview = CustomPermissionModelView
    permissionviewmodelview = CustomPermissionViewModelView
    rolemodelview = CustomRoleModelView
    viewmenumodelview = CustomViewMenuModelView
    userdbmodelview = CustomUserDBModelView
    resetmypasswordview = CustomResetMyPasswordView
    resetpasswordview = CustomResetPasswordView
    userinfoeditview = CustomUserInfoEditView
    userldapmodelview = CustomUserLDAPModelView
    useroauthmodelview = CustomUserOAuthModelView
    userremoteusermodelview = CustomUserRemoteUserModelView
    useroidmodelview = CustomUserOIDModelView
    userstatschartview = CustomUserStatsChartView

    def __init__(self, appbuilder):
        """
        Initialize the manager and swap the views' data models for ``CustomSQLAInterface``.

        :param appbuilder: passed through unchanged to the parent ``SecurityManager``
        """
        super().__init__(appbuilder)

        # Go and fix up the SQLAInterface used from the stock one to our subclass.
        # This is needed to support the "hack" where we had to edit
        # FieldConverter.conversion_table in place in airflow.www.utils
        for attr in dir(self):
            if not attr.endswith('view'):
                continue
            view = getattr(self, attr, None)
            if not view or not getattr(view, 'datamodel', None):
                continue
            view.datamodel = CustomSQLAInterface(view.datamodel.obj)
        # Cache of the current user's (action, resource) pairs; filled lazily by ``_has_perm``.
        self.perms = None

    def init_role(self, role_name, perms):
        """
        Initialize the role with actions and related resources.

        Deprecated: emits a ``DeprecationWarning`` and delegates to ``bulk_sync_roles``.

        :param role_name: name of the role to sync
        :param perms: iterable of (action_name, resource_name) pairs for the role
        :return: None
        """
        warnings.warn(
            "`init_role` has been deprecated. Please use `bulk_sync_roles` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.bulk_sync_roles([{'role': role_name, 'perms': perms}])

    def bulk_sync_roles(self, roles):
        """
        Sync the provided roles and permissions.

        :param roles: iterable of dicts, each with a ``'role'`` name and a ``'perms'``
            iterable of (action_name, resource_name) pairs
        """
        # Load both lookups once up front so the loops below avoid a query per role/permission.
        existing_roles = self._get_all_roles_with_permissions()
        non_dag_perms = self._get_all_non_dag_permissions()

        for config in roles:
            role_name = config['role']
            perms = config['perms']
            role = existing_roles.get(role_name) or self.add_role(role_name)

            for action_name, resource_name in perms:
                perm = non_dag_perms.get((action_name, resource_name)) or self.create_permission(
                    action_name, resource_name
                )

                if perm not in role.permissions:
                    self.add_permission_to_role(role, perm)

    def add_permissions(self, role, perms):
        """
        Adds permissions to a given role.

        :param role: the role receiving the permissions
        :param perms: iterable of (action_name, resource_name) pairs
        """
        for action_name, resource_name in perms:
            permission = self.create_permission(action_name, resource_name)
            self.add_permission_to_role(role, permission)

    def get_resource(self, name: str) -> Optional[ViewMenu]:
        """
        Returns a resource record by name, if it exists.

        :param name: Name of resource
        :type name: str
        :return: Resource record
        :rtype: Optional[ViewMenu]
        """
        return self.find_view_menu(name)

    def get_all_resources(self) -> List[ViewMenu]:
        """
        Gets all existing resource records.

        :return: List of all resources
        :rtype: List[ViewMenu]
        """
        return self.get_all_view_menu()

    def get_action(self, name: str) -> Optional[Permission]:
        """
        Gets an existing action record.

        :param name: name
        :type name: str
        :return: Action record, if it exists
        :rtype: Optional[Permission]
        """
        return self.find_permission(name)

    def get_permission(self, action_name: str, resource_name: str) -> Optional[PermissionView]:
        """
        Gets a permission made with the given action->resource pair, if the permission already exists.

        :param action_name: Name of action
        :type action_name: str
        :param resource_name: Name of resource
        :type resource_name: str
        :return: The existing permission
        :rtype: Optional[PermissionView]
        """
        return self.find_permission_view_menu(action_name, resource_name)

    def create_permission(self, action_name: str, resource_name: str) -> PermissionView:
        """
        Creates a permission linking an action and resource.

        :param action_name: Name of existing action
        :type action_name: str
        :param resource_name: Name of existing resource
        :type resource_name: str
        :return: Resource created
        :rtype: PermissionView
        """
        return self.add_permission_view_menu(action_name, resource_name)

    def delete_permission(self, action_name: str, resource_name: str) -> None:
        """
        Deletes the permission linking an action->resource pair. Doesn't delete the
        underlying action or resource.

        :param action_name: Name of existing action
        :type action_name: str
        :param resource_name: Name of existing resource
        :type resource_name: str
        :return: None
        :rtype: None
        """
        self.del_permission_view_menu(action_name, resource_name)

    def delete_role(self, role_name):
        """
        Delete the given Role

        :param role_name: the name of a role in the ab_role table
        :raises AirflowException: if no role with that name exists
        """
        session = self.get_session
        role = session.query(sqla_models.Role).filter(sqla_models.Role.name == role_name).first()
        if role:
            self.log.info("Deleting role '%s'", role_name)
            session.delete(role)
            session.commit()
        else:
            raise AirflowException(f"Role named '{role_name}' does not exist")

    @staticmethod
    def get_user_roles(user=None):
        """
        Get all the roles associated with the user.

        For an anonymous user the result is the role named by the ``AUTH_ROLE_PUBLIC``
        config value, or an empty list when that value is falsy.

        :param user: the ab_user in FAB model. Defaults to ``g.user``.
        :return: a list of roles associated with the user.
        """
        if user is None:
            user = g.user
        if user.is_anonymous:
            public_role = current_app.appbuilder.get_app.config["AUTH_ROLE_PUBLIC"]
            return [current_app.appbuilder.sm.find_role(public_role)] if public_role else []
        return user.roles

    def get_current_user_permissions(self):
        """Returns permissions for logged in user as a set of tuples with the action and resource name"""
        perms = set()
        for role in self.get_user_roles():
            perms.update({(perm.permission.name, perm.view_menu.name) for perm in role.permissions})
        return perms

    def current_user_has_permissions(self) -> bool:
        """Returns True if any role of the current user has at least one permission."""
        return any(role.permissions for role in self.get_user_roles())

    def get_readable_dags(self, user):
        """Gets the DAGs readable by authenticated user."""
        return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)

    def get_editable_dags(self, user):
        """Gets the DAGs editable by authenticated user."""
        return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)

    def get_readable_dag_ids(self, user) -> Set[str]:
        """Gets the DAG IDs readable by authenticated user."""
        return {dag.dag_id for dag in self.get_readable_dags(user)}

    def get_editable_dag_ids(self, user) -> Set[str]:
        """Gets the DAG IDs editable by authenticated user."""
        return {dag.dag_id for dag in self.get_editable_dags(user)}

    def get_accessible_dag_ids(self, user) -> Set[str]:
        """Gets the DAG IDs editable or readable by authenticated user."""
        accessible_dags = self.get_accessible_dags(
            [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ], user
        )
        return {dag.dag_id for dag in accessible_dags}

    @provide_session
    def get_accessible_dags(self, user_actions, user, session=None):
        """
        Generic function to get readable or writable DAGs for user.

        :param user_actions: collection of action names to match (e.g. can_read, can_edit)
        :param user: the user whose roles are inspected
        :param session: database session (supplied by ``provide_session`` when omitted)
        :return: a ``DagModel`` query: unfiltered if the user holds a matching action on the
            global DAG resource, otherwise restricted to the collected DAG ids
        """
        if user.is_anonymous:
            roles = self.get_user_roles(user)
        else:
            # Reload the user with roles and permissions eagerly loaded in one go.
            user_query = (
                session.query(User)
                .options(
                    joinedload(User.roles)
                    .subqueryload(Role.permissions)
                    .options(joinedload(PermissionView.permission), joinedload(PermissionView.view_menu))
                )
                .filter(User.id == user.id)
                .first()
            )
            roles = user_query.roles

        resources = set()
        for role in roles:
            for permission in role.permissions:
                action = permission.permission.name
                if action not in user_actions:
                    continue

                resource = permission.view_menu.name
                # Permission on the global DAG resource grants access to every DAG.
                if resource == permissions.RESOURCE_DAG:
                    return session.query(DagModel)

                if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
                    resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
                else:
                    resources.add(resource)
        return session.query(DagModel).filter(DagModel.dag_id.in_(resources))

    def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
        """
        Checks if user has read or write access to some dags.

        :param action: action name; ``can_read`` checks readable DAGs, anything else
            checks editable DAGs
        :param dag_id: when given (and not ``'~'``), only that DAG's resource is checked
        """
        if dag_id and dag_id != '~':
            return self.has_access(action, permissions.resource_name_for_dag(dag_id))

        user = g.user
        if action == permissions.ACTION_CAN_READ:
            return any(self.get_readable_dags(user))
        return any(self.get_editable_dags(user))

    def can_read_dag(self, dag_id, user=None) -> bool:
        """
        Determines whether a user has DAG read access.

        :param dag_id: the DAG id; only the part before the first ``.`` is checked
        :param user: the user to check; defaults to ``g.user``
        """
        if not user:
            user = g.user
        # To account for SubDags
        root_dag_id = dag_id.split(".")[0]
        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
        return self._has_access(
            user, permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG
        ) or self._has_access(user, permissions.ACTION_CAN_READ, dag_resource_name)

    def can_edit_dag(self, dag_id, user=None) -> bool:
        """
        Determines whether a user has DAG edit access.

        :param dag_id: the DAG id; only the part before the first ``.`` is checked
        :param user: the user to check; defaults to ``g.user``
        """
        if not user:
            user = g.user
        # To account for SubDags
        root_dag_id = dag_id.split(".")[0]
        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)

        return self._has_access(
            user, permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG
        ) or self._has_access(user, permissions.ACTION_CAN_EDIT, dag_resource_name)

    def prefixed_dag_id(self, dag_id):
        """
        Returns the permission name for a DAG id.

        Deprecated: emits a ``DeprecationWarning`` and delegates to
        ``airflow.security.permissions.resource_name_for_dag``.
        """
        warnings.warn(
            "`prefixed_dag_id` has been deprecated. "
            "Please use `airflow.security.permissions.resource_name_for_dag` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return permissions.resource_name_for_dag(dag_id)

    def is_dag_resource(self, resource_name):
        """Determines if a resource belongs to a DAG or all DAGs."""
        if resource_name == permissions.RESOURCE_DAG:
            return True
        return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX)

    def _has_view_access(self, user, action, resource) -> bool:
        """
        Overriding the method to ensure that it always returns a bool
        _has_view_access can return NoneType which gives us
        issues later on, this fixes that.
        """
        return bool(super()._has_view_access(user, action, resource))

    def has_access(self, action_name, resource_name, user=None) -> bool:
        """
        Verify whether a given user could perform a certain action
        (e.g can_read, can_write) on the given resource.

        :param action_name: action_name on resource (e.g can_read, can_edit).
        :type action_name: str
        :param resource_name: name of view-menu or resource.
        :type resource_name: str
        :param user: user object; defaults to ``g.user``
        :return: Whether user could perform certain action on the resource.
        :rtype: bool
        """
        if not user:
            user = g.user
        if user.is_anonymous:
            user.roles = self.get_user_roles(user)

        has_access = self._has_access(user, action_name, resource_name)
        # FAB built-in view access method. Won't work for AllDag access.
        if self.is_dag_resource(resource_name):
            if action_name == permissions.ACTION_CAN_READ:
                has_access |= self.can_read_dag(resource_name, user)
            elif action_name == permissions.ACTION_CAN_EDIT:
                has_access |= self.can_edit_dag(resource_name, user)

        return has_access

    def _has_access(self, user: User, action_name: str, resource_name: str) -> bool:
        """
        Wraps the FAB built-in view access method. Won't work for AllDag access.

        :param user: user object
        :type user: User
        :param action_name: action_name on resource (e.g can_read, can_edit).
        :type action_name: str
        :param resource_name: name of resource.
        :type resource_name: str
        :return: a bool whether user could perform certain action on the resource.
        :rtype: bool
        """
        return bool(self._has_view_access(user, action_name, resource_name))

    def _get_and_cache_perms(self):
        """Cache permissions"""
        self.perms = self.get_current_user_permissions()

    def _has_role(self, role_name_or_list):
        """Whether the user has this role name"""
        if not isinstance(role_name_or_list, list):
            role_name_or_list = [role_name_or_list]
        return any(r.name in role_name_or_list for r in self.get_user_roles())

    def _has_perm(self, action_name, resource_name):
        """Whether the user has this perm"""
        # ``perms`` may be missing entirely on subclasses whose __init__ skips ours.
        cached_perms = getattr(self, 'perms', None)
        if cached_perms is not None and (action_name, resource_name) in cached_perms:
            return True
        # Cache miss (or no cache yet): rebuild the permissions set and re-check.
        self._get_and_cache_perms()
        return (action_name, resource_name) in self.perms

    def has_all_dags_access(self):
        """
        Has all the dag access in any of the 3 cases:
        1. Role needs to be in (Admin, Viewer, User, Op).
        2. Has can_read action on dags resource.
        3. Has can_edit action on dags resource.
        """
        return (
            self._has_role(['Admin', 'Viewer', 'Op', 'User'])
            or self._has_perm(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)
            or self._has_perm(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)
        )

    def clean_perms(self):
        """FAB leaves faulty permissions that need to be cleaned up"""
        self.log.debug('Cleaning faulty perms')
        sesh = self.get_session
        # ``== None`` is intentional: SQLAlchemy overloads it to build the SQL filter.
        perms = sesh.query(sqla_models.PermissionView).filter(
            or_(
                sqla_models.PermissionView.permission == None,  # noqa
                sqla_models.PermissionView.view_menu == None,  # noqa
            )
        )
        # Since FAB doesn't define ON DELETE CASCADE on these tables, we need
        # to delete the _object_ so that SQLA knows to delete the many-to-many
        # relationship object too. :(

        deleted_count = 0
        for perm in perms:
            sesh.delete(perm)
            deleted_count += 1
        sesh.commit()
        if deleted_count:
            self.log.info('Deleted %s faulty permissions', deleted_count)

    def _merge_perm(self, action_name, resource_name):
        """
        Add the new (action, resource) to assoc_permissionview_role if it doesn't exist.
        It will add the related entry to ab_permission
        and ab_view_menu two meta tables as well.

        :param action_name: Name of the action
        :type action_name: str
        :param resource_name: Name of the resource
        :type resource_name: str
        :return: None
        """
        action = self.get_action(action_name)
        resource = self.get_resource(resource_name)
        perm = None
        # Only look for an existing pair when both halves already exist.
        if action and resource:
            perm = (
                self.get_session.query(self.permissionview_model)
                .filter_by(permission=action, view_menu=resource)
                .first()
            )
        if not perm and action_name and resource_name:
            self.create_permission(action_name, resource_name)

    def add_homepage_access_to_custom_roles(self):
        """
        Add Website.can_read access to all custom roles.

        :return: None.
        """
        website_permission = self.create_permission(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)
        custom_roles = [role for role in self.get_all_roles() if role.name not in EXISTING_ROLES]
        for role in custom_roles:
            self.add_permission_to_role(role, website_permission)

        self.get_session.commit()

    def add_permission_to_role(self, role: Role, permission: PermissionView) -> None:
        """
        Add an existing permission pair to a role.

        :param role: The role about to get a new permission.
        :type role: Role
        :param permission: The permission pair to add to a role.
        :type permission: PermissionView
        :return: None
        :rtype: None
        """
        self.add_permission_role(role, permission)

    def remove_permission_from_role(self, role: Role, permission: PermissionView) -> None:
        """
        Remove a permission pair from a role.

        :param role: User role containing permissions.
        :type role: Role
        :param permission: Object representing resource-> action pair
        :type permission: PermissionView
        """
        self.del_permission_role(role, permission)

    def delete_action(self, name: str) -> bool:
        """
        Deletes a permission action.

        :param name: Name of action to delete (e.g. can_read).
        :type name: str
        :return: Whether or not delete was successful.
        :rtype: bool
        """
        return self.del_permission(name)

    def get_all_permissions(self) -> Set[Tuple[str, str]]:
        """Returns all permissions as a set of tuples with the action and resource names"""
        return set(
            self.get_session.query(self.permissionview_model)
            .join(self.permission_model)
            .join(self.viewmenu_model)
            .with_entities(self.permission_model.name, self.viewmenu_model.name)
            .all()
        )

    def _get_all_non_dag_permissions(self) -> Dict[Tuple[str, str], PermissionView]:
        """
        Returns a dict with a key of (action_name, resource_name) and value of permission
        with all permissions except those that are for specific DAGs.
        """
        return {
            (action_name, resource_name): viewmodel
            for action_name, resource_name, viewmodel in (
                self.get_session.query(self.permissionview_model)
                .join(self.permission_model)
                .join(self.viewmenu_model)
                .filter(~self.viewmenu_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
                .with_entities(
                    self.permission_model.name, self.viewmenu_model.name, self.permissionview_model
                )
                .all()
            )
        }

    def _get_all_roles_with_permissions(self) -> Dict[str, Role]:
        """Returns a dict with a key of role name and value of role with eagerly loaded permissions"""
        return {
            r.name: r
            for r in (
                self.get_session.query(self.role_model).options(joinedload(self.role_model.permissions)).all()
            )
        }

    def create_dag_specific_permissions(self) -> None:
        """
        Creates 'can_read' and 'can_edit' permissions for all DAGs,
        along with any `access_control` permissions provided in them.

        This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag`
        if you only need to sync a single DAG.

        :return: None.
        """
        perms = self.get_all_permissions()
        dagbag = DagBag(read_dags_from_db=True)
        dagbag.collect_dags_from_db()
        dags = dagbag.dags.values()

        for dag in dags:
            dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
            for action_name in self.DAG_ACTIONS:
                if (action_name, dag_resource_name) not in perms:
                    self._merge_perm(action_name, dag_resource_name)

            if dag.access_control:
                self.sync_perm_for_dag(dag_resource_name, dag.access_control)

    def update_admin_permission(self):
        """
        Admin should have all the permissions, except the dag permissions.
        because Admin already has Dags permission.
        Add the missing ones to the table for admin.

        :return: None.
        """
        dag_resources = (
            self.get_session.query(sqla_models.ViewMenu)
            .filter(sqla_models.ViewMenu.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
            .all()
        )
        resource_ids = [resource.id for resource in dag_resources]
        perms = (
            self.get_session.query(sqla_models.PermissionView)
            .filter(~sqla_models.PermissionView.view_menu_id.in_(resource_ids))
            .all()
        )

        # Skip pairs that are missing either their action or their resource.
        perms = [p for p in perms if p.permission and p.view_menu]

        admin = self.find_role('Admin')
        admin.permissions = list(set(admin.permissions) | set(perms))

        self.get_session.commit()

    def sync_roles(self):
        """
        1. Init the default role(Admin, Viewer, User, Op, public)
           with related permissions.
        2. Init the custom role(dag-user) with related permissions.

        :return: None.
        """
        # Create global all-dag permissions
        self.create_perm_vm_for_all_dag()

        # Sync the default roles (Admin, Viewer, User, Op, public) with related permissions
        self.bulk_sync_roles(self.ROLE_CONFIGS)

        self.add_homepage_access_to_custom_roles()
        # init existing roles, the rest role could be created through UI.
        self.update_admin_permission()
        self.clean_perms()

    def sync_resource_permissions(self, perms=None):
        """
        Populates resource-based permissions.

        :param perms: iterable of (action_name, resource_name) pairs; nothing is done
            when it is empty or None
        """
        if not perms:
            return

        for action_name, resource_name in perms:
            self.create_resource(resource_name)
            self.create_permission(action_name, resource_name)

    def sync_perm_for_dag(self, dag_id, access_control=None):
        """
        Sync permissions for given dag id. The dag id surely exists in our dag bag
        as only / refresh button or DagBag will call this function

        :param dag_id: the ID of the DAG whose permissions should be updated
        :type dag_id: str
        :param access_control: a dict where each key is a rolename and
            each value is a set() of action names (e.g.,
            {'can_read'}
        :type access_control: dict
        :return: None
        """
        dag_resource_name = permissions.resource_name_for_dag(dag_id)
        for dag_action_name in self.DAG_ACTIONS:
            self.create_permission(dag_action_name, dag_resource_name)

        if access_control:
            self._sync_dag_view_permissions(dag_resource_name, access_control)

    def get_resource_permissions(self, resource: ViewMenu) -> List[PermissionView]:
        """
        Retrieve permission pairs associated with a specific resource object.

        :param resource: Object representing a single resource.
        :type resource: ViewMenu
        :return: Permission objects representing resource->action pair
        :rtype: List[PermissionView]
        """
        return self.find_permissions_view_menu(resource)

    def _sync_dag_view_permissions(self, dag_id, access_control):
        """
        Set the access policy on the given DAG's ViewModel.

        :param dag_id: the ID of the DAG whose permissions should be updated
        :type dag_id: str
        :param access_control: a dict where each key is a rolename and
            each value is a set() of action names (e.g. {'can_read'})
        :type access_control: dict
        :raises AirflowException: if ``access_control`` names a role that does not exist
            or an action that is not in ``DAG_ACTIONS``
        """
        dag_resource_name = permissions.resource_name_for_dag(dag_id)

        def _get_or_create_dag_permission(action_name: str) -> PermissionView:
            """Return the permission for ``action_name`` on this DAG, creating it if missing."""
            perm = self.get_permission(action_name, dag_resource_name)
            if not perm:
                self.log.info("Creating new action '%s' on resource '%s'", action_name, dag_resource_name)
                perm = self.create_permission(action_name, dag_resource_name)

            return perm

        def _revoke_stale_permissions(resource: ViewMenu):
            """Remove this DAG's permissions from non-Admin roles not listed in ``access_control``."""
            existing_dag_perms = self.get_resource_permissions(resource)
            for perm in existing_dag_perms:
                non_admin_roles = [role for role in perm.role if role.name != 'Admin']
                for role in non_admin_roles:
                    target_perms_for_role = access_control.get(role.name, {})
                    if perm.permission.name not in target_perms_for_role:
                        self.log.info(
                            "Revoking '%s' on DAG '%s' for role '%s'",
                            perm.permission,
                            dag_resource_name,
                            role.name,
                        )
                        self.remove_permission_from_role(role, perm)

        resource = self.get_resource(dag_resource_name)
        if resource:
            _revoke_stale_permissions(resource)

        for rolename, action_names in access_control.items():
            role = self.find_role(rolename)
            if not role:
                raise AirflowException(
                    f"The access_control mapping for DAG '{dag_id}' includes a role "
                    f"named '{rolename}', but that role does not exist"
                )

            action_names = set(action_names)
            invalid_action_names = action_names - self.DAG_ACTIONS
            if invalid_action_names:
                raise AirflowException(
                    f"The access_control map for DAG '{dag_resource_name}' includes the following "
                    f"invalid permissions: {invalid_action_names}; The set of valid permissions "
                    f"is: {self.DAG_ACTIONS}"
                )

            for action_name in action_names:
                dag_perm = _get_or_create_dag_permission(action_name)
                self.add_permission_to_role(role, dag_perm)

    def create_resource(self, name: str) -> ViewMenu:
        """
        Create a resource with the given name.

        :param name: The name of the resource to create.
        :type name: str
        :return: The FAB resource created.
        :rtype: ViewMenu
        """
        return self.add_view_menu(name)

    def create_perm_vm_for_all_dag(self):
        """Create perm-vm if not exist and insert into FAB security model for all-dags."""
        # create perm for global logical dag
        for resource_name in self.DAG_RESOURCES:
            for action_name in self.DAG_ACTIONS:
                self._merge_perm(action_name, resource_name)

    def check_authorization(
        self, perms: Optional[Sequence[Tuple[str, str]]] = None, dag_id: Optional[str] = None
    ) -> bool:
        """
        Checks that the logged in user has the specified permissions.

        :param perms: (action_name, resource_name) pairs that must all be satisfied;
            an empty or missing sequence is always authorized
        :param dag_id: DAG id used for the per-DAG fallback when a pair targets the
            global DAG resource
        :return: True if every pair is satisfied, False otherwise
        """
        if not perms:
            return True

        for perm in perms:
            if perm in (
                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
            ):
                can_access_all_dags = self.has_access(*perm)
                if can_access_all_dags:
                    continue

                # No global DAG access: fall back to per-DAG access.
                action = perm[0]
                if self.can_access_some_dags(action, dag_id):
                    continue
                return False

            elif not self.has_access(*perm):
                return False

        return True

    def reset_all_permissions(self) -> None:
        """
        Deletes all permission records and removes from roles,
        then re-syncs them.

        :return: None
        :rtype: None
        """
        session = self.get_session
        # Detach permissions from every role before deleting the permission rows themselves.
        for role in self.get_all_roles():
            role.permissions = []
        session.commit()
        session.query(PermissionView).delete()
        session.query(ViewMenu).delete()
        session.query(Permission).delete()
        session.commit()

        self.sync_roles()


class ApplessAirflowSecurityManager(AirflowSecurityManager):
    """Security Manager that doesn't need the whole flask app"""

    def __init__(self, session=None):
        """
        Store the session without calling the parent initializer.

        :param session: object returned by the ``get_session`` property
        """
        self.session = session

    @property
    def get_session(self):
        """Return the session supplied at construction time."""
        return self.session