1
0
Fork 0
mirror of https://github.com/mealie-recipes/mealie.git synced 2025-07-23 15:19:41 +02:00

fix: prevent users from updating their own household privileges (#4928)

Co-authored-by: Michael Genson <71845777+michael-genson@users.noreply.github.com>
This commit is contained in:
Kuchenpirat 2025-01-22 17:06:41 +01:00 committed by GitHub
parent 8cd2da0abb
commit bf616f9db5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 133 additions and 28 deletions

View file

@ -76,6 +76,9 @@ class HouseholdSelfServiceController(BaseUserController):
if target_user.household_id != self.household_id: if target_user.household_id != self.household_id:
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not a member of this household") raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not a member of this household")
if target_user.id == self.user.id:
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not allowed to change their own permissions")
target_user.can_invite = permissions.can_invite target_user.can_invite = permissions.can_invite
target_user.can_manage = permissions.can_manage target_user.can_manage = permissions.can_manage
target_user.can_manage_household = permissions.can_manage_household target_user.can_manage_household = permissions.can_manage_household

View file

@ -1,10 +1,49 @@
from fastapi import HTTPException, status from fastapi import HTTPException, status
from pydantic import UUID4 from pydantic import UUID4
from mealie.schema.user.user import PrivateUser from mealie.schema.response.responses import ErrorResponse
from mealie.schema.user.user import PrivateUser, UserBase
permission_attrs = ["can_invite", "can_manage", "can_manage_household", "can_organize", "admin"]
def assert_user_change_allowed(id: UUID4, current_user: PrivateUser): def _assert_non_admin_user_change_allowed(user_id: UUID4, current_user: PrivateUser, new_data: UserBase):
if current_user.id != id and not current_user.admin: if current_user.id != user_id:
# only admins can edit other users # User is trying to edit another user
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="NOT_AN_ADMIN") raise HTTPException(status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User cannot edit other users"))
if any(getattr(current_user, p) != getattr(new_data, p) for p in permission_attrs):
# User is trying to change their own permissions
raise HTTPException(
status.HTTP_403_FORBIDDEN,
ErrorResponse.respond("User cannot change their own permissions"),
)
if current_user.group != new_data.group:
# prevent a regular user from changing their group
raise HTTPException(
status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User doesn't have permission to change their group")
)
if current_user.household != new_data.household:
# prevent a regular user from changing their household
raise HTTPException(
status.HTTP_403_FORBIDDEN,
ErrorResponse.respond("User doesn't have permission to change their household"),
)
def assert_user_change_allowed(user_id: UUID4, current_user: PrivateUser, new_data: UserBase):
if not current_user.admin:
_assert_non_admin_user_change_allowed(user_id, current_user, new_data)
return
if current_user.id != user_id:
raise HTTPException(status.HTTP_403_FORBIDDEN, ErrorResponse.respond("Use the Admin API to update other users"))
# Admin is trying to edit themselves
if any(getattr(current_user, p) != getattr(new_data, p) for p in permission_attrs):
# prevent an admin from excalating their own permissions
raise HTTPException(
status.HTTP_403_FORBIDDEN, ErrorResponse.respond("Admins can't change their own permissions")
)

View file

@ -46,13 +46,6 @@ class AdminUserController(BaseAdminController):
@admin_router.delete("/{item_id}") @admin_router.delete("/{item_id}")
def delete_user(self, item_id: UUID4): def delete_user(self, item_id: UUID4):
"""Removes a user from the database. Must be the current user or a super user"""
assert_user_change_allowed(item_id, self.user)
if item_id == 1: # TODO: identify super_user
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="SUPER_USER")
self.mixins.delete_one(item_id) self.mixins.delete_one(item_id)
@ -106,19 +99,7 @@ class UserController(BaseUserController):
@user_router.put("/{item_id}") @user_router.put("/{item_id}")
def update_user(self, item_id: UUID4, new_data: UserBase): def update_user(self, item_id: UUID4, new_data: UserBase):
assert_user_change_allowed(item_id, self.user) assert_user_change_allowed(item_id, self.user, new_data)
if not self.user.admin and (new_data.admin or self.user.group != new_data.group):
# prevent a regular user from doing admin tasks on themself
raise HTTPException(
status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User doesn't have permission to change group")
)
if self.user.id == item_id and self.user.admin and not new_data.admin:
# prevent an admin from demoting themself
raise HTTPException(
status.HTTP_403_FORBIDDEN, ErrorResponse.respond("User doesn't have permission to change group")
)
try: try:
self.repos.users.update(item_id, new_data.model_dump()) self.repos.users.update(item_id, new_data.model_dump())

View file

@ -23,7 +23,7 @@ class UserImageController(BaseUserController):
): ):
"""Updates a User Image""" """Updates a User Image"""
with get_temporary_path() as temp_path: with get_temporary_path() as temp_path:
assert_user_change_allowed(id, self.user) assert_user_change_allowed(id, self.user, self.user)
temp_img = temp_path.joinpath(profile.filename) temp_img = temp_path.joinpath(profile.filename)
with temp_img.open("wb") as buffer: with temp_img.open("wb") as buffer:

View file

@ -54,7 +54,7 @@ class UserRatingsController(BaseUserController):
@router.post("/{id}/ratings/{slug}") @router.post("/{id}/ratings/{slug}")
def set_rating(self, id: UUID4, slug: str, data: UserRatingUpdate): def set_rating(self, id: UUID4, slug: str, data: UserRatingUpdate):
"""Sets the user's rating for a recipe""" """Sets the user's rating for a recipe"""
assert_user_change_allowed(id, self.user) assert_user_change_allowed(id, self.user, self.user)
recipe = self.get_recipe_or_404(slug) recipe = self.get_recipe_or_404(slug)
user_rating = self.repos.user_ratings.get_by_user_and_recipe(id, recipe.id) user_rating = self.repos.user_ratings.get_by_user_and_recipe(id, recipe.id)

View file

@ -86,3 +86,16 @@ def test_set_member_permissions_no_user(
payload = get_permissions_payload(str(uuid4())) payload = get_permissions_payload(str(uuid4()))
response = api_client.put(api_routes.households_permissions, json=payload, headers=unique_user.token) response = api_client.put(api_routes.households_permissions, json=payload, headers=unique_user.token)
assert response.status_code == 404 assert response.status_code == 404
def test_set_own_permissions(api_client: TestClient, unique_user: TestUser):
database = unique_user.repos
user = database.users.get_one(unique_user.user_id)
assert user
user.can_manage = True
database.users.update(user.id, user)
form = {"user_id": str(unique_user.user_id), "canOrganize": not user.can_organize}
response = api_client.put(api_routes.households_permissions, json=form, headers=unique_user.token)
assert response.status_code == 403

View file

@ -1,8 +1,9 @@
import pytest import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from tests.utils import TestUser, api_routes from tests.utils import api_routes
from tests.utils.factories import random_email, random_int, random_string from tests.utils.factories import random_email, random_int, random_string
from tests.utils.fixture_schemas import TestUser
@pytest.mark.parametrize("use_admin_user", [True, False]) @pytest.mark.parametrize("use_admin_user", [True, False])
@ -43,3 +44,71 @@ def test_get_all_users_admin(request: pytest.FixtureRequest, api_client: TestCli
response_user_ids = {user["id"] for user in response.json()["items"]} response_user_ids = {user["id"] for user in response.json()["items"]}
for user_id in user_ids: for user_id in user_ids:
assert user_id in response_user_ids assert user_id in response_user_ids
def test_user_update(api_client: TestClient, unique_user: TestUser, admin_user: TestUser):
response = api_client.get(api_routes.users_self, headers=unique_user.token)
user = response.json()
# valid request without updates
response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=user, headers=unique_user.token)
assert response.status_code == 200
# valid request with updates
tmp_user = user.copy()
tmp_user["email"] = random_email()
tmp_user["full_name"] = random_string()
response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=unique_user.token)
assert response.status_code == 200
# test user attempting to update another user
form = {"email": admin_user.email, "full_name": admin_user.full_name}
response = api_client.put(api_routes.users_item_id(admin_user.user_id), json=form, headers=unique_user.token)
assert response.status_code == 403
# test user attempting permission changes
permissions = ["canInvite", "canManage", "canManageHousehold", "canOrganize", "advanced", "admin"]
for permission in permissions:
tmp_user = user.copy()
tmp_user[permission] = not user[permission]
response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=form, headers=unique_user.token)
assert response.status_code == 403
# test user attempting to change group
tmp_user = user.copy()
tmp_user["group"] = random_string()
response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=unique_user.token)
assert response.status_code == 403
# test user attempting to change household
tmp_user = user.copy()
tmp_user["household"] = random_string()
response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=unique_user.token)
assert response.status_code == 403
def test_admin_updates(api_client: TestClient, admin_user: TestUser, unique_user: TestUser):
response = api_client.get(api_routes.users_item_id(unique_user.user_id), headers=admin_user.token)
user = response.json()
response = api_client.get(api_routes.users_item_id(admin_user.user_id), headers=admin_user.token)
admin = response.json()
# admin updating themselves
tmp_user = admin.copy()
tmp_user["fullName"] = random_string()
response = api_client.put(api_routes.users_item_id(admin_user.user_id), json=tmp_user, headers=admin_user.token)
assert response.status_code == 200
# admin updating another user via the normal user route
tmp_user = user.copy()
tmp_user["fullName"] = random_string()
response = api_client.put(api_routes.users_item_id(unique_user.user_id), json=tmp_user, headers=admin_user.token)
assert response.status_code == 403
# admin updating their own permissions
permissions = ["canInvite", "canManage", "canManageHousehold", "canOrganize", "admin"]
for permission in permissions:
tmp_user = admin.copy()
tmp_user[permission] = not admin[permission]
response = api_client.put(api_routes.users_item_id(admin_user.user_id), json=tmp_user, headers=admin_user.token)
assert response.status_code == 403