1
0
Fork 0
mirror of https://github.com/mealie-recipes/mealie.git synced 2025-08-05 05:25:26 +02:00

API security hardening (#571)

* Enhance security and safety around user update API

- Prevent a regular user from promoting themself to admin
- Prevent an admin from demoting themself
- Refactor token fixture to admin + regular user tokens

* Restrict user CRUD API to admins

* Secure admin API routes

* Refactor APIrouter into Admin/UserAPIRouter

* Secure theme routes

* Make 'all recipes' routes public

* Secure favorite routes

* Remove redundant checks

* Fix public routes mistakenly flagged user routes

* Make webhooks changeable only by admin

* Allow users to create categories and tags

* Address lint issues
This commit is contained in:
sephrat 2021-06-22 20:22:15 +02:00 committed by GitHub
parent f5faff66d3
commit 6320ba7ec5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 456 additions and 347 deletions

View file

@ -9,7 +9,7 @@ from tests.app_routes import AppRoutes
@fixture(scope="session")
def default_user():
def admin_user():
return UserOut(
id=1,
fullName="Change Me",
@ -24,7 +24,7 @@ def default_user():
@fixture(scope="session")
def new_user():
return UserOut(
id=3,
id=4,
fullName="My New User",
username="My New User",
email="newuser@email.com",
@ -41,27 +41,27 @@ def test_failed_login(api_client: TestClient, api_routes: AppRoutes):
assert response.status_code == 401
def test_superuser_login(api_client: TestClient, api_routes: AppRoutes, token):
def test_superuser_login(api_client: TestClient, api_routes: AppRoutes, admin_token):
form_data = {"username": "changeme@email.com", "password": "MyPassword"}
response = api_client.post(api_routes.auth_token, form_data)
assert response.status_code == 200
new_token = json.loads(response.text).get("access_token")
response = api_client.get(api_routes.users_self, headers=token)
response = api_client.get(api_routes.users_self, headers=admin_token)
assert response.status_code == 200
return {"Authorization": f"Bearer {new_token}"}
def test_init_superuser(api_client: TestClient, api_routes: AppRoutes, token, default_user: UserOut):
response = api_client.get(api_routes.users_id(1), headers=token)
def test_init_superuser(api_client: TestClient, api_routes: AppRoutes, admin_token, admin_user: UserOut):
response = api_client.get(api_routes.users_id(1), headers=admin_token)
assert response.status_code == 200
assert json.loads(response.text) == default_user.dict(by_alias=True)
assert json.loads(response.text) == admin_user.dict(by_alias=True)
def test_create_user(api_client: TestClient, api_routes: AppRoutes, token, new_user):
def test_create_user(api_client: TestClient, api_routes: AppRoutes, admin_token, new_user):
create_data = {
"fullName": "My New User",
"email": "newuser@email.com",
@ -71,32 +71,74 @@ def test_create_user(api_client: TestClient, api_routes: AppRoutes, token, new_u
"tokens": [],
}
response = api_client.post(api_routes.users, json=create_data, headers=token)
response = api_client.post(api_routes.users, json=create_data, headers=admin_token)
assert response.status_code == 201
assert json.loads(response.text) == new_user.dict(by_alias=True)
assert True
def test_get_all_users(api_client: TestClient, api_routes: AppRoutes, token, new_user, default_user):
response = api_client.get(api_routes.users, headers=token)
def test_create_user_as_non_admin(api_client: TestClient, api_routes: AppRoutes, user_token):
create_data = {
"fullName": "My New User",
"email": "newuser@email.com",
"password": "MyStrongPassword",
"group": "Home",
"admin": False,
"tokens": [],
}
response = api_client.post(api_routes.users, json=create_data, headers=user_token)
assert response.status_code == 403
def test_get_all_users(api_client: TestClient, api_routes: AppRoutes, admin_token, new_user, admin_user):
response = api_client.get(api_routes.users, headers=admin_token)
assert response.status_code == 200
all_users = json.loads(response.text)
assert default_user.dict(by_alias=True) in all_users
assert admin_user.dict(by_alias=True) in all_users
assert new_user.dict(by_alias=True) in all_users
def test_update_user(api_client: TestClient, api_routes: AppRoutes, token):
def test_update_user(api_client: TestClient, api_routes: AppRoutes, admin_token):
update_data = {"id": 1, "fullName": "Updated Name", "email": "changeme@email.com", "group": "Home", "admin": True}
response = api_client.put(api_routes.users_id(1), headers=token, json=update_data)
response = api_client.put(api_routes.users_id(1), headers=admin_token, json=update_data)
assert response.status_code == 200
assert json.loads(response.text).get("access_token")
def test_reset_user_password(api_client: TestClient, api_routes: AppRoutes, token):
response = api_client.put(api_routes.users_id_reset_password(3), headers=token)
def test_update_other_user_as_not_admin(api_client: TestClient, api_routes: AppRoutes, user_token):
update_data = {"id": 1, "fullName": "Updated Name", "email": "changeme@email.com", "group": "Home", "admin": True}
response = api_client.put(api_routes.users_id(1), headers=user_token, json=update_data)
assert response.status_code == 403
def test_update_self_as_not_admin(api_client: TestClient, api_routes: AppRoutes, user_token):
update_data = {"id": 3, "fullName": "User fullname", "email": "user@email.com", "group": "Home", "admin": False}
response = api_client.put(api_routes.users_id(3), headers=user_token, json=update_data)
assert response.status_code == 200
def test_self_demote_admin(api_client: TestClient, api_routes: AppRoutes, admin_token):
update_data = {"id": 1, "fullName": "Updated Name", "email": "changeme@email.com", "group": "Home", "admin": False}
response = api_client.put(api_routes.users_id(1), headers=admin_token, json=update_data)
assert response.status_code == 403
def test_self_promote_admin(api_client: TestClient, api_routes: AppRoutes, user_token):
update_data = {"id": 3, "fullName": "Updated Name", "email": "user@email.com", "group": "Home", "admin": True}
response = api_client.put(api_routes.users_id(3), headers=user_token, json=update_data)
assert response.status_code == 403
def test_reset_user_password(api_client: TestClient, api_routes: AppRoutes, admin_token):
response = api_client.put(api_routes.users_id_reset_password(4), headers=admin_token)
assert response.status_code == 200
@ -106,23 +148,23 @@ def test_reset_user_password(api_client: TestClient, api_routes: AppRoutes, toke
assert response.status_code == 200
def test_delete_user(api_client: TestClient, api_routes: AppRoutes, token):
response = api_client.delete(api_routes.users_id(2), headers=token)
def test_delete_user(api_client: TestClient, api_routes: AppRoutes, admin_token):
response = api_client.delete(api_routes.users_id(2), headers=admin_token)
assert response.status_code == 200
def test_update_user_image(
api_client: TestClient, api_routes: AppRoutes, test_image_jpg: Path, test_image_png: Path, token
api_client: TestClient, api_routes: AppRoutes, test_image_jpg: Path, test_image_png: Path, admin_token
):
response = api_client.post(
api_routes.users_id_image(2), files={"profile_image": test_image_jpg.open("rb")}, headers=token
api_routes.users_id_image(2), files={"profile_image": test_image_jpg.open("rb")}, headers=admin_token
)
assert response.status_code == 200
response = api_client.post(
api_routes.users_id_image(2), files={"profile_image": test_image_png.open("rb")}, headers=token
api_routes.users_id_image(2), files={"profile_image": test_image_png.open("rb")}, headers=admin_token
)
assert response.status_code == 200