Source code for airflow.providers.fab.auth_manager.api_endpoints.user_endpoint
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromhttpimportHTTPStatusfromtypingimportTYPE_CHECKING,castfromconnexionimportNoContentfromflaskimportrequestfrommarshmallowimportValidationErrorfromsqlalchemyimportasc,desc,func,selectfromwerkzeug.securityimportgenerate_password_hashfromairflow.api_connexion.exceptionsimportAlreadyExists,BadRequest,NotFound,Unknownfromairflow.api_connexion.parametersimportcheck_limit,format_parametersfromairflow.api_connexion.securityimportrequires_access_custom_viewfromairflow.providers.fab.auth_manager.modelsimportUserfromairflow.providers.fab.auth_manager.schemas.user_schemaimport(UserCollection,user_collection_item_schema,user_collection_schema,user_schema,)fromairflow.providers.fab.auth_manager.security_manager.overrideimportFabAirflowSecurityManagerOverridefromairflow.securityimportpermissionsfromairflow.www.extensions.init_auth_managerimportget_auth_managerifTYPE_CHECKING:fromairflow.api_connexion.typesimportAPIResponse,UpdateMaskfromairflow.providers.fab.auth_manager.modelsimportRole@requires_access_custom_view("GET",permissions.RESOURCE_USER)
def get_user(*, username: str) -> APIResponse:
    """
    Return the serialized user identified by ``username``.

    :param username: Username to look up through the security manager.
    :return: The user dumped with ``user_collection_item_schema``.
    :raises NotFound: If the security manager finds no user with that username.
    """
    manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
    found = manager.find_user(username=username)
    if found:
        return user_collection_item_schema.dump(found)
    # Falsy lookup result: report the missing user to the API client.
    raise NotFound(title="User not found", detail=f"The User with username `{username}` was not found")
def get_users(*, limit: int, order_by: str = "id", offset: str | None = None) -> APIResponse:
    """
    Get a page of users together with the total number of users.

    :param limit: Maximum number of users to return.
    :param order_by: Attribute to sort by. A leading ``-`` requests descending
        order, and ``user_id`` is accepted as an alias for ``id``.
    :param offset: Number of rows to skip before the first returned user.
    :return: The collection dumped with ``user_collection_schema`` (the page of
        users plus ``total_entries``).
    :raises BadRequest: If ``order_by`` is not an allowed sort attribute or the
        attribute does not exist on the ``User`` model.
    """
    direction = desc if order_by.startswith("-") else asc
    to_replace = {"user_id": "id"}
    # Only a leading dash carries meaning (sort direction); leave the rest of the name alone.
    order_param = order_by.lstrip("-")
    order_param = to_replace.get(order_param, order_param)
    allowed_sort_attrs = (
        "id",
        "first_name",
        "last_name",
        "user_name",
        "email",
        "is_active",
        "role",
    )
    # Validate the *normalized* name: checking the raw ``order_by`` would reject
    # every descending sort ("-id") and the documented "user_id" alias.
    # NOTE(review): "user_name" and "role" are presumably not attributes of ``User``;
    # the ``None`` default turns that case into a BadRequest instead of an
    # AttributeError - confirm against the model.
    sort_column = getattr(User, order_param, None) if order_param in allowed_sort_attrs else None
    if sort_column is None:
        raise BadRequest(
            detail=f"Ordering with '{order_by}' is disallowed or "
            f"the attribute does not exist on the model"
        )

    security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
    session = security_manager.get_session
    total_entries = session.execute(select(func.count(User.id))).scalar()

    query = select(User).order_by(direction(sort_column)).offset(offset).limit(limit)
    users = session.scalars(query).all()

    return user_collection_schema.dump(UserCollection(users=users, total_entries=total_entries))
def post_user() -> APIResponse:
    """
    Create a new user from the JSON body of the current request.

    :return: The created user dumped with ``user_schema``.
    :raises BadRequest: If the payload fails schema validation or names roles
        that the security manager cannot find.
    :raises AlreadyExists: If the username or the email is already in use.
    :raises Unknown: If ``add_user`` does not return a user.
    """
    try:
        payload = user_schema.load(request.json)
    except ValidationError as err:
        raise BadRequest(detail=str(err.messages))

    manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
    username, email = payload["username"], payload["email"]

    # Reject duplicates: username is checked first, then email.
    if manager.find_user(username=username):
        raise AlreadyExists(detail=f"Username `{username}` already exists. Use PATCH to update.")
    if manager.find_user(email=email):
        raise AlreadyExists(detail=f"The email `{email}` is already taken.")

    # Resolve every requested role name, keeping the name next to the lookup result
    # so that all unknown names can be reported in a single error.
    role_lookups = [
        (entry["name"], manager.find_role(entry["name"])) for entry in payload.pop("roles", ())
    ]
    unknown_names = [name for name, role in role_lookups if role is None]
    if unknown_names:
        raise BadRequest(detail=f"Unknown roles: {', '.join(repr(name) for name in unknown_names)}")

    roles = [role for _, role in role_lookups if role is not None]
    if not roles:
        # Nothing requested: fall back to the role configured for newly registered users.
        roles.append(manager.find_role(manager.auth_user_registration_role))

    created = manager.add_user(role=roles, **payload)
    if not created:
        raise Unknown(detail=f"Failed to add user `{username}`.")

    return user_schema.dump(created)
def delete_user(*, username: str) -> APIResponse:
    """
    Delete the user identified by ``username``.

    :param username: Username of the user to remove.
    :return: ``NoContent`` together with HTTP status 204.
    :raises NotFound: If the security manager finds no user with that username.
    """
    manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
    doomed = manager.find_user(username=username)
    if doomed is None:
        raise NotFound(title="User not found", detail=f"The User with username `{username}` was not found")

    # Detach the user's roles before deleting so the role foreign keys are cleared first.
    doomed.roles = []
    manager.get_session.delete(doomed)
    manager.get_session.commit()

    return NoContent, HTTPStatus.NO_CONTENT