mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-08-03 04:25:24 +02:00
Feature/database backups (#1040)
* add annotations to docs * alchemy data dumper * initial tests * sourcery refactor * db backups/restore * potential postgres fix * potential postgres fix * this is terrible * potential pg fix * cleanup * remove unused import * fix comparison * generate frontend types * update timestamp and add directory filter * rewrite to new admin-api * update backup routers * add file_token response helper * update imports * remove test_backup
This commit is contained in:
parent
2d1ef7173d
commit
8eefa05393
32 changed files with 756 additions and 229 deletions
|
@ -0,0 +1,48 @@
|
|||
import json
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
from mealie.services.backups_v2.alchemy_exporter import AlchemyExporter
|
||||
|
||||
|
||||
def test_alchemy_exporter():
    """Dumping the database should include the alembic revision and be JSON-serializable."""
    exporter = AlchemyExporter(get_app_settings().DB_URL)
    dump = exporter.dump()

    # The dump must reflect the current migration revision.
    assert dump["alembic_version"] == [{"version_num": "6b0f5f32d602"}]

    # Round-trip through the JSON encoder to prove the dump is serializable.
    assert json.dumps(dump, indent=4)
|
||||
|
||||
|
||||
def test_validate_schemas():
    """validate_schemas should compare alembic versions only, ignoring table row contents."""
    base = {
        "alembic_version": [{"version_num": "6b0f5f32d602"}],
    }
    same_version = {
        "alembic_version": [{"version_num": "6b0f5f32d602"}],
    }

    wrong_version = {
        "alembic_version": [{"version_num": "not-valid-schema"}],
    }

    assert AlchemyExporter.validate_schemas(base, same_version)
    assert not AlchemyExporter.validate_schemas(base, wrong_version)

    # Row-level differences in tables must not affect validation.
    with_rows = {
        "alembic_version": [{"version_num": "6b0f5f32d602"}],
        "recipes": [
            {
                "id": 1,
            }
        ],
    }
    with_different_rows = {
        "alembic_version": [{"version_num": "6b0f5f32d602"}],
        "recipes": [
            {
                "id": 2,
            }
        ],
    }

    assert AlchemyExporter.validate_schemas(with_rows, with_different_rows)
|
|
@ -0,0 +1,56 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
from zipfile import ZipFile
|
||||
|
||||
from mealie.services.backups_v2.backup_file import BackupFile
|
||||
from tests import utils
|
||||
|
||||
|
||||
def zip_factory(temp_dir) -> Path:
    """Create a zip archive containing a single placeholder entry and return its path."""
    archive_path = temp_dir / f"{utils.random_string()}.zip"

    with ZipFile(archive_path, "w") as archive:
        archive.writestr("test.txt", "test")

    return archive_path
|
||||
|
||||
|
||||
def test_backup_file_context_manager(tmp_path: Path):
    """Entering the context should create a temp dir; exiting should clean it up."""
    backup = BackupFile(zip_factory(tmp_path))

    with backup:
        extracted_dir = backup.temp_dir
        assert extracted_dir.exists()

    # After exit the attribute is cleared and the directory is gone from disk.
    assert not backup.temp_dir
    assert not extracted_dir.exists()
|
||||
|
||||
|
||||
def test_backup_file_invalid_zip(tmp_path: Path):
    """A zip lacking the expected backup contents should fail validation."""
    backup = BackupFile(zip_factory(tmp_path))

    with backup as contents:
        assert not contents.validate()
|
||||
|
||||
|
||||
def test_backup_file_valid_zip(tmp_path: Path):
    """A zip with a data directory and database.json should validate and expose its contents."""
    expected_tables = {"hello": "world"}

    archive_path = zip_factory(tmp_path)

    # Append the entries that make a backup archive valid.
    with ZipFile(archive_path, "a") as archive:
        archive.writestr("data/test.txt", "test")
        archive.writestr("database.json", json.dumps(expected_tables))

    backup = BackupFile(archive_path)

    with backup as contents:
        assert contents.validate()

        assert contents.read_tables() == expected_tables
        assert contents.data_directory.joinpath("test.txt").is_file()
|
|
@ -0,0 +1,58 @@
|
|||
import filecmp
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
from mealie.services.backups_v2.alchemy_exporter import AlchemyExporter
|
||||
from mealie.services.backups_v2.backup_file import BackupFile
|
||||
from mealie.services.backups_v2.backup_v2 import BackupV2
|
||||
|
||||
|
||||
def dict_sorter(d: dict) -> Any:
    """Sort key for table rows: prefer a timestamp or id field, fall back to a constant.

    Rows without either key all compare equal (key 1), so their relative order
    is left as-is by a stable sort.
    """
    for candidate in {"created_at", "id"}:
        if candidate in d:
            return d[candidate]
    return 1
|
||||
|
||||
|
||||
# For Future Use
|
||||
def match_file_tree(path_a: Path, path_b: Path):
    """Recursively assert that two file trees match in structure and file contents.

    Args:
        path_a: reference directory or file.
        path_b: directory or file to compare against ``path_a``.

    Raises:
        AssertionError: if an entry present under ``path_a`` is missing from
            ``path_b``, or if two files' contents differ.
    """
    if path_a.is_dir() and path_b.is_dir():
        for a_file in path_a.iterdir():
            b_file = path_b.joinpath(a_file.name)
            assert b_file.exists()
            match_file_tree(a_file, b_file)
    else:
        # Bug fix: the original called `filecmp(...)`, but `filecmp` is a module
        # and is not callable — use filecmp.cmp. shallow=False forces a
        # byte-by-byte comparison instead of an os.stat-only check.
        assert filecmp.cmp(path_a, path_b, shallow=False)
|
||||
|
||||
|
||||
def test_database_backup():
    """Creating a backup should produce an archive on disk that passes validation."""
    archive_path = BackupV2().backup()

    assert archive_path.exists()

    with BackupFile(archive_path) as contents:
        assert contents.validate()
|
||||
|
||||
|
||||
def test_database_restore():
    """Backing up and then restoring should leave every table's rows unchanged.

    Bug fix: the original compared ``list.sort(...)`` results — ``list.sort``
    returns ``None``, so the assertion compared ``None == None`` and always
    passed. Use ``sorted`` so the row contents are actually compared. Also
    iterate the snapshot's own table names instead of ``zip``-ing the keys of
    two dicts positionally, which could silently pair up different tables.
    """
    settings = get_app_settings()

    # Capture existing database snapshot
    snapshot_before = AlchemyExporter(settings.DB_URL).dump()

    # Create Backup
    backup_v2 = BackupV2(settings.DB_URL)
    path_to_backup = backup_v2.backup()

    assert path_to_backup.exists()
    backup_v2.restore(path_to_backup)

    snapshot_after = AlchemyExporter(settings.DB_URL).dump()

    # Compare each table's rows in a deterministic order.
    for table in snapshot_before:
        assert sorted(snapshot_before[table], key=dict_sorter) == sorted(snapshot_after[table], key=dict_sorter)
|
Loading…
Add table
Add a link
Reference in a new issue