mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-08-03 04:25:24 +02:00
feat: LDAP Improvements and E2E testing (#2199)
* add option to enable starttls for ldap * add integration test for ldap service * document new, optional environment variable * fix: support anonymous bind * id and mail attributes in LDAP_USER_FILTER should be implied * remove print statement
This commit is contained in:
parent
93eb2af087
commit
7d9be67432
8 changed files with 276 additions and 28 deletions
|
@ -1,6 +1,7 @@
|
|||
import json
|
||||
import os
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
import pytest
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
from mealie.repos.repository_factory import AllRepositories
|
||||
|
@ -57,3 +58,192 @@ def test_user_lockout_after_bad_attemps(api_client: TestClient, unique_user: Tes
|
|||
user_service = UserService(database)
|
||||
user = database.users.get_one(unique_user.user_id)
|
||||
user_service.unlock_user(user)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login(api_client: TestClient):
|
||||
form_data = {"username": "bender", "password": "bender"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "bender"
|
||||
assert data.get("fullName") == "Bender Bending Rodríguez"
|
||||
assert data.get("email") == "bender@planetexpress.com"
|
||||
assert data.get("admin") is False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login_bad_password(api_client: TestClient):
|
||||
form_data = {"username": "bender", "password": "BAD_PASS"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_admin_login(api_client: TestClient):
|
||||
form_data = {"username": "professor", "password": "professor"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "professor"
|
||||
assert data.get("fullName") == "Hubert J. Farnsworth"
|
||||
assert data.get("email") in ["professor@planetexpress.com", "hubert@planetexpress.com"]
|
||||
assert data.get("admin") is True
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_not_in_filter(api_client: TestClient):
|
||||
form_data = {"username": "amy", "password": "amy"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login_starttls(api_client: TestClient):
|
||||
settings = get_app_settings()
|
||||
settings.LDAP_ENABLE_STARTTLS = True
|
||||
|
||||
form_data = {"username": "bender", "password": "bender"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "bender"
|
||||
assert data.get("fullName") == "Bender Bending Rodríguez"
|
||||
assert data.get("email") == "bender@planetexpress.com"
|
||||
assert data.get("admin") is False
|
||||
|
||||
get_app_settings.cache_clear()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login_anonymous_bind(api_client: TestClient):
|
||||
settings = get_app_settings()
|
||||
settings.LDAP_QUERY_BIND = None
|
||||
settings.LDAP_QUERY_PASSWORD = None
|
||||
|
||||
form_data = {"username": "bender", "password": "bender"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "bender"
|
||||
assert data.get("fullName") == "Bender Bending Rodríguez"
|
||||
assert data.get("email") == "bender@planetexpress.com"
|
||||
assert data.get("admin") is False
|
||||
|
||||
get_app_settings.cache_clear()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login_no_filter(api_client: TestClient):
|
||||
settings = get_app_settings()
|
||||
settings.LDAP_USER_FILTER = None
|
||||
|
||||
form_data = {"username": "amy", "password": "amy"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "amy"
|
||||
assert data.get("fullName") == "Amy Wong"
|
||||
assert data.get("email") == "amy@planetexpress.com"
|
||||
assert data.get("admin") is False
|
||||
|
||||
get_app_settings.cache_clear()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login_simple_filter(api_client: TestClient):
|
||||
settings = get_app_settings()
|
||||
settings.LDAP_USER_FILTER = "(memberOf=cn=ship_crew,ou=people,dc=planetexpress,dc=com)"
|
||||
|
||||
form_data = {"username": "bender", "password": "bender"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "bender"
|
||||
assert data.get("fullName") == "Bender Bending Rodríguez"
|
||||
assert data.get("email") == "bender@planetexpress.com"
|
||||
assert data.get("admin") is False
|
||||
|
||||
get_app_settings.cache_clear()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("GITHUB_ACTIONS", False), reason="requires ldap service in github actions")
|
||||
def test_ldap_user_login_complex_filter(api_client: TestClient):
|
||||
settings = get_app_settings()
|
||||
settings.LDAP_USER_FILTER = "(&(objectClass=inetOrgPerson)(|(memberOf=cn=ship_crew,ou=people,dc=planetexpress,dc=com)(memberOf=cn=admin_staff,ou=people,dc=planetexpress,dc=com)))"
|
||||
|
||||
form_data = {"username": "professor", "password": "professor"}
|
||||
response = api_client.post(api_routes.auth_token, data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data is not None
|
||||
assert data.get("access_token") is not None
|
||||
|
||||
response = api_client.get(api_routes.users_self, headers={"Authorization": f"Bearer {data.get('access_token')}"})
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data.get("username") == "professor"
|
||||
assert data.get("fullName") == "Hubert J. Farnsworth"
|
||||
assert data.get("email") in ["professor@planetexpress.com", "hubert@planetexpress.com"]
|
||||
assert data.get("admin") is True
|
||||
|
||||
get_app_settings.cache_clear()
|
||||
|
|
|
@ -52,11 +52,18 @@ class LdapConnMock:
|
|||
self.app_settings.LDAP_NAME_ATTRIBUTE,
|
||||
self.app_settings.LDAP_MAIL_ATTRIBUTE,
|
||||
]
|
||||
assert filter == self.app_settings.LDAP_USER_FILTER.format(
|
||||
user_filter = self.app_settings.LDAP_USER_FILTER.format(
|
||||
id_attribute=self.app_settings.LDAP_ID_ATTRIBUTE,
|
||||
mail_attribute=self.app_settings.LDAP_MAIL_ATTRIBUTE,
|
||||
input=self.user,
|
||||
)
|
||||
search_filter = "(&(|({id_attribute}={input})({mail_attribute}={input})){filter})".format(
|
||||
id_attribute=self.app_settings.LDAP_ID_ATTRIBUTE,
|
||||
mail_attribute=self.app_settings.LDAP_MAIL_ATTRIBUTE,
|
||||
input=self.user,
|
||||
filter=user_filter,
|
||||
)
|
||||
assert filter == search_filter
|
||||
assert dn == self.app_settings.LDAP_BASE_DN
|
||||
assert scope == ldap.SCOPE_SUBTREE
|
||||
|
||||
|
@ -77,6 +84,9 @@ class LdapConnMock:
|
|||
def unbind_s(self):
|
||||
pass
|
||||
|
||||
def start_tls_s(self):
|
||||
pass
|
||||
|
||||
|
||||
def setup_env(monkeypatch: MonkeyPatch):
|
||||
user = random_string(10)
|
||||
|
@ -204,6 +214,9 @@ def test_ldap_disabled(monkeypatch: MonkeyPatch):
|
|||
def unbind_s(self):
|
||||
pass
|
||||
|
||||
def start_tls_s(self):
|
||||
pass
|
||||
|
||||
def ldap_initialize_mock(url):
|
||||
assert url == ""
|
||||
return LdapConnMock()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue