mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-08-05 05:25:26 +02:00
security: implement user lockout (#1552)
* add data-types required for login security * implement user lockout checking at login * cleanup legacy patterns * expose passwords in test_user * test user lockout after bad attempts * test user service * bump alembic version * save increment to database * add locked_at to datetime transformer on import * do proper test cleanup * implement scheduled task * spelling * document env variables * implement context manager for session * use context manager * implement reset script * cleanup generator * run generator * implement API endpoint for resetting locked users * add button to reset all locked users * add info when account is locked * use ignore instead of expect-error
This commit is contained in:
parent
ca64584fd1
commit
b3c41a4bd0
35 changed files with 450 additions and 46 deletions
1
tests/fixtures/fixture_admin.py
vendored
1
tests/fixtures/fixture_admin.py
vendored
|
@ -32,6 +32,7 @@ def admin_user(api_client: TestClient, api_routes: utils.AppRoutes):
|
|||
yield utils.TestUser(
|
||||
_group_id=user_data.get("groupId"),
|
||||
user_id=user_data.get("id"),
|
||||
password=settings.DEFAULT_PASSWORD,
|
||||
username=user_data.get("username"),
|
||||
email=user_data.get("email"),
|
||||
token=token,
|
||||
|
|
4
tests/fixtures/fixture_users.py
vendored
4
tests/fixtures/fixture_users.py
vendored
|
@ -26,6 +26,7 @@ def build_unique_user(group: str, api_client: TestClient) -> utils.TestUser:
|
|||
_group_id=user_data.get("groupId"),
|
||||
user_id=user_data.get("id"),
|
||||
email=user_data.get("email"),
|
||||
password=registration.password,
|
||||
username=user_data.get("username"),
|
||||
token=token,
|
||||
)
|
||||
|
@ -67,6 +68,7 @@ def g2_user(admin_token, api_client: TestClient, api_routes: utils.AppRoutes):
|
|||
user_id=user_id,
|
||||
_group_id=group_id,
|
||||
token=token,
|
||||
password="useruser",
|
||||
email=create_data["email"],
|
||||
username=create_data.get("username"),
|
||||
)
|
||||
|
@ -92,6 +94,7 @@ def unique_user(api_client: TestClient, api_routes: utils.AppRoutes):
|
|||
yield utils.TestUser(
|
||||
_group_id=user_data.get("groupId"),
|
||||
user_id=user_data.get("id"),
|
||||
password=registration.password,
|
||||
email=user_data.get("email"),
|
||||
username=user_data.get("username"),
|
||||
token=token,
|
||||
|
@ -144,6 +147,7 @@ def user_tuple(admin_token, api_client: TestClient, api_routes: utils.AppRoutes)
|
|||
_group_id=user_data.get("groupId"),
|
||||
user_id=user_data.get("id"),
|
||||
username=user_data.get("username"),
|
||||
password="useruser",
|
||||
email=user_data.get("email"),
|
||||
token=token,
|
||||
)
|
||||
|
|
|
@ -3,6 +3,8 @@ import json
|
|||
from fastapi.testclient import TestClient
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
from mealie.repos.repository_factory import AllRepositories
|
||||
from mealie.services.user_services.user_service import UserService
|
||||
from tests.utils.app_routes import AppRoutes
|
||||
from tests.utils.fixture_schemas import TestUser
|
||||
|
||||
|
@ -35,3 +37,27 @@ def test_user_token_refresh(api_client: TestClient, api_routes: AppRoutes, admin
|
|||
response = api_client.post(api_routes.auth_refresh, headers=admin_user.token)
|
||||
response = api_client.get(api_routes.users_self, headers=admin_user.token)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_user_lockout_after_bad_attemps(api_client: TestClient, unique_user: TestUser, database: AllRepositories):
|
||||
"""
|
||||
if the user has more than 5 bad login attemps the user will be locked out for 4 hours
|
||||
This only applies if there is a user in the database with the same username
|
||||
"""
|
||||
routes = AppRoutes()
|
||||
settings = get_app_settings()
|
||||
|
||||
for _ in range(settings.SECURITY_MAX_LOGIN_ATTEMPTS):
|
||||
form_data = {"username": unique_user.email, "password": "bad_password"}
|
||||
response = api_client.post(routes.auth_token, form_data)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
valid_data = {"username": unique_user.email, "password": unique_user.password}
|
||||
response = api_client.post(routes.auth_token, valid_data)
|
||||
assert response.status_code == 423
|
||||
|
||||
# Cleanup
|
||||
user_service = UserService(database)
|
||||
user = database.users.get_one(unique_user.user_id)
|
||||
user_service.unlock_user(user)
|
||||
|
|
|
@ -4,7 +4,7 @@ from mealie.core.config import get_app_settings
|
|||
from mealie.services.backups_v2.alchemy_exporter import AlchemyExporter
|
||||
|
||||
ALEMBIC_VERSIONS = [
|
||||
{"version_num": "f30cf048c228"},
|
||||
{"version_num": "188374910655"},
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from mealie.repos.repository_factory import AllRepositories
|
||||
from mealie.services.user_services.user_service import UserService
|
||||
from tests.utils.fixture_schemas import TestUser
|
||||
|
||||
|
||||
def test_get_locked_users(database: AllRepositories, user_tuple: list[TestUser]) -> None:
|
||||
usr_1, usr_2 = user_tuple
|
||||
|
||||
# Setup
|
||||
user_service = UserService(database)
|
||||
|
||||
user_1 = database.users.get_one(usr_1.user_id)
|
||||
user_2 = database.users.get_one(usr_2.user_id)
|
||||
|
||||
locked_users = user_service.get_locked_users()
|
||||
assert len(locked_users) == 0
|
||||
|
||||
user_1 = user_service.lock_user(user_1)
|
||||
|
||||
locked_users = user_service.get_locked_users()
|
||||
assert len(locked_users) == 1
|
||||
assert locked_users[0].id == user_1.id
|
||||
|
||||
user_2 = user_service.lock_user(user_2)
|
||||
|
||||
locked_users = user_service.get_locked_users()
|
||||
assert len(locked_users) == 2
|
||||
|
||||
for locked_user in locked_users:
|
||||
if locked_user.id == user_1.id:
|
||||
assert locked_user.locked_at == user_1.locked_at
|
||||
elif locked_user.id == user_2.id:
|
||||
assert locked_user.locked_at == user_2.locked_at
|
||||
else:
|
||||
assert False
|
||||
|
||||
# Cleanup
|
||||
user_service.unlock_user(user_1)
|
||||
user_service.unlock_user(user_2)
|
||||
|
||||
|
||||
def test_lock_unlocker_user(database: AllRepositories, unique_user: TestUser) -> None:
|
||||
user_service = UserService(database)
|
||||
|
||||
# Test that the user is unlocked
|
||||
user = database.users.get_one(unique_user.user_id)
|
||||
assert not user.locked_at
|
||||
|
||||
# Test that the user is locked
|
||||
locked_user = user_service.lock_user(user)
|
||||
|
||||
assert locked_user.locked_at
|
||||
assert locked_user.is_locked
|
||||
|
||||
unlocked_user = user_service.unlock_user(locked_user)
|
||||
assert not unlocked_user.locked_at
|
||||
assert not unlocked_user.is_locked
|
||||
|
||||
# Sanity check that the is_locked property is working
|
||||
user.locked_at = datetime.now() - timedelta(days=2)
|
||||
assert not user.is_locked
|
|
@ -8,6 +8,7 @@ class TestUser:
|
|||
email: str
|
||||
user_id: UUID
|
||||
username: str
|
||||
password: str
|
||||
_group_id: UUID
|
||||
token: Any
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue