Source code for airflow.providers.fab.auth_manager.security_manager.override
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportcopyimportdatetimeimportitertoolsimportloggingimportosimportrandomimportuuidimportwarningsfromtypingimportTYPE_CHECKING,Any,Callable,Collection,Container,Iterable,Mapping,Sequenceimportjwtimportpackaging.versionimportre2fromdeprecatedimportdeprecatedfromflaskimportflash,g,has_request_context,sessionfromflask_appbuilderimportconstfromflask_appbuilder.constimport(AUTH_DB,AUTH_LDAP,AUTH_OAUTH,AUTH_OID,AUTH_REMOTE_USER,LOGMSG_ERR_SEC_ADD_REGISTER_USER,LOGMSG_ERR_SEC_AUTH_LDAP,LOGMSG_ERR_SEC_AUTH_LDAP_TLS,LOGMSG_WAR_SEC_LOGIN_FAILED,LOGMSG_WAR_SEC_NOLDAP_OBJ,MICROSOFT_KEY_SET_URL,)fromflask_appbuilder.models.sqlaimportBasefromflask_appbuilder.models.sqla.interfaceimportSQLAInterfacefromflask_appbuilder.security.registerviewsimport(RegisterUserDBView,RegisterUserOAuthView,RegisterUserOIDView,)fromflask_appbuilder.security.viewsimport(AuthDBView,AuthLDAPView,AuthOAuthView,AuthOIDView,AuthRemoteUserView,AuthView,RegisterUserModelView,)fromflask_appbuilder.viewsimportexposefromflask_babelimportlazy_gettextfromflask_jwt_extendedimportJWTManager,current_userascurrent_user_jwtfromflask_loginimportLoginManagerfromitsdangerousimportwant_bytesfrommarkupsafeimportMarkupfromsqlalchemyimportand_,func,inspect,literal,or_,selectfromsqlalchemy.excimportMultipleResultsFoundfromsqlalchemy.ormimportSession,joinedloadfromwerkzeug.securityimportcheck_password_hash,generate_password_hashfromairflowimport__version__asairflow_versionfromairflow.auth.managers.utils.fabimportget_method_from_fab_action_mapfromairflow.configurationimportconffromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarning,RemovedInAirflow3Warningfromairflow.modelsimportDagBag,DagModelfromairflow.providers.fab.auth_manager.modelsimport(Action,Permission,RegisterUser,Resource,Role,User,assoc_permission_role,)fromairflow.providers.fab.auth_manager.models.anonymous_userimportAnonymousUserfromairflow.providers.fab.auth_manager.security_manager.constantsimportEXISTING_ROLESfromairflow.providers.fab.auth_manager.views.permissionsimport(ActionModelView,PermissionPairModelView,ResourceModelView,)fromairflow.providers.fab.auth_manager.views.roles_listimportCustomRoleModelViewfromairflow.providers.fab.auth_manager.views.userimport(CustomUserDBModelView,CustomUserLDAPModelView,CustomUserOAuthModelView,CustomUserOIDModelView,CustomUserRemoteUserModelView,)fromairflow.providers.fab.auth_manager.views.user_editimport(CustomResetMyPasswordView,CustomResetPasswordView,CustomUserInfoEditView,)fromairflow.providers.fab.auth_manager.views.user_statsimportCustomUserStatsChartViewfromairflow.securityimportpermissionsfromairflow.utils.sessionimportNEW_SESSION,provide_sessionfromairflow.www.extensions.init_auth_managerimportget_auth_managerfromairflow.www.security_managerimportAirflowSecurityManagerV2fromairflow.www.sessionimportAirflowDatabaseSessionInterfaceifTYPE_CHECKING:fromairflow.auth.managers.base_auth_managerimportResourceMethodfromairflow.security.permissionsimportRESOURCE_ASSETelse:fromairflow.providers.common.compat.security.permissionsimportRESOURCE_ASSET
# This is the limit of DB user sessions that we consider as "healthy". If you have more sessions that this# number then we will refuse to delete sessions that have expired and old user sessions when resetting# user's password, and raise a warning in the UI instead. Usually when you have that many sessions, it means# that there is something wrong with your deployment - for example you have an automated API call that# continuously creates new sessions. Such setup should be fixed by reusing sessions or by periodically# purging the old sessions by using `airflow db clean` command.
# The following logic patches the logout method within AuthView, so it supports POST method# to make CSRF protection effective. It is backward-compatible with Airflow versions <= 2.9.2 as it still# allows utilizing the GET method for them.# You could remove the patch and configure it when it is supported# natively by Flask-AppBuilder (https://github.com/dpgaspar/Flask-AppBuilder/issues/2248)ifpackaging.version.parse(packaging.version.parse(airflow_version).base_version)<packaging.version.parse("2.10.0"):_methods=["GET","POST"]else:_methods=["POST"]class_ModifiedAuthView(AuthView):@expose("/logout/",methods=_methods)deflogout(self):returnsuper().logout()forauth_viewin[AuthDBView,AuthLDAPView,AuthOAuthView,AuthOIDView,AuthRemoteUserView]:auth_view.__bases__=(_ModifiedAuthView,)
[docs]classFabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):""" This security manager overrides the default AirflowSecurityManager security manager. This security manager is used only if the auth manager FabAuthManager is used. It defines everything in the security manager that is needed for the FabAuthManager to work. Any operation specific to the AirflowSecurityManager should be defined here instead of AirflowSecurityManager. :param appbuilder: The appbuilder. """
self._init_config()self._init_auth()self._init_data_model()# can only call super once data model init has been done# because of the view.datamodel hack that's done in the init there.super().__init__(appbuilder=appbuilder)self._builtin_roles:dict=self.create_builtin_roles()self.create_db()# Setup Flask login
# Setup Flask-Jwt-Extendedself.create_jwt_manager()def_get_authentik_jwks(self,jwks_url)->dict:importrequestsresp=requests.get(jwks_url)ifresp.status_code==200:returnresp.json()return{}def_validate_jwt(self,id_token,jwks):fromauthlib.joseimportJsonWebKey,jwtasauthlib_jwtkeyset=JsonWebKey.import_key_set(jwks)claims=authlib_jwt.decode(id_token,keyset)claims.validate()log.info("JWT token is validated")returnclaimsdef_get_authentik_token_info(self,id_token):me=jwt.decode(id_token,options={"verify_signature":False})verify_signature=self.oauth_remotes["authentik"].client_kwargs.get("verify_signature",True)ifverify_signature:# Validate the token using authentik certificatejwks_uri=self.oauth_remotes["authentik"].server_metadata.get("jwks_uri")ifjwks_uri:jwks=self._get_authentik_jwks(jwks_uri)ifjwks:returnself._validate_jwt(id_token,jwks)else:log.error("jwks_uri not specified in OAuth Providers, could not verify token signature")else:# Return the token info without validatinglog.warning("JWT token is not validated!")returnmeraiseAirflowException("OAuth signature verify failed")
[docs]defregister_views(self):"""Register FAB auth manager related views."""ifnotself.appbuilder.get_app.config.get("FAB_ADD_SECURITY_VIEWS",True):returnifself.auth_user_registration:ifself.auth_type==AUTH_DB:self.registeruser_view=self.registeruserdbview()elifself.auth_type==AUTH_OID:self.registeruser_view=self.registeruseroidview()elifself.auth_type==AUTH_OAUTH:self.registeruser_view=self.registeruseroauthview()ifself.registeruser_view:self.appbuilder.add_view_no_menu(self.registeruser_view)self.appbuilder.add_view_no_menu(self.resetpasswordview())self.appbuilder.add_view_no_menu(self.resetmypasswordview())self.appbuilder.add_view_no_menu(self.userinfoeditview())ifself.auth_type==AUTH_DB:self.user_view=self.userdbmodelviewself.auth_view=self.authdbview()elifself.auth_type==AUTH_LDAP:self.user_view=self.userldapmodelviewself.auth_view=self.authldapview()elifself.auth_type==AUTH_OAUTH:self.user_view=self.useroauthmodelviewself.auth_view=self.authoauthview()elifself.auth_type==AUTH_REMOTE_USER:self.user_view=self.userremoteusermodelviewself.auth_view=self.authremoteuserview()else:self.user_view=self.useroidmodelviewself.auth_view=self.authoidview()self.appbuilder.add_view_no_menu(self.auth_view)# this needs to be done after the view is added, otherwise the blueprint# is not initializedifself.is_auth_limited:self.limiter.limit(self.auth_rate_limit,methods=["POST"])(self.auth_view.blueprint)self.user_view=self.appbuilder.add_view(self.user_view,"List Users",icon="fa-user",label=lazy_gettext("List Users"),category="Security",category_icon="fa-cogs",category_label=lazy_gettext("Security"),)role_view=self.appbuilder.add_view(self.rolemodelview,"List Roles",icon="fa-group",label=lazy_gettext("List Roles"),category="Security",category_icon="fa-cogs",)role_view.related_views=[self.user_view.__class__]ifself.userstatschartview:self.appbuilder.add_view(self.userstatschartview,"User's Statistics",icon="fa-bar-chart-o",label=lazy_gettext("User's Statistics"),category="Security",)ifself.auth_user_registration:self.appbuilder.add_view(self.registerusermodelview,"User's Statistics",icon="fa-user-plus",label=lazy_gettext("User Registrations"),category="Security",)self.appbuilder.menu.add_separator("Security")ifself.appbuilder.get_app.config.get("FAB_ADD_SECURITY_PERMISSION_VIEW",True):self.appbuilder.add_view(self.actionmodelview,"Actions",icon="fa-lock",label=lazy_gettext("Actions"),category="Security",)ifself.appbuilder.get_app.config.get("FAB_ADD_SECURITY_VIEW_MENU_VIEW",True):self.appbuilder.add_view(self.resourcemodelview,"Resources",icon="fa-list-alt",label=lazy_gettext("Resources"),category="Security",)ifself.appbuilder.get_app.config.get("FAB_ADD_SECURITY_PERMISSION_VIEWS_VIEW",True):self.appbuilder.add_view(self.permissionmodelview,"Permission Pairs",icon="fa-link",label=lazy_gettext("Permissions"),category="Security",)
[docs]defcreate_login_manager(self)->LoginManager:"""Create the login manager."""lm=LoginManager(self.appbuilder.app)lm.anonymous_user=AnonymousUserlm.login_view="login"lm.user_loader(self.load_user)returnlm
[docs]defcreate_jwt_manager(self):"""Create the JWT manager."""jwt_manager=JWTManager()jwt_manager.init_app(self.appbuilder.app)jwt_manager.user_lookup_loader(self.load_user_jwt)
[docs]defreset_password(self,userid:int,password:str)->bool:""" Change/Reset a user's password for auth db. Password will be hashed and saved. :param userid: the user id to reset the password :param password: the clear text password to reset and save hashed on the db """user=self.get_user_by_id(userid)user.password=generate_password_hash(password)self.reset_user_sessions(user)returnself.update_user(user)
[docs]defreset_user_sessions(self,user:User)->None:ifisinstance(self.appbuilder.get_app.session_interface,AirflowDatabaseSessionInterface):interface=self.appbuilder.get_app.session_interfacesession=interface.db.sessionuser_session_model=interface.sql_session_modelnum_sessions=session.query(user_session_model).count()ifnum_sessions>MAX_NUM_DATABASE_USER_SESSIONS:self._cli_safe_flash(f"The old sessions for user {user.username} have <b>NOT</b> been deleted!<br>"f"You have a lot ({num_sessions}) of user sessions in the 'SESSIONS' table in "f"your database.<br> ""This indicates that this deployment might have an automated API calls that create ""and not reuse sessions.<br>You should consider reusing sessions or cleaning them ""periodically using db clean.<br>""Make sure to reset password for the user again after cleaning the session table ""to remove old sessions of the user.","warning",)else:forsinsession.query(user_session_model):session_details=interface.serializer.loads(want_bytes(s.data))ifsession_details.get("_user_id")==user.id:session.delete(s)session.commit()else:self._cli_safe_flash("Since you are using `securecookie` session backend mechanism, we cannot prevent "f"some old sessions for user {user.username} to be reused.<br> If you want to make sure ""that the user is logged out from all sessions, you should consider using ""`database` session backend mechanism.<br> You can also change the 'secret_key` ""webserver configuration for all your webserver instances and restart the webserver. ""This however will logout all users from all sessions.","warning",)
[docs]defload_user_jwt(self,_jwt_header,jwt_data):identity=jwt_data["sub"]user=self.load_user(identity)ifuser.is_active:# Set flask g.user to JWT user, we can't do it on before requestg.user=userreturnuser
@property
[docs]defauth_type(self):"""Get the auth type."""returnself.appbuilder.get_app.config["AUTH_TYPE"]
@property
[docs]defis_auth_limited(self)->bool:"""Is the auth rate limited."""returnself.appbuilder.get_app.config["AUTH_RATE_LIMITED"]
@property
[docs]defauth_rate_limit(self)->str:"""Get the auth rate limit."""returnself.appbuilder.get_app.config["AUTH_RATE_LIMIT"]
@property
[docs]defauth_role_public(self):"""Get the public role."""returnself.appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC",None)
[docs]defauth_ldap_group_field(self)->str:"""LDAP group field."""returnself.appbuilder.get_app.config["AUTH_LDAP_GROUP_FIELD"]
@property
[docs]defauth_roles_mapping(self)->dict[str,list[str]]:"""The mapping of auth roles."""returnself.appbuilder.get_app.config["AUTH_ROLES_MAPPING"]
@property
[docs]defauth_user_registration_role_jmespath(self)->str:"""The JMESPATH role to use for user registration."""returnself.appbuilder.get_app.config["AUTH_USER_REGISTRATION_ROLE_JMESPATH"]
[docs]defauth_user_registration(self):"""Will user self registration be allowed."""returnself.appbuilder.get_app.config["AUTH_USER_REGISTRATION"]
@property
[docs]defauth_user_registration_role(self):"""The default user self registration role."""returnself.appbuilder.get_app.config["AUTH_USER_REGISTRATION_ROLE"]
@property
[docs]defauth_roles_sync_at_login(self)->bool:"""Should roles be synced at login."""returnself.appbuilder.get_app.config["AUTH_ROLES_SYNC_AT_LOGIN"]
@property
[docs]defauth_role_admin(self):"""Get the admin role."""returnself.appbuilder.get_app.config["AUTH_ROLE_ADMIN"]
@property@deprecated(reason="The 'oauth_whitelists' property is deprecated. Please use 'oauth_allow_list' instead.",category=AirflowProviderDeprecationWarning,)
[docs]defcreate_builtin_roles(self):"""Return FAB builtin roles."""returnself.appbuilder.get_app.config.get("FAB_ROLES",{})
@property
[docs]defbuiltin_roles(self):"""Get the builtin roles."""returnself._builtin_roles
[docs]defcreate_admin_standalone(self)->tuple[str|None,str|None]:"""Create an Admin user with a random password so that users can access airflow."""fromairflow.configurationimportAIRFLOW_HOME,make_group_other_inaccessibleuser_name="admin"# We want a streamlined first-run experience, but we do not want to# use a preset password as people will inevitably run this on a public# server. Thus, we make a random password and store it in AIRFLOW_HOME,# with the reasoning that if you can read that directory, you can see# the database credentials anyway.password_path=os.path.join(AIRFLOW_HOME,"standalone_admin_password.txt")user_exists=self.find_user(user_name)isnotNonewe_know_password=os.path.isfile(password_path)# If the user does not exist, make a random password and make itifnotuser_exists:print(f"FlaskAppBuilder Authentication Manager: Creating {user_name} user")if(role:=self.find_role("Admin"))isNone:raiseAirflowException("Unable to find role 'Admin'")# password does not contain visually similar characters: ijlIJL1oO0password="".join(random.choices("abcdefghkmnpqrstuvwxyzABCDEFGHKMNPQRSTUVWXYZ23456789",k=16))withopen(password_path,"w")asfile:file.write(password)make_group_other_inaccessible(password_path)self.add_user(user_name,"Admin","User","admin@example.com",role,password)print(f"FlaskAppBuilder Authentication Manager: Created {user_name} user")# If the user does exist, and we know its password, read the passwordelifuser_existsandwe_know_password:withopen(password_path)asfile:password=file.read().strip()# Otherwise we don't know the passwordelse:password=Nonereturnuser_name,password
def_init_config(self):""" Initialize config. :meta private: """app=self.appbuilder.get_app# Base Security Configapp.config.setdefault("AUTH_ROLE_ADMIN","Admin")app.config.setdefault("AUTH_TYPE",AUTH_DB)# Self Registrationapp.config.setdefault("AUTH_USER_REGISTRATION",False)app.config.setdefault("AUTH_USER_REGISTRATION_ROLE",self.auth_role_public)app.config.setdefault("AUTH_USER_REGISTRATION_ROLE_JMESPATH",None)# Role Mappingapp.config.setdefault("AUTH_ROLES_MAPPING",{})app.config.setdefault("AUTH_ROLES_SYNC_AT_LOGIN",False)app.config.setdefault("AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS",False)frompackaging.versionimportVersionfromwerkzeugimport__version__aswerkzeug_versionparsed_werkzeug_version=Version(werkzeug_version)ifparsed_werkzeug_version<Version("3.0.0"):app.config.setdefault("AUTH_DB_FAKE_PASSWORD_HASH_CHECK","pbkdf2:sha256:150000$Z3t6fmj2$22da622d94a1f8118""c0976a03d2f18f680bfff877c9a965db9eedc51bc0be87c",)else:app.config.setdefault("AUTH_DB_FAKE_PASSWORD_HASH_CHECK","scrypt:32768:8:1$wiDa0ruWlIPhp9LM$6e409d093e62ad54df2af895d0e125b05ff6cf6414""8350189ffc4bcc71286edf1b8ad94a442c00f890224bf2b32153d0750c89ee9""401e62f9dcee5399065e4e5",)# LDAP Configifself.auth_type==AUTH_LDAP:if"AUTH_LDAP_SERVER"notinapp.config:raiseValueError("No AUTH_LDAP_SERVER defined on config with AUTH_LDAP authentication type.")app.config.setdefault("AUTH_LDAP_SEARCH","")app.config.setdefault("AUTH_LDAP_SEARCH_FILTER","")app.config.setdefault("AUTH_LDAP_APPEND_DOMAIN","")app.config.setdefault("AUTH_LDAP_USERNAME_FORMAT","")app.config.setdefault("AUTH_LDAP_BIND_USER","")app.config.setdefault("AUTH_LDAP_BIND_PASSWORD","")# TLS optionsapp.config.setdefault("AUTH_LDAP_USE_TLS",False)app.config.setdefault("AUTH_LDAP_ALLOW_SELF_SIGNED",False)app.config.setdefault("AUTH_LDAP_TLS_DEMAND",False)app.config.setdefault("AUTH_LDAP_TLS_CACERTDIR","")app.config.setdefault("AUTH_LDAP_TLS_CACERTFILE","")app.config.setdefault("AUTH_LDAP_TLS_CERTFILE","")app.config.setdefault("AUTH_LDAP_TLS_KEYFILE","")# Mapping optionsapp.config.setdefault("AUTH_LDAP_UID_FIELD","uid")app.config.setdefault("AUTH_LDAP_GROUP_FIELD","memberOf")app.config.setdefault("AUTH_LDAP_FIRSTNAME_FIELD","givenName")app.config.setdefault("AUTH_LDAP_LASTNAME_FIELD","sn")app.config.setdefault("AUTH_LDAP_EMAIL_FIELD","mail")ifself.auth_type==AUTH_REMOTE_USER:app.config.setdefault("AUTH_REMOTE_USER_ENV_VAR","REMOTE_USER")# Rate limitingapp.config.setdefault("AUTH_RATE_LIMITED",True)app.config.setdefault("AUTH_RATE_LIMIT","5 per 40 second")def_init_auth(self):""" Initialize authentication configuration. :meta private: """app=self.appbuilder.get_appifself.auth_type==AUTH_OID:fromflask_openidimportOpenIDlog.warning("AUTH_OID is deprecated and will be removed in version 5. ""Migrate to other authentication methods.")self.oid=OpenID(app)ifself.auth_type==AUTH_OAUTH:fromauthlib.integrations.flask_clientimportOAuthself.oauth=OAuth(app)self.oauth_remotes={}forproviderinself.oauth_providers:provider_name=provider["name"]log.debug("OAuth providers init %s",provider_name)obj_provider=self.oauth.register(provider_name,**provider["remote_app"])obj_provider._tokengetter=self.oauth_token_getterifnotself.oauth_user_info:self.oauth_user_info=self.get_oauth_user_info# Whitelist only users with matching emailsif"whitelist"inprovider:self.oauth_allow_list[provider_name]=provider["whitelist"]self.oauth_remotes[provider_name]=obj_providerdef_init_data_model(self):user_data_model=SQLAInterface(self.user_model)ifself.auth_type==const.AUTH_DB:self.userdbmodelview.datamodel=user_data_modelelifself.auth_type==const.AUTH_LDAP:self.userldapmodelview.datamodel=user_data_modelelifself.auth_type==const.AUTH_OID:self.useroidmodelview.datamodel=user_data_modelelifself.auth_type==const.AUTH_OAUTH:self.useroauthmodelview.datamodel=user_data_modelelifself.auth_type==const.AUTH_REMOTE_USER:self.userremoteusermodelview.datamodel=user_data_modelifself.userstatschartview:self.userstatschartview.datamodel=user_data_modelifself.auth_user_registration:self.registerusermodelview.datamodel=SQLAInterface(self.registeruser_model)self.rolemodelview.datamodel=SQLAInterface(self.role_model)self.actionmodelview.datamodel=SQLAInterface(self.action_model)self.resourcemodelview.datamodel=SQLAInterface(self.resource_model)self.permissionmodelview.datamodel=SQLAInterface(self.permission_model)
[docs]defcreate_db(self):""" Create the database. Creates admin and public roles if they don't exist. """ifnotself.appbuilder.update_perms:log.debug("Skipping db since appbuilder disables update_perms")returntry:engine=self.get_session.get_bind(mapper=None,clause=None)inspector=inspect(engine)if"ab_user"notininspector.get_table_names():log.info(const.LOGMSG_INF_SEC_NO_DB)Base.metadata.create_all(engine)log.info(const.LOGMSG_INF_SEC_ADD_DB)roles_mapping=self.appbuilder.get_app.config.get("FAB_ROLES_MAPPING",{})forpk,nameinroles_mapping.items():self.update_role(pk,name)forrole_nameinself._builtin_roles:self.add_role(role_name)ifself.auth_role_adminnotinself._builtin_roles:self.add_role(self.auth_role_admin)ifself.auth_role_public:self.add_role(self.auth_role_public)ifself.count_users()==0andself.auth_role_public!=self.auth_role_admin:log.warning(const.LOGMSG_WAR_SEC_NO_USER)exceptException:log.exception(const.LOGMSG_ERR_SEC_CREATE_DB)exit(1)
[docs]defget_readable_dags(self,user)->Iterable[DagModel]:"""Get the DAGs readable by authenticated user."""warnings.warn("`get_readable_dags` has been deprecated. Please use `get_auth_manager().get_permitted_dag_ids` ""instead.",RemovedInAirflow3Warning,stacklevel=2,)withwarnings.catch_warnings():warnings.simplefilter("ignore",RemovedInAirflow3Warning)returnself.get_accessible_dags([permissions.ACTION_CAN_READ],user)
[docs]defget_editable_dags(self,user)->Iterable[DagModel]:"""Get the DAGs editable by authenticated user."""warnings.warn("`get_editable_dags` has been deprecated. Please use `get_auth_manager().get_permitted_dag_ids` ""instead.",RemovedInAirflow3Warning,stacklevel=2,)withwarnings.catch_warnings():warnings.simplefilter("ignore",RemovedInAirflow3Warning)returnself.get_accessible_dags([permissions.ACTION_CAN_EDIT],user)
@provide_session
[docs]defget_accessible_dags(self,user_actions:Container[str]|None,user,session:Session=NEW_SESSION,)->Iterable[DagModel]:warnings.warn("`get_accessible_dags` has been deprecated. Please use ""`get_auth_manager().get_permitted_dag_ids` instead.",RemovedInAirflow3Warning,stacklevel=3,)dag_ids=self.get_accessible_dag_ids(user,user_actions,session)returnsession.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
@provide_session
[docs]defget_accessible_dag_ids(self,user,user_actions:Container[str]|None=None,session:Session=NEW_SESSION,)->set[str]:warnings.warn("`get_accessible_dag_ids` has been deprecated. Please use ""`get_auth_manager().get_permitted_dag_ids` instead.",RemovedInAirflow3Warning,stacklevel=3,)ifnotuser_actions:user_actions=[permissions.ACTION_CAN_EDIT,permissions.ACTION_CAN_READ]method_from_fab_action_map=get_method_from_fab_action_map()user_methods:Container[ResourceMethod]=[method_from_fab_action_map[action]foractioninmethod_from_fab_action_mapifactioninuser_actions]returnget_auth_manager().get_permitted_dag_ids(user=user,methods=user_methods,session=session)
@staticmethod
[docs]defget_readable_dag_ids(user=None)->set[str]:"""Get the DAG IDs readable by authenticated user."""returnget_auth_manager().get_permitted_dag_ids(methods=["GET"],user=user)
@staticmethod
[docs]defget_editable_dag_ids(user=None)->set[str]:"""Get the DAG IDs editable by authenticated user."""returnget_auth_manager().get_permitted_dag_ids(methods=["PUT"],user=user)
[docs]defcan_access_some_dags(self,action:str,dag_id:str|None=None)->bool:"""Check if user has read or write access to some dags."""ifdag_idanddag_id!="~":root_dag_id=self._get_root_dag_id(dag_id)returnself.has_access(action,self._resource_name(root_dag_id,permissions.RESOURCE_DAG))user=g.userifaction==permissions.ACTION_CAN_READ:returnany(self.get_readable_dag_ids(user))returnany(self.get_editable_dag_ids(user))
[docs]defget_all_permissions(self)->set[tuple[str,str]]:"""Return all permissions as a set of tuples with the action and resource names."""returnset(self.appbuilder.get_session.execute(select(self.action_model.name,self.resource_model.name).join(self.permission_model.action).join(self.permission_model.resource)))
[docs]defcreate_dag_specific_permissions(self)->None:""" Add permissions to all DAGs. Creates 'can_read', 'can_edit', and 'can_delete' permissions for all DAGs, along with any `access_control` permissions provided in them. This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag` if you only need to sync a single DAG. """perms=self.get_all_permissions()dagbag=DagBag(read_dags_from_db=True)dagbag.collect_dags_from_db()dags=dagbag.dags.values()fordagindags:# TODO: Remove this when the minimum version of Airflow is bumped to 3.0root_dag_id=(getattr(dag,"parent_dag",None)ordag).dag_idforresource_name,resource_valuesinself.RESOURCE_DETAILS_MAP.items():dag_resource_name=self._resource_name(root_dag_id,resource_name)foraction_nameinresource_values["actions"]:if(action_name,dag_resource_name)notinperms:self._merge_perm(action_name,dag_resource_name)ifdag.access_controlisnotNone:self.sync_perm_for_dag(root_dag_id,dag.access_control)
[docs]defprefixed_dag_id(self,dag_id:str)->str:"""Return the permission name for a DAG id."""warnings.warn("`prefixed_dag_id` has been deprecated. ""Please use `airflow.security.permissions.resource_name` instead.",RemovedInAirflow3Warning,stacklevel=2,)root_dag_id=self._get_root_dag_id(dag_id)returnself._resource_name(root_dag_id,permissions.RESOURCE_DAG)
[docs]defis_dag_resource(self,resource_name:str)->bool:"""Determine if a resource belongs to a DAG or all DAGs."""ifresource_name==permissions.RESOURCE_DAG:returnTruereturnresource_name.startswith(permissions.RESOURCE_DAG_PREFIX)
[docs]defsync_perm_for_dag(self,dag_id:str,access_control:Mapping[str,Mapping[str,Collection[str]]|Collection[str]]|None=None,)->None:""" Sync permissions for given dag id. The dag id surely exists in our dag bag as only / refresh button or DagBag will call this function. :param dag_id: the ID of the DAG whose permissions should be updated :param access_control: a dict where each key is a role name and each value can be: - a set() of DAGs resource action names (e.g. `{'can_read'}`) - or a dict where each key is a resource name ('DAGs' or 'DAG Runs') and each value is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 'DAGs': {'can_read'}}`) :return: """forresource_name,resource_valuesinself.RESOURCE_DETAILS_MAP.items():dag_resource_name=self._resource_name(dag_id,resource_name)fordag_action_nameinresource_values["actions"]:self.create_permission(dag_action_name,dag_resource_name)ifaccess_controlisnotNone:self.log.debug("Syncing DAG-level permissions for DAG '%s'",dag_id)self._sync_dag_view_permissions(dag_id,copy.copy(access_control))else:self.log.debug("Not syncing DAG-level permissions for DAG '%s' as access control is unset.",dag_id,)
def_resource_name(self,dag_id:str,resource_name:str)->str:""" Get the resource name from permissions. This method is to keep compatibility with new FAB versions running with old airflow versions. """ifhasattr(permissions,"resource_name"):returngetattr(permissions,"resource_name")(dag_id,resource_name)returngetattr(permissions,"resource_name_for_dag")(dag_id)def_sync_dag_view_permissions(self,dag_id:str,access_control:Mapping[str,Mapping[str,Collection[str]]|Collection[str]],)->None:""" Set the access policy on the given DAG's ViewModel. :param dag_id: the ID of the DAG whose permissions should be updated :param access_control: a dict where each key is a role name and each value is: - a dict where each key is a resource name ('DAGs' or 'DAG Runs') and each value is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 'DAGs': {'can_read'}}`) """def_get_or_create_dag_permission(action_name:str,dag_resource_name:str)->Permission|None:perm=self.get_permission(action_name,dag_resource_name)ifnotperm:self.log.info("Creating new action '%s' on resource '%s'",action_name,dag_resource_name)perm=self.create_permission(action_name,dag_resource_name)returnperm# Revoking stale permissions for all possible DAG level resourcesforresource_nameinself.RESOURCE_DETAILS_MAP.keys():dag_resource_name=self._resource_name(dag_id,resource_name)ifresource:=self.get_resource(dag_resource_name):existing_dag_perms=self.get_resource_permissions(resource)forperminexisting_dag_perms:non_admin_roles=[roleforroleinperm.roleifrole.name!="Admin"]forroleinnon_admin_roles:access_control_role=access_control.get(role.name)target_perms_for_role=set()ifaccess_control_role:ifisinstance(access_control_role,set):target_perms_for_role=access_control_roleelifisinstance(access_control_role,dict):target_perms_for_role=access_control_role.get(resource_name,set())ifperm.action.namenotintarget_perms_for_role:self.log.info("Revoking '%s' on DAG '%s' for role '%s'",perm.action,dag_resource_name,role.name,)self.remove_permission_from_role(role,perm)# Adding the access control permissionsforrolename,resource_actionsinaccess_control.items():role=self.find_role(rolename)ifnotrole:raiseAirflowException(f"The access_control mapping for DAG '{dag_id}' includes a role named "f"'{rolename}', but that role does not exist")ifnotisinstance(resource_actions,dict):# Support for old-style access_control where only the actions are specifiedresource_actions={permissions.RESOURCE_DAG:set(resource_actions)}forresource_name,actionsinresource_actions.items():ifresource_namenotinself.RESOURCE_DETAILS_MAP:raiseAirflowException(f"The access_control map for DAG '{dag_id}' includes the following invalid "f"resource name: '{resource_name}'; "f"The set of valid resource names is: {self.RESOURCE_DETAILS_MAP.keys()}")dag_resource_name=self._resource_name(dag_id,resource_name)self.log.debug("Syncing DAG-level permissions for DAG '%s'",dag_resource_name)invalid_actions=set(actions)-self.RESOURCE_DETAILS_MAP[resource_name]["actions"]ifinvalid_actions:raiseAirflowException(f"The access_control map for DAG '{dag_resource_name}' includes "f"the following invalid permissions: {invalid_actions}; "f"The set of valid permissions is: {self.RESOURCE_DETAILS_MAP[resource_name]['actions']}")foraction_nameinactions:dag_perm=_get_or_create_dag_permission(action_name,dag_resource_name)ifdag_perm:self.add_permission_to_role(role,dag_perm)
[docs]defadd_permissions_view(self,base_action_names,resource_name):# Keep name for compatibility with FAB.""" Add an action on a resource to the backend. :param base_action_names: list of permissions from view (all exposed methods): 'can_add','can_edit' etc... :param resource_name: name of the resource to add """resource=self.create_resource(resource_name)perms=self.get_resource_permissions(resource)ifnotperms:# No permissions yet on this viewforaction_nameinbase_action_names:action=self.create_permission(action_name,resource_name)ifself.auth_role_adminnotinself.builtin_roles:admin_role=self.find_role(self.auth_role_admin)self.add_permission_to_role(admin_role,action)else:# Permissions on this view exist but....admin_role=self.find_role(self.auth_role_admin)foraction_nameinbase_action_names:# Check if base view permissions existifnotself.perms_include_action(perms,action_name):action=self.create_permission(action_name,resource_name)ifself.auth_role_adminnotinself.builtin_roles:self.add_permission_to_role(admin_role,action)forperminperms:ifperm.actionisNone:# Skip this perm, it has a null permissioncontinueifperm.action.namenotinbase_action_names:# perm to deleteroles=self.get_all_roles()# del permission from all rolesforroleinroles:# TODO: An action can't be removed from a role.# This is a bug in FAB. It has been reported.self.remove_permission_from_role(role,perm)self.delete_permission(perm.action.name,resource_name)elifself.auth_role_adminnotinself.builtin_rolesandpermnotinadmin_role.permissions:# Role Admin must have all permissionsself.add_permission_to_role(admin_role,perm)
[docs]defadd_permissions_menu(self,resource_name):""" Add menu_access to resource on permission_resource. :param resource_name: The resource name """self.create_resource(resource_name)perm=self.get_permission("menu_access",resource_name)ifnotperm:perm=self.create_permission("menu_access",resource_name)ifself.auth_role_adminnotinself.builtin_roles:role_admin=self.find_role(self.auth_role_admin)self.add_permission_to_role(role_admin,perm)
[docs]defsecurity_cleanup(self,baseviews,menus):""" Cleanup all unused permissions from the database. :param baseviews: A list of BaseViews class :param menus: Menu class """resources=self.get_all_resources()roles=self.get_all_roles()forresourceinresources:found=Falseforbaseviewinbaseviews:ifresource.name==baseview.class_permission_name:found=Truebreakifmenus.find(resource.name):found=Trueifnotfound:permissions=self.get_resource_permissions(resource)forpermissioninpermissions:forroleinroles:self.remove_permission_from_role(role,permission)self.delete_permission(permission.action.name,resource.name)self.delete_resource(resource.name)
[docs]defsync_roles(self)->None:""" Initialize default and custom roles with related permissions. 1. Init the default role(Admin, Viewer, User, Op, public) with related permissions. 2. Init the custom role(dag-user) with related permissions. """# Create global all-dag permissionsself.create_perm_vm_for_all_dag()# Sync the default roles (Admin, Viewer, User, Op, public) with related permissionsself.bulk_sync_roles(self.ROLE_CONFIGS)self.add_homepage_access_to_custom_roles()# init existing roles, the rest role could be created through UI.self.update_admin_permission()self.clean_perms()
[docs]defcreate_perm_vm_for_all_dag(self)->None:"""Create perm-vm if not exist and insert into FAB security model for all-dags."""# create perm for global logical dagforresource_name,action_nameinitertools.product(self.DAG_RESOURCES,self.DAG_ACTIONS):self._merge_perm(action_name,resource_name)
[docs]defadd_homepage_access_to_custom_roles(self)->None:"""Add Website.can_read access to all custom roles."""website_permission=self.create_permission(permissions.ACTION_CAN_READ,permissions.RESOURCE_WEBSITE)custom_roles=[roleforroleinself.get_all_roles()ifrole.namenotinEXISTING_ROLES]forroleincustom_roles:self.add_permission_to_role(role,website_permission)self.appbuilder.get_session.commit()
[docs]defupdate_admin_permission(self)->None:""" Add missing permissions to the table for admin. Admin should get all the permissions, except the dag permissions because Admin already has Dags permission. Add the missing ones to the table for admin. """session=self.appbuilder.get_sessionprefixes=getattr(permissions,"PREFIX_LIST",[permissions.RESOURCE_DAG_PREFIX])dag_resources=session.scalars(select(Resource).where(or_(*[Resource.name.like(f"{prefix}%")forprefixinprefixes])))resource_ids=[resource.idforresourceindag_resources]perms=session.scalars(select(Permission).where(~Permission.resource_id.in_(resource_ids)))perms=[pforpinpermsifp.actionandp.resource]admin=self.find_role("Admin")admin.permissions=list(set(admin.permissions)|set(perms))session.commit()
[docs]defclean_perms(self)->None:"""FAB leaves faulty permissions that need to be cleaned up."""self.log.debug("Cleaning faulty perms")sesh=self.appbuilder.get_sessionperms=sesh.query(Permission).filter(or_(Permission.action==None,# noqa: E711Permission.resource==None,# noqa: E711))# Since FAB doesn't define ON DELETE CASCADE on these tables, we need# to delete the _object_ so that SQLA knows to delete the many-to-many# relationship object too. :(deleted_count=0forperminperms:sesh.delete(perm)deleted_count+=1sesh.commit()ifdeleted_count:self.log.info("Deleted %s faulty permissions",deleted_count)
[docs]defpermission_exists_in_one_or_more_roles(self,resource_name:str,action_name:str,role_ids:list[int])->bool:""" Efficiently check if a certain permission exists on a list of role ids; used by `has_access`. :param resource_name: The view's name to check if exists on one of the roles :param action_name: The permission name to check if exists :param role_ids: a list of Role ids :return: Boolean """q=(self.appbuilder.get_session.query(self.permission_model).join(assoc_permission_role,and_(self.permission_model.id==assoc_permission_role.c.permission_view_id),).join(self.role_model).join(self.action_model).join(self.resource_model).filter(self.resource_model.name==resource_name,self.action_model.name==action_name,self.role_model.id.in_(role_ids),).exists())# Special case for MSSQL/Oracle (works on PG and MySQL > 8)# Note: We need to keep MSSQL compatibility as long as this provider package# might still be updated by Airflow prior 2.9.0 users with MSSQLifself.appbuilder.get_session.bind.dialect.namein("mssql","oracle"):returnself.appbuilder.get_session.query(literal(True)).filter(q).scalar()returnself.appbuilder.get_session.query(q).scalar()
[docs]definit_role(self,role_name,perms)->None:""" Initialize the role with actions and related resources. :param role_name: :param perms: """warnings.warn("`init_role` has been deprecated. Please use `bulk_sync_roles` instead.",RemovedInAirflow3Warning,stacklevel=2,)self.bulk_sync_roles([{"role":role_name,"perms":perms}])
[docs]defbulk_sync_roles(self,roles:Iterable[dict[str,Any]])->None:"""Sync the provided roles and permissions."""existing_roles=self._get_all_roles_with_permissions()non_dag_perms=self._get_all_non_dag_permissions()forconfiginroles:role_name=config["role"]perms=config["perms"]role=existing_roles.get(role_name)orself.add_role(role_name)foraction_name,resource_nameinperms:perm=non_dag_perms.get((action_name,resource_name))orself.create_permission(action_name,resource_name)ifpermnotinrole.permissions:self.add_permission_to_role(role,perm)
[docs]defupdate_role(self,role_id,name:str)->Role|None:"""Update a role in the database."""role=self.get_session.get(self.role_model,role_id)ifnotrole:returnNonetry:role.name=nameself.get_session.merge(role)self.get_session.commit()log.info(const.LOGMSG_INF_SEC_UPD_ROLE,role)exceptExceptionase:log.error(const.LOGMSG_ERR_SEC_UPD_ROLE,e)self.get_session.rollback()returnNonereturnrole
[docs]defadd_role(self,name:str)->Role:"""Add a role in the database."""role=self.find_role(name)ifroleisNone:try:role=self.role_model()role.name=nameself.get_session.add(role)self.get_session.commit()log.info(const.LOGMSG_INF_SEC_ADD_ROLE,name)returnroleexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_ROLE,e)self.get_session.rollback()returnrole
[docs]deffind_role(self,name):""" Find a role in the database. :param name: the role name """returnself.get_session.query(self.role_model).filter_by(name=name).one_or_none()
[docs]defdelete_role(self,role_name:str)->None:""" Delete the given Role. :param role_name: the name of a role in the ab_role table """session=self.get_sessionrole=session.query(Role).filter(Role.name==role_name).first()ifrole:log.info("Deleting role '%s'",role_name)session.delete(role)session.commit()else:raiseAirflowException(f"Role named '{role_name}' does not exist")
[docs]defget_roles_from_keys(self,role_keys:list[str])->set[Role]:""" Construct a list of FAB role objects, from a list of keys. NOTE: - keys are things like: "LDAP group DNs" or "OAUTH group names" - we use AUTH_ROLES_MAPPING to map from keys, to FAB role names :param role_keys: the list of FAB role keys """_roles=set()_role_keys=set(role_keys)forrole_key,fab_role_namesinself.auth_roles_mapping.items():ifrole_keyin_role_keys:forfab_role_nameinfab_role_names:fab_role=self.find_role(fab_role_name)iffab_role:_roles.add(fab_role)else:log.warning("Can't find role specified in AUTH_ROLES_MAPPING: %s",fab_role_name)return_roles
[docs]defadd_user(self,username,first_name,last_name,email,role,password="",hashed_password="",):"""Create a user."""try:user=self.user_model()user.first_name=first_nameuser.last_name=last_nameuser.username=usernameuser.email=emailuser.active=Trueself.get_session.add(user)user.roles=roleifisinstance(role,list)else[role]ifhashed_password:user.password=hashed_passwordelse:user.password=generate_password_hash(password)self.get_session.commit()log.info(const.LOGMSG_INF_SEC_ADD_USER,username)returnuserexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_USER,e)self.get_session.rollback()returnFalse
[docs]defcount_users(self):"""Return the number of users in the database."""returnself.get_session.query(func.count(self.user_model.id)).scalar()
[docs]defadd_register_user(self,username,first_name,last_name,email,password="",hashed_password=""):""" Add a registration request for the user. :rtype : RegisterUser """register_user=self.registeruser_model()register_user.username=usernameregister_user.email=emailregister_user.first_name=first_nameregister_user.last_name=last_nameifhashed_password:register_user.password=hashed_passwordelse:register_user.password=generate_password_hash(password)register_user.registration_hash=str(uuid.uuid1())try:self.get_session.add(register_user)self.get_session.commit()returnregister_userexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER,e)self.get_session.rollback()returnNone
[docs]deffind_user(self,username=None,email=None):"""Find user by username or email."""ifusername:try:ifself.auth_username_ci:return(self.get_session.query(self.user_model).filter(func.lower(self.user_model.username)==func.lower(username)).one_or_none())else:return(self.get_session.query(self.user_model).filter(func.lower(self.user_model.username)==func.lower(username)).one_or_none())exceptMultipleResultsFound:log.error("Multiple results found for user %s",username)returnNoneelifemail:try:returnself.get_session.query(self.user_model).filter_by(email=email).one_or_none()exceptMultipleResultsFound:log.error("Multiple results found for user with email %s",email)returnNone
[docs]defupdate_user_auth_stat(self,user,success=True):""" Update user authentication stats. This is done upon successful/unsuccessful authentication attempts. :param user: The identified (but possibly not successfully authenticated) user model :param success: Defaults to true, if true increments login_count, updates last_login, and resets fail_login_count to 0, if false increments fail_login_count on user model. """ifnotuser.login_count:user.login_count=0ifnotuser.fail_login_count:user.fail_login_count=0ifsuccess:user.login_count+=1user.last_login=datetime.datetime.now()user.fail_login_count=0else:user.fail_login_count+=1self.update_user(user)
""" ------------- Action entity ------------- """
[docs]defget_action(self,name:str)->Action:""" Get an existing action record. :param name: name """returnself.get_session.query(self.action_model).filter_by(name=name).one_or_none()
[docs]defcreate_action(self,name):""" Add an action to the backend, model action. :param name: name of the action: 'can_add','can_edit' etc... """action=self.get_action(name)ifactionisNone:try:action=self.action_model()action.name=nameself.get_session.add(action)self.get_session.commit()returnactionexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION,e)self.get_session.rollback()returnaction
[docs]defdelete_action(self,name:str)->bool:""" Delete a permission action. :param name: Name of action to delete (e.g. can_read). """action=self.get_action(name)ifnotaction:log.warning(const.LOGMSG_WAR_SEC_DEL_PERMISSION,name)returnFalsetry:perms=(self.get_session.query(self.permission_model).filter(self.permission_model.action==action).all())ifperms:log.warning(const.LOGMSG_WAR_SEC_DEL_PERM_PVM,action,perms)returnFalseself.get_session.delete(action)self.get_session.commit()returnTrueexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION,e)self.get_session.rollback()returnFalse
[docs]defget_resource(self,name:str)->Resource:""" Return a resource record by name, if it exists. :param name: Name of resource """returnself.get_session.query(self.resource_model).filter_by(name=name).one_or_none()
[docs]defcreate_resource(self,name)->Resource:""" Create a resource with the given name. :param name: The name of the resource to create created. """resource=self.get_resource(name)ifresourceisNone:try:resource=self.resource_model()resource.name=nameself.get_session.add(resource)self.get_session.commit()returnresourceexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU,e)self.get_session.rollback()returnresource
[docs]defget_all_resources(self)->list[Resource]:"""Get all existing resource records."""returnself.get_session.query(self.resource_model).all()
[docs]defdelete_resource(self,name:str)->bool:""" Delete a Resource from the backend. :param name: name of the resource """resource=self.get_resource(name)ifnotresource:log.warning(const.LOGMSG_WAR_SEC_DEL_VIEWMENU,name)returnFalsetry:perms=(self.get_session.query(self.permission_model).filter(self.permission_model.resource==resource).all())ifperms:log.warning(const.LOGMSG_WAR_SEC_DEL_VIEWMENU_PVM,resource,perms)returnFalseself.get_session.delete(resource)self.get_session.commit()returnTrueexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION,e)self.get_session.rollback()returnFalse
[docs]defget_permission(self,action_name:str,resource_name:str,)->Permission|None:""" Get a permission made with the given action->resource pair, if the permission already exists. :param action_name: Name of action :param resource_name: Name of resource """action=self.get_action(action_name)resource=self.get_resource(resource_name)ifactionandresource:return(self.get_session.query(self.permission_model).filter_by(action=action,resource=resource).one_or_none())returnNone
[docs]defget_resource_permissions(self,resource:Resource)->Permission:""" Retrieve permission pairs associated with a specific resource object. :param resource: Object representing a single resource. """returnself.get_session.query(self.permission_model).filter_by(resource_id=resource.id).all()
[docs]defcreate_permission(self,action_name,resource_name)->Permission|None:""" Add a permission on a resource to the backend. :param action_name: name of the action to add: 'can_add','can_edit' etc... :param resource_name: name of the resource to add """ifnot(action_nameandresource_name):returnNoneperm=self.get_permission(action_name,resource_name)ifperm:returnpermresource=self.create_resource(resource_name)action=self.create_action(action_name)perm=self.permission_model()perm.resource_id,perm.action_id=resource.id,action.idtry:self.get_session.add(perm)self.get_session.commit()log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW,perm)returnpermexceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW,e)self.get_session.rollback()returnNone
[docs]defdelete_permission(self,action_name:str,resource_name:str)->None:""" Delete the permission linking an action->resource pair. Doesn't delete the underlying action or resource. :param action_name: Name of existing action :param resource_name: Name of existing resource """ifnot(action_nameandresource_name):returnperm=self.get_permission(action_name,resource_name)ifnotperm:returnroles=(self.get_session.query(self.role_model).filter(self.role_model.permissions.contains(perm)).first())ifroles:log.warning(const.LOGMSG_WAR_SEC_DEL_PERMVIEW,resource_name,action_name,roles)returntry:# delete permission on resourceself.get_session.delete(perm)self.get_session.commit()# if no more permission on permission view, delete permissionifnotself.get_session.query(self.permission_model).filter_by(action=perm.action).all():self.delete_action(perm.action.name)log.info(const.LOGMSG_INF_SEC_DEL_PERMVIEW,action_name,resource_name)exceptExceptionase:log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW,e)self.get_session.rollback()
[docs]defadd_permission_to_role(self,role:Role,permission:Permission|None)->None:""" Add an existing permission pair to a role. :param role: The role about to get a new permission. :param permission: The permission pair to add to a role. """ifpermissionandpermissionnotinrole.permissions:try:role.permissions.append(permission)self.get_session.merge(role)self.get_session.commit()log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE,permission,role.name)exceptExceptionase:log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE,e)self.get_session.rollback()
[docs]defremove_permission_from_role(self,role:Role,permission:Permission)->None:""" Remove a permission pair from a role. :param role: User role containing permissions. :param permission: Object representing resource-> action pair """ifpermissioninrole.permissions:try:role.permissions.remove(permission)self.get_session.merge(role)self.get_session.commit()log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE,permission,role.name)exceptExceptionase:log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE,e)self.get_session.rollback()
[docs]defget_oid_identity_url(self,provider_name:str)->str|None:"""Return the OIDC identity provider URL."""forproviderinself.openid_providers:ifprovider.get("name")==provider_name:returnprovider.get("url")returnNone
@staticmethod
[docs]defget_user_roles(user=None):""" Get all the roles associated with the user. :param user: the ab_user in FAB model. :return: a list of roles associated with the user. """ifuserisNone:user=g.userreturnuser.roles
""" -------------------- Auth related methods -------------------- """
[docs]defauth_user_ldap(self,username,password):""" Authenticate user with LDAP. NOTE: this depends on python-ldap module. :param username: the username :param password: the password """# If no username is provided, go awayif(usernameisNone)orusername=="":returnNone# Search the DB for this useruser=self.find_user(username=username)# If user is not active, go awayifuserand(notuser.is_active):returnNone# If user is not registered, and not self-registration, go awayif(notuser)and(notself.auth_user_registration):returnNone# Ensure python-ldap is installedtry:importldapexceptImportError:log.error("python-ldap library is not installed")returnNonetry:# LDAP certificate settingsifself.auth_ldap_tls_cacertdir:ldap.set_option(ldap.OPT_X_TLS_CACERTDIR,self.auth_ldap_tls_cacertdir)ifself.auth_ldap_tls_cacertfile:ldap.set_option(ldap.OPT_X_TLS_CACERTFILE,self.auth_ldap_tls_cacertfile)ifself.auth_ldap_tls_certfile:ldap.set_option(ldap.OPT_X_TLS_CERTFILE,self.auth_ldap_tls_certfile)ifself.auth_ldap_tls_keyfile:ldap.set_option(ldap.OPT_X_TLS_KEYFILE,self.auth_ldap_tls_keyfile)ifself.auth_ldap_allow_self_signed:ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,ldap.OPT_X_TLS_ALLOW)ldap.set_option(ldap.OPT_X_TLS_NEWCTX,0)elifself.auth_ldap_tls_demand:ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,ldap.OPT_X_TLS_DEMAND)ldap.set_option(ldap.OPT_X_TLS_NEWCTX,0)# Initialise LDAP connectioncon=ldap.initialize(self.auth_ldap_server)con.set_option(ldap.OPT_REFERRALS,0)ifself.auth_ldap_use_tls:try:con.start_tls_s()exceptException:log.error(LOGMSG_ERR_SEC_AUTH_LDAP_TLS,self.auth_ldap_server)returnNone# Define variables, so we can check if they are set in later stepsuser_dn=Noneuser_attributes={}# Flow 1 - (Indirect Search Bind):# - in this flow, special bind credentials are used to perform the# LDAP search# - in this flow, AUTH_LDAP_SEARCH must be setifself.auth_ldap_bind_user:# Bind with AUTH_LDAP_BIND_USER/AUTH_LDAP_BIND_PASSWORD# (authorizes for LDAP search)self._ldap_bind_indirect(ldap,con)# Search for `username`# - returns the `user_dn` needed for binding to validate credentials# - returns the `user_attributes` needed for# AUTH_USER_REGISTRATION/AUTH_ROLES_SYNC_AT_LOGINifself.auth_ldap_search:user_dn,user_attributes=self._search_ldap(ldap,con,username)else:log.error("AUTH_LDAP_SEARCH must be set when using AUTH_LDAP_BIND_USER")returnNone# If search failed, go awayifuser_dnisNone:log.info(LOGMSG_WAR_SEC_NOLDAP_OBJ,username)returnNone# Bind with user_dn/password (validates credentials)ifnotself._ldap_bind(ldap,con,user_dn,password):ifuser:self.update_user_auth_stat(user,False)# Invalid credentials, go awaylog.info(LOGMSG_WAR_SEC_LOGIN_FAILED,username)returnNone# Flow 2 - (Direct Search Bind):# - in this flow, the credentials provided by the end-user are used# to perform the LDAP search# - in this flow, we only search LDAP if AUTH_LDAP_SEARCH is set# - features like AUTH_USER_REGISTRATION & AUTH_ROLES_SYNC_AT_LOGIN# will only work if AUTH_LDAP_SEARCH is setelse:# Copy the provided username (so we can apply formatters)bind_username=username# update `bind_username` by applying AUTH_LDAP_APPEND_DOMAIN# - for Microsoft AD, which allows binding with userPrincipalNameifself.auth_ldap_append_domain:bind_username=bind_username+"@"+self.auth_ldap_append_domain# Update `bind_username` by applying AUTH_LDAP_USERNAME_FORMAT# - for transforming the username into a DN,# for example: "uid=%s,ou=example,o=test"ifself.auth_ldap_username_format:bind_username=self.auth_ldap_username_format%bind_username# Bind with bind_username/password# (validates credentials & authorizes for LDAP search)ifnotself._ldap_bind(ldap,con,bind_username,password):ifuser:self.update_user_auth_stat(user,False)# Invalid credentials, go awaylog.info(LOGMSG_WAR_SEC_LOGIN_FAILED,bind_username)returnNone# Search for `username` (if AUTH_LDAP_SEARCH is set)# - returns the `user_attributes`# needed for AUTH_USER_REGISTRATION/AUTH_ROLES_SYNC_AT_LOGIN# - we search on `username` not `bind_username`,# because AUTH_LDAP_APPEND_DOMAIN and AUTH_LDAP_USERNAME_FORMAT# would result in an invalid search filterifself.auth_ldap_search:user_dn,user_attributes=self._search_ldap(ldap,con,username)# If search failed, go awayifuser_dnisNone:log.info(LOGMSG_WAR_SEC_NOLDAP_OBJ,username)returnNone# Sync the user's rolesifuseranduser_attributesandself.auth_roles_sync_at_login:user.roles=self._ldap_calculate_user_roles(user_attributes)log.debug("Calculated new roles for user=%r as: %s",user_dn,user.roles)# If the user is new, register themif(notuser)anduser_attributesandself.auth_user_registration:user=self.add_user(username=username,first_name=self.ldap_extract(user_attributes,self.auth_ldap_firstname_field,""),last_name=self.ldap_extract(user_attributes,self.auth_ldap_lastname_field,""),email=self.ldap_extract(user_attributes,self.auth_ldap_email_field,f"{username}@email.notfound",),role=self._ldap_calculate_user_roles(user_attributes),)log.debug("New user registered: %s",user)# If user registration failed, go awayifnotuser:log.info(LOGMSG_ERR_SEC_ADD_REGISTER_USER,username)returnNone# LOGIN SUCCESS (only if user is now registered)ifuser:self._rotate_session_id()self.update_user_auth_stat(user)returnuserelse:returnNoneexceptldap.LDAPErrorase:msg=Noneifisinstance(e,dict):msg=getattr(e,"message",None)if(msgisnotNone)and("desc"inmsg):log.error(LOGMSG_ERR_SEC_AUTH_LDAP,e.message["desc"])returnNoneelse:log.error(e)returnNone
[docs]defauth_user_db(self,username,password):""" Authenticate user, auth db style. :param username: The username or registered email address :param password: The password, will be tested against hashed password on db """ifusernameisNoneorusername=="":returnNoneuser=self.find_user(username=username)ifuserisNone:user=self.find_user(email=username)ifuserisNoneor(notuser.is_active):# Balance failure and successcheck_password_hash(self.appbuilder.get_app.config["AUTH_DB_FAKE_PASSWORD_HASH_CHECK"],"password",)log.info(LOGMSG_WAR_SEC_LOGIN_FAILED,username)returnNoneelifcheck_password_hash(user.password,password):self._rotate_session_id()self.update_user_auth_stat(user,True)returnuserelse:self.update_user_auth_stat(user,False)log.info(LOGMSG_WAR_SEC_LOGIN_FAILED,username)returnNone
[docs]defoauth_user_info_getter(self,func:Callable[[AirflowSecurityManagerV2,str,dict[str,Any]|None],dict[str,Any]],):""" Get OAuth user info for all the providers. Receives provider and response return a dict with the information returned from the provider. The returned user info dict should have its keys with the same name as the User Model. Use it like this an example for GitHub :: @appbuilder.sm.oauth_user_info_getter def my_oauth_user_info(sm, provider, response=None): if provider == "github": me = sm.oauth_remotes[provider].get("user") return {"username": me.data.get("login")} return {} """defwraps(provider:str,response:dict[str,Any]|None=None)->dict[str,Any]:returnfunc(self,provider,response)self.oauth_user_info=wrapsreturnwraps
[docs]defget_oauth_user_info(self,provider:str,resp:dict[str,Any])->dict[str,Any]:""" There are different OAuth APIs with different ways to retrieve user info. All providers have different ways to retrieve user info. """# for GITHUBifprovider=="github"orprovider=="githublocal":me=self.oauth_remotes[provider].get("user")data=me.json()log.debug("User info from GitHub: %s",data)return{"username":"github_"+data.get("login")}# for twitterifprovider=="twitter":me=self.oauth_remotes[provider].get("account/settings.json")data=me.json()log.debug("User info from Twitter: %s",data)return{"username":"twitter_"+data.get("screen_name","")}# for linkedinifprovider=="linkedin":me=self.oauth_remotes[provider].get("people/~:(id,email-address,first-name,last-name)?format=json")data=me.json()log.debug("User info from LinkedIn: %s",data)return{"username":"linkedin_"+data.get("id",""),"email":data.get("email-address",""),"first_name":data.get("firstName",""),"last_name":data.get("lastName",""),}# for Googleifprovider=="google":me=self.oauth_remotes[provider].get("userinfo")data=me.json()log.debug("User info from Google: %s",data)return{"username":"google_"+data.get("id",""),"first_name":data.get("given_name",""),"last_name":data.get("family_name",""),"email":data.get("email",""),}ifprovider=="azure":me=self._decode_and_validate_azure_jwt(resp["id_token"])log.debug("User info from Azure: %s",me)# https://learn.microsoft.com/en-us/azure/active-directory/develop/id-token-claims-reference#payload-claimsreturn{"email":me["email"]if"email"inmeelseme["upn"],"first_name":me.get("given_name",""),"last_name":me.get("family_name",""),"username":me["oid"],"role_keys":me.get("roles",[]),}# for OpenShiftifprovider=="openshift":me=self.oauth_remotes[provider].get("apis/user.openshift.io/v1/users/~")data=me.json()log.debug("User info from OpenShift: %s",data)return{"username":"openshift_"+data.get("metadata").get("name")}# for Oktaifprovider=="okta":me=self.oauth_remotes[provider].get("userinfo")data=me.json()log.debug("User info from Okta: %s",data)if"error"notindata:return{"username":f"{provider}_{data['sub']}","first_name":data.get("given_name",""),"last_name":data.get("family_name",""),"email":data["email"],"role_keys":data.get("groups",[]),}else:log.error(data.get("error_description"))return{}# for Auth0ifprovider=="auth0":data=self.appbuilder.sm.oauth_remotes[provider].userinfo()log.debug("User info from Auth0: %s",data)return{"username":f"{provider}_{data['sub']}","first_name":data.get("given_name",""),"last_name":data.get("family_name",""),"email":data["email"],"role_keys":data.get("groups",[]),}# for Keycloakifproviderin["keycloak","keycloak_before_17"]:me=self.oauth_remotes[provider].get("openid-connect/userinfo")me.raise_for_status()data=me.json()log.debug("User info from Keycloak: %s",data)return{"username":data.get("preferred_username",""),"first_name":data.get("given_name",""),"last_name":data.get("family_name",""),"email":data.get("email",""),"role_keys":data.get("groups",[]),}# for Authentikifprovider=="authentik":id_token=resp["id_token"]me=self._get_authentik_token_info(id_token)log.debug("User info from authentik: %s",me)return{"email":me["preferred_username"],"first_name":me.get("given_name",""),"username":me["nickname"],"role_keys":me.get("groups",[]),}else:return{}
[docs]defcheck_authorization(self,perms:Sequence[tuple[str,str]]|None=None,dag_id:str|None=None,)->bool:"""Check the logged-in user has the specified permissions."""ifnotperms:returnTrueforperminperms:ifpermin((permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_DAG),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_DAG),):can_access_all_dags=self.has_access(*perm)ifnotcan_access_all_dags:action=perm[0]ifnotself.can_access_some_dags(action,dag_id):returnFalseelifnotself.has_access(*perm):returnFalsereturnTrue
[docs]defset_oauth_session(self,provider,oauth_response):"""Set the current session with OAuth user secrets."""# Get this provider key names for token_key and token_secrettoken_key=self.get_oauth_token_key_name(provider)token_secret=self.get_oauth_token_secret_name(provider)# Save users token on encrypted session cookiesession["oauth"]=(oauth_response[token_key],oauth_response.get(token_secret,""),)session["oauth_provider"]=provider
[docs]defget_oauth_token_key_name(self,provider):""" Return the token_key name for the oauth provider. If none is configured defaults to oauth_token this is configured using OAUTH_PROVIDERS and token_key key. """for_providerinself.oauth_providers:if_provider["name"]==provider:return_provider.get("token_key","oauth_token")
[docs]defget_oauth_token_secret_name(self,provider):""" Get the ``token_secret`` name for the oauth provider. If none is configured, defaults to ``oauth_secret``. This is configured using ``OAUTH_PROVIDERS`` and ``token_secret``. """for_providerinself.oauth_providers:if_provider["name"]==provider:return_provider.get("token_secret","oauth_token_secret")
[docs]defauth_user_oauth(self,userinfo):""" Authenticate user with OAuth. :userinfo: dict with user information (keys are the same as User model columns) """# extract the username from `userinfo`if"username"inuserinfo:username=userinfo["username"]elif"email"inuserinfo:username=userinfo["email"]else:log.error("OAUTH userinfo does not have username or email %s",userinfo)returnNone# If username is empty, go awayif(usernameisNone)orusername=="":returnNone# Search the DB for this useruser=self.find_user(username=username)# If user is not active, go awayifuserand(notuser.is_active):returnNone# If user is not registered, and not self-registration, go awayif(notuser)and(notself.auth_user_registration):returnNone# Sync the user's rolesifuserandself.auth_roles_sync_at_login:user.roles=self._oauth_calculate_user_roles(userinfo)log.debug("Calculated new roles for user=%r as: %s",username,user.roles)# If the user is new, register themif(notuser)andself.auth_user_registration:user=self.add_user(username=username,first_name=userinfo.get("first_name",""),last_name=userinfo.get("last_name",""),email=userinfo.get("email","")orf"{username}@email.notfound",role=self._oauth_calculate_user_roles(userinfo),)log.debug("New user registered: %s",user)# If user registration failed, go awayifnotuser:log.error("Error creating a new OAuth user %s",username)returnNone# LOGIN SUCCESS (only if user is now registered)ifuser:self._rotate_session_id()self.update_user_auth_stat(user)returnuserelse:returnNone
[docs]defauth_user_oid(self,email):""" Openid user Authentication. :param email: user's email to authenticate """user=self.find_user(email=email)ifuserisNoneor(notuser.is_active):log.info(LOGMSG_WAR_SEC_LOGIN_FAILED,email)returnNoneelse:self._rotate_session_id()self.update_user_auth_stat(user)returnuser
[docs]defauth_user_remote_user(self,username):""" REMOTE_USER user Authentication. :param username: user's username for remote auth """user=self.find_user(username=username)# User does not exist, create one if auto user registration.ifuserisNoneandself.auth_user_registration:user=self.add_user(# All we have is REMOTE_USER, so we set# the other fields to blank.username=username,first_name=username,last_name="-",email=username+"@email.notfound",role=self.find_role(self.auth_user_registration_role),)# If user does not exist on the DB and not auto user registration,# or user is inactive, go away.elifuserisNoneor(notuser.is_active):log.info(LOGMSG_WAR_SEC_LOGIN_FAILED,username)returnNoneself._rotate_session_id()self.update_user_auth_stat(user)returnuser
[docs]defget_user_menu_access(self,menu_names:list[str]|None=None)->set[str]:ifget_auth_manager().is_logged_in():returnself._get_user_permission_resources(g.user,"menu_access",resource_names=menu_names)elifcurrent_user_jwt:returnself._get_user_permission_resources(# the current_user_jwt is a lazy proxy, so we need to ignore type checkingcurrent_user_jwt,# type: ignore[arg-type]"menu_access",resource_names=menu_names,)else:returnself._get_user_permission_resources(None,"menu_access",resource_names=menu_names)
[docs]defldap_extract(ldap_dict:dict[str,list[bytes]],field_name:str,fallback:str)->str:raw_value=ldap_dict.get(field_name,[b""])# decode - if empty string, default to fallback, otherwise take first elementreturnraw_value[0].decode("utf-8")orfallback
""" --------------- Private methods --------------- """def_rotate_session_id(self):""" Rotate the session ID. We need to do this upon successful authentication when using the database session backend. """ifconf.get("webserver","SESSION_BACKEND")=="database":session.sid=str(uuid.uuid4())def_get_microsoft_jwks(self)->list[dict[str,Any]]:importrequestsreturnrequests.get(MICROSOFT_KEY_SET_URL).json()def_decode_and_validate_azure_jwt(self,id_token:str)->dict[str,str]:verify_signature=self.oauth_remotes["azure"].client_kwargs.get("verify_signature",False)ifverify_signature:fromauthlib.joseimportJsonWebKey,jwtasauthlib_jwtkeyset=JsonWebKey.import_key_set(self._get_microsoft_jwks())claims=authlib_jwt.decode(id_token,keyset)claims.validate()returnclaimsreturnjwt.decode(id_token,options={"verify_signature":False})def_ldap_bind_indirect(self,ldap,con)->None:""" Attempt to bind to LDAP using the AUTH_LDAP_BIND_USER. :param ldap: The ldap module reference :param con: The ldap connection """ifnotself.auth_ldap_bind_user:# always check AUTH_LDAP_BIND_USER is set before calling this methodraiseValueError("AUTH_LDAP_BIND_USER must be set")try:log.debug("LDAP bind indirect TRY with username: %r",self.auth_ldap_bind_user)con.simple_bind_s(self.auth_ldap_bind_user,self.auth_ldap_bind_password)log.debug("LDAP bind indirect SUCCESS with username: %r",self.auth_ldap_bind_user)exceptldap.INVALID_CREDENTIALSasex:log.error("AUTH_LDAP_BIND_USER and AUTH_LDAP_BIND_PASSWORD are not valid LDAP bind credentials")raiseexdef_search_ldap(self,ldap,con,username):""" Search LDAP for user. :param ldap: The ldap module reference :param con: The ldap connection :param username: username to match with AUTH_LDAP_UID_FIELD :return: ldap object array """ifnotself.auth_ldap_search:# always check AUTH_LDAP_SEARCH is set before calling this methodraiseValueError("AUTH_LDAP_SEARCH must be set")# build the filter string for the LDAP searchifself.auth_ldap_search_filter:filter_str=f"(&{self.auth_ldap_search_filter}({self.auth_ldap_uid_field}={username}))"else:filter_str=f"({self.auth_ldap_uid_field}={username})"# build what fields to request in the LDAP searchrequest_fields=[self.auth_ldap_firstname_field,self.auth_ldap_lastname_field,self.auth_ldap_email_field,]ifself.auth_roles_mapping:request_fields.append(self.auth_ldap_group_field)# perform the LDAP searchlog.debug("LDAP search for %r with fields %s in scope %r",filter_str,request_fields,self.auth_ldap_search)raw_search_result=con.search_s(self.auth_ldap_search,ldap.SCOPE_SUBTREE,filter_str,request_fields)log.debug("LDAP search returned: %s",raw_search_result)# Remove any search referrals from resultssearch_result=[(dn,attrs)fordn,attrsinraw_search_resultifdnisnotNoneandisinstance(attrs,dict)]# only continue if 0 or 1 results were returnediflen(search_result)>1:log.error("LDAP search for %r in scope '%a' returned multiple results",self.auth_ldap_search,filter_str,)returnNone,Nonetry:# extract the DNuser_dn=search_result[0][0]# extract the other attributesuser_info=search_result[0][1]# returnreturnuser_dn,user_infoexcept(IndexError,NameError):returnNone,None@staticmethoddef_ldap_bind(ldap,con,dn:str,password:str)->bool:"""Validates/binds the provided dn/password with the LDAP sever."""try:log.debug("LDAP bind TRY with username: %r",dn)con.simple_bind_s(dn,password)log.debug("LDAP bind SUCCESS with username: %r",dn)returnTrueexceptldap.INVALID_CREDENTIALS:returnFalsedef_ldap_calculate_user_roles(self,user_attributes:dict[str,list[bytes]])->list[str]:user_role_objects=set()# apply AUTH_ROLES_MAPPINGifself.auth_roles_mapping:user_role_keys=self.ldap_extract_list(user_attributes,self.auth_ldap_group_field)user_role_objects.update(self.get_roles_from_keys(user_role_keys))# apply AUTH_USER_REGISTRATIONifself.auth_user_registration:registration_role_name=self.auth_user_registration_role# lookup registration role in flask dbfab_role=self.find_role(registration_role_name)iffab_role:user_role_objects.add(fab_role)else:log.warning("Can't find AUTH_USER_REGISTRATION role: %s",registration_role_name)returnlist(user_role_objects)def_oauth_calculate_user_roles(self,userinfo)->list[str]:user_role_objects=set()# apply AUTH_ROLES_MAPPINGifself.auth_roles_mapping:user_role_keys=userinfo.get("role_keys",[])user_role_objects.update(self.get_roles_from_keys(user_role_keys))# apply AUTH_USER_REGISTRATION_ROLEifself.auth_user_registration:registration_role_name=self.auth_user_registration_role# if AUTH_USER_REGISTRATION_ROLE_JMESPATH is set,# use it for the registration roleifself.auth_user_registration_role_jmespath:importjmespathregistration_role_name=jmespath.search(self.auth_user_registration_role_jmespath,userinfo)# lookup registration role in flask dbfab_role=self.find_role(registration_role_name)iffab_role:user_role_objects.add(fab_role)else:log.warning("Can't find AUTH_USER_REGISTRATION role: %s",registration_role_name)returnlist(user_role_objects)def_get_user_permission_resources(self,user:User|None,action_name:str,resource_names:list[str]|None=None)->set[str]:""" Get resource names with a certain action name that a user has access to. Mainly used to fetch all menu permissions on a single db call, will also check public permissions and builtin roles """ifnotresource_names:resource_names=[]db_role_ids=[]ifuserisNone:# include public roleroles=[self.get_public_role()]else:roles=user.roles# First check against builtin (statically configured) roles# because no database query is neededresult=set()forroleinroles:ifrole.nameinself.builtin_roles:forresource_nameinresource_names:ifself._has_access_builtin_roles(role,action_name,resource_name):result.add(resource_name)else:db_role_ids.append(role.id)# Then check against database-stored rolesrole_resource_names=[perm.resource.nameforperminself.filter_roles_by_perm_with_action(action_name,db_role_ids)]result.update(role_resource_names)returnresultdef_has_access_builtin_roles(self,role,action_name:str,resource_name:str)->bool:"""Check permission on builtin role."""perms=self.builtin_roles.get(role.name,[])for_resource_name,_action_nameinperms:ifre2.match(_resource_name,resource_name)andre2.match(_action_name,action_name):returnTruereturnFalsedef_merge_perm(self,action_name:str,resource_name:str)->None:""" Add the new (action, resource) to assoc_permission_role if it doesn't exist. It will add the related entry to ab_permission and ab_resource two meta tables as well. :param action_name: Name of the action :param resource_name: Name of the resource """action=self.get_action(action_name)resource=self.get_resource(resource_name)perm=Noneifactionandresource:perm=self.appbuilder.get_session.scalar(select(self.permission_model).filter_by(action=action,resource=resource).limit(1))ifnotpermandaction_nameandresource_name:self.create_permission(action_name,resource_name)def_get_all_roles_with_permissions(self)->dict[str,Role]:"""Return a dict with a key of role name and value of role with early loaded permissions."""return{r.name:rforrinself.appbuilder.get_session.scalars(select(self.role_model).options(joinedload(self.role_model.permissions))).unique()}def_get_all_non_dag_permissions(self)->dict[tuple[str,str],Permission]:""" Get permissions except those that are for specific DAGs. Return a dict with a key of (action_name, resource_name) and value of permission with all permissions except those that are for specific DAGs. """return{(action_name,resource_name):viewmodelforaction_name,resource_name,viewmodelin(self.appbuilder.get_session.execute(select(self.action_model.name,self.resource_model.name,self.permission_model).join(self.permission_model.action).join(self.permission_model.resource).where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))))}
[docs]deffilter_roles_by_perm_with_action(self,action_name:str,role_ids:list[int]):"""Find roles with permission."""return(self.appbuilder.get_session.query(self.permission_model).join(assoc_permission_role,and_(self.permission_model.id==assoc_permission_role.c.permission_view_id),).join(self.role_model).join(self.action_model).join(self.resource_model).filter(self.action_model.name==action_name,self.role_model.id.in_(role_ids),)).all()
def_get_root_dag_id(self,dag_id:str)->str:# TODO: The "root_dag_id" check can be remove when the minimum version of Airflow is bumped to 3.0if"."indag_idandhasattr(DagModel,"root_dag_id"):dm=self.appbuilder.get_session.execute(select(DagModel.dag_id,DagModel.root_dag_id).where(DagModel.dag_id==dag_id)).one()returndm.root_dag_idordm.dag_idreturndag_id@staticmethoddef_cli_safe_flash(text:str,level:str)->None:"""Show a flash in a web context or prints a message if not."""ifhas_request_context():flash(Markup(text),level)else:getattr(log,level)(text.replace("<br>","\n").replace("<b>","*").replace("</b>","*"))