mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-08-04 21:15:22 +02:00
Convert scraper to use async (#1915)
* add httpx dependency for async http requests * rework scraper strategies to download recipe html asynchronously * rework recipe_data_service to download recipe images asynchronously * fix recipe_parser test, so it can use async results * fix bulk import so that it also works with async scraper * fix broken recipe_parser tests * Fix issues found by scanners * Add additional checks for ingredient and instruction count in test_create_by_url * Revert changes in test recipe_data. Since we are checking ingredients and instructions in test_create_by_url now, these would fail with the stored html of recipe data * Add explicit type annotation in recipe_data_service.largest_content_len * Fix typo in annotation
This commit is contained in:
parent
7275dd2696
commit
3415a9c310
11 changed files with 129 additions and 115 deletions
|
@ -162,10 +162,10 @@ class RecipeController(BaseRecipeController):
|
|||
# URL Scraping Operations
|
||||
|
||||
@router.post("/create-url", status_code=201, response_model=str)
|
||||
def parse_recipe_url(self, req: ScrapeRecipe):
|
||||
async def parse_recipe_url(self, req: ScrapeRecipe):
|
||||
"""Takes in a URL and attempts to scrape data and load it into the database"""
|
||||
try:
|
||||
recipe, extras = create_from_url(req.url)
|
||||
recipe, extras = await create_from_url(req.url)
|
||||
except ForceTimeoutException as e:
|
||||
raise HTTPException(
|
||||
status_code=408, detail=ErrorResponse.respond(message="Recipe Scraping Timed Out")
|
||||
|
@ -206,10 +206,10 @@ class RecipeController(BaseRecipeController):
|
|||
return {"reportId": report_id}
|
||||
|
||||
@router.post("/test-scrape-url")
|
||||
def test_parse_recipe_url(self, url: ScrapeRecipeTest):
|
||||
async def test_parse_recipe_url(self, url: ScrapeRecipeTest):
|
||||
# Debugger should produce the same result as the scraper sees before cleaning
|
||||
try:
|
||||
if scraped_data := RecipeScraperPackage(url.url).scrape_url():
|
||||
if scraped_data := await RecipeScraperPackage(url.url).scrape_url():
|
||||
return scraped_data.schema.data
|
||||
except ForceTimeoutException as e:
|
||||
raise HTTPException(
|
||||
|
@ -381,12 +381,12 @@ class RecipeController(BaseRecipeController):
|
|||
# Image and Assets
|
||||
|
||||
@router.post("/{slug}/image", tags=["Recipe: Images and Assets"])
|
||||
def scrape_image_url(self, slug: str, url: ScrapeRecipe):
|
||||
async def scrape_image_url(self, slug: str, url: ScrapeRecipe):
|
||||
recipe = self.mixins.get_one(slug)
|
||||
data_service = RecipeDataService(recipe.id)
|
||||
|
||||
try:
|
||||
data_service.scrape_image(url.url)
|
||||
await data_service.scrape_image(url.url)
|
||||
except NotAnImageError as e:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
import asyncio
|
||||
import shutil
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from httpx import AsyncClient, Response
|
||||
from pydantic import UUID4
|
||||
|
||||
from mealie.pkgs import img
|
||||
|
@ -13,28 +12,31 @@ from mealie.services._base_service import BaseService
|
|||
_FIREFOX_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0"
|
||||
|
||||
|
||||
async def gather_with_concurrency(n: int, *coros) -> list:
    """Await ``coros`` concurrently, letting at most ``n`` of them run at once.

    Each awaitable is wrapped so that it must acquire a shared semaphore
    before it is awaited; the wrapped awaitables are then handed to
    ``asyncio.gather``.

    Args:
        n: Maximum number of awaitables allowed inside the semaphore at the
            same time. Must be at least 1.
        *coros: The awaitables to run.

    Returns:
        The results as a list, in the same order as ``coros`` (the ordering
        ``asyncio.gather`` provides).

    Raises:
        ValueError: If ``n`` is less than 1.
    """
    if n < 1:
        # asyncio.Semaphore(0) would make every wrapped task wait forever, so
        # report the bad limit instead of hanging. Close coroutine objects we
        # will never await to avoid "coroutine was never awaited" warnings.
        for pending in coros:
            if asyncio.iscoroutine(pending):
                pending.close()
        raise ValueError(f"concurrency limit must be at least 1, got {n}")

    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        # Hold a semaphore slot for the full duration of the awaited work.
        async with semaphore:
            return await coro

    return await asyncio.gather(*(sem_coro(c) for c in coros))
|
||||
|
||||
|
||||
async def largest_content_len(urls: list[str]) -> tuple[str, int]:
|
||||
largest_url = ""
|
||||
largest_len = 0
|
||||
|
||||
def do(session: requests.Session, url: str):
|
||||
def _do() -> requests.Response:
|
||||
return session.head(url, headers={"User-Agent": _FIREFOX_UA})
|
||||
async def do(client: AsyncClient, url: str) -> Response:
|
||||
return await client.head(url, headers={"User-Agent": _FIREFOX_UA})
|
||||
|
||||
return _do
|
||||
|
||||
with ThreadPoolExecutor(max_workers=10) as executor:
|
||||
with requests.Session() as session:
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
tasks = [loop.run_in_executor(executor, do(session, url)) for url in urls]
|
||||
|
||||
response: requests.Response # required for type hinting within the loop
|
||||
for response in await asyncio.gather(*tasks):
|
||||
async with AsyncClient() as client:
|
||||
tasks = [do(client, url) for url in urls]
|
||||
responses: list[Response] = await gather_with_concurrency(10, *tasks)
|
||||
for response in responses:
|
||||
|
||||
len_int = int(response.headers.get("Content-Length", 0))
|
||||
if len_int > largest_len:
|
||||
largest_url = response.url
|
||||
largest_url = str(response.url)
|
||||
largest_len = len_int
|
||||
|
||||
return largest_url, largest_len
|
||||
|
@ -107,8 +109,8 @@ class RecipeDataService(BaseService):
|
|||
|
||||
return True
|
||||
|
||||
def scrape_image(self, image_url) -> None:
|
||||
self.logger.debug(f"Image URL: {image_url}")
|
||||
async def scrape_image(self, image_url) -> None:
|
||||
self.logger.info(f"Image URL: {image_url}")
|
||||
|
||||
if not self._validate_image_url(image_url):
|
||||
self.logger.error(f"Invalid image URL: {image_url}")
|
||||
|
@ -121,15 +123,7 @@ class RecipeDataService(BaseService):
|
|||
# Multiple images have been defined in the schema - usually different resolutions
|
||||
# Typically would be in smallest->biggest order, but can't be certain so test each.
|
||||
# 'Google will pick the best image to display in Search results based on the aspect ratio and resolution.'
|
||||
|
||||
# TODO: We should refactor the scraper to use a async session provided by FastAPI using a sync
|
||||
# route instead of bootstrapping async behavior this far down the chain. Will require some work
|
||||
# so leaving this improvement here for now.
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
future = asyncio.ensure_future(largest_content_len(image_url))
|
||||
loop.run_until_complete(future)
|
||||
image_url, _ = future.result()
|
||||
image_url, _ = await largest_content_len(image_url)
|
||||
|
||||
elif isinstance(image_url, dict): # Handles Dictionary Types
|
||||
for key in image_url:
|
||||
|
@ -144,8 +138,9 @@ class RecipeDataService(BaseService):
|
|||
file_name = f"{str(self.recipe_id)}.{ext}"
|
||||
file_path = Recipe.directory_from_id(self.recipe_id).joinpath("images", file_name)
|
||||
|
||||
async with AsyncClient() as client:
|
||||
try:
|
||||
r = requests.get(image_url, stream=True, headers={"User-Agent": _FIREFOX_UA})
|
||||
r = await client.get(image_url, headers={"User-Agent": _FIREFOX_UA})
|
||||
except Exception:
|
||||
self.logger.exception("Fatal Image Request Exception")
|
||||
return None
|
||||
|
@ -161,8 +156,7 @@ class RecipeDataService(BaseService):
|
|||
self.logger.error(f"Content-Type: {content_type} is not an image")
|
||||
raise NotAnImageError(f"Content-Type {content_type} is not an image")
|
||||
|
||||
r.raw.decode_content = True
|
||||
self.logger.info(f"File Name Suffix {file_path.suffix}")
|
||||
self.write_image(r.raw, file_path.suffix)
|
||||
self.write_image(r.read(), file_path.suffix)
|
||||
|
||||
file_path.unlink(missing_ok=True)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
from asyncio import gather
|
||||
|
||||
from pydantic import UUID4
|
||||
|
||||
from mealie.repos.repository_factory import AllRepositories
|
||||
from mealie.schema.recipe.recipe import CreateRecipeByUrlBulk
|
||||
from mealie.schema.recipe.recipe import CreateRecipeByUrlBulk, Recipe
|
||||
from mealie.schema.reports.reports import ReportCategory, ReportCreate, ReportEntryCreate, ReportSummaryStatus
|
||||
from mealie.schema.user.user import GroupInDB
|
||||
from mealie.services._base_service import BaseService
|
||||
|
@ -65,18 +67,24 @@ class RecipeBulkScraperService(BaseService):
|
|||
|
||||
self.repos.group_reports.update(self.report.id, self.report)
|
||||
|
||||
def scrape(self, urls: CreateRecipeByUrlBulk) -> None:
|
||||
if self.report is None:
|
||||
self.get_report_id()
|
||||
|
||||
for b in urls.imports:
|
||||
|
||||
async def scrape(self, urls: CreateRecipeByUrlBulk) -> None:
|
||||
async def _do(url: str) -> Recipe | None:
|
||||
try:
|
||||
recipe, _ = create_from_url(b.url)
|
||||
recipe, _ = await create_from_url(url)
|
||||
return recipe
|
||||
except Exception as e:
|
||||
self.service.logger.error(f"failed to scrape url during bulk url import {b.url}")
|
||||
self.service.logger.exception(e)
|
||||
self._add_error_entry(f"failed to scrape url {b.url}", str(e))
|
||||
self._add_error_entry(f"failed to scrape url {url}", str(e))
|
||||
return None
|
||||
|
||||
if self.report is None:
|
||||
self.get_report_id()
|
||||
tasks = [_do(b.url) for b in urls.imports]
|
||||
results = await gather(*tasks)
|
||||
for b, recipe in zip(urls.imports, results, strict=True):
|
||||
|
||||
if not recipe:
|
||||
continue
|
||||
|
||||
if b.tags:
|
||||
|
|
|
@ -21,14 +21,14 @@ class RecipeScraper:
|
|||
|
||||
self.scrapers = scrapers
|
||||
|
||||
def scrape(self, url: str) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
|
||||
async def scrape(self, url: str) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
|
||||
"""
|
||||
Scrapes a recipe from the web.
|
||||
"""
|
||||
|
||||
for scraper_type in self.scrapers:
|
||||
scraper = scraper_type(url)
|
||||
result = scraper.parse()
|
||||
result = await scraper.parse()
|
||||
|
||||
if result is not None:
|
||||
return result
|
||||
|
|
|
@ -19,7 +19,7 @@ class ParserErrors(str, Enum):
|
|||
CONNECTION_ERROR = "CONNECTION_ERROR"
|
||||
|
||||
|
||||
def create_from_url(url: str) -> tuple[Recipe, ScrapedExtras | None]:
|
||||
async def create_from_url(url: str) -> tuple[Recipe, ScrapedExtras | None]:
|
||||
"""Main entry point for generating a recipe from a URL. Pass in a URL and
|
||||
a Recipe object will be returned if successful.
|
||||
|
||||
|
@ -30,7 +30,7 @@ def create_from_url(url: str) -> tuple[Recipe, ScrapedExtras | None]:
|
|||
Recipe: Recipe Object
|
||||
"""
|
||||
scraper = RecipeScraper()
|
||||
new_recipe, extras = scraper.scrape(url)
|
||||
new_recipe, extras = await scraper.scrape(url)
|
||||
|
||||
if not new_recipe:
|
||||
raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": ParserErrors.BAD_RECIPE_DATA.value})
|
||||
|
@ -42,7 +42,7 @@ def create_from_url(url: str) -> tuple[Recipe, ScrapedExtras | None]:
|
|||
recipe_data_service = RecipeDataService(new_recipe.id)
|
||||
|
||||
try:
|
||||
recipe_data_service.scrape_image(new_recipe.image)
|
||||
await recipe_data_service.scrape_image(new_recipe.image)
|
||||
|
||||
if new_recipe.name is None:
|
||||
new_recipe.name = "Untitled"
|
||||
|
|
|
@ -3,8 +3,8 @@ from abc import ABC, abstractmethod
|
|||
from typing import Any, Callable
|
||||
|
||||
import extruct
|
||||
import requests
|
||||
from fastapi import HTTPException, status
|
||||
from httpx import AsyncClient
|
||||
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, scrape_html
|
||||
from slugify import slugify
|
||||
from w3lib.html import get_base_url
|
||||
|
@ -23,19 +23,19 @@ class ForceTimeoutException(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def safe_scrape_html(url: str) -> str:
|
||||
async def safe_scrape_html(url: str) -> str:
|
||||
"""
|
||||
Scrapes the html from a url but will cancel the request
|
||||
if the request takes longer than 15 seconds. This is used to mitigate
|
||||
DDOS attacks from users providing a url with arbitrary large content.
|
||||
"""
|
||||
resp = requests.get(url, timeout=SCRAPER_TIMEOUT, stream=True, headers={"User-Agent": _FIREFOX_UA})
|
||||
|
||||
async with AsyncClient() as client:
|
||||
html_bytes = b""
|
||||
async with client.stream("GET", url, timeout=SCRAPER_TIMEOUT, headers={"User-Agent": _FIREFOX_UA}) as resp:
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
for chunk in resp.iter_content(chunk_size=1024):
|
||||
async for chunk in resp.aiter_bytes(chunk_size=1024):
|
||||
html_bytes += chunk
|
||||
|
||||
if time.time() - start_time > SCRAPER_TIMEOUT:
|
||||
|
@ -82,7 +82,7 @@ class ABCScraperStrategy(ABC):
|
|||
self.url = url
|
||||
|
||||
@abstractmethod
|
||||
def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
|
||||
async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
|
||||
"""Parse a recipe from a web URL.
|
||||
|
||||
Args:
|
||||
|
@ -159,9 +159,8 @@ class RecipeScraperPackage(ABCScraperStrategy):
|
|||
|
||||
return recipe, extras
|
||||
|
||||
def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None:
|
||||
recipe_html = safe_scrape_html(self.url)
|
||||
|
||||
async def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None:
|
||||
recipe_html = await safe_scrape_html(self.url)
|
||||
try:
|
||||
scraped_schema = scrape_html(recipe_html, org_url=self.url)
|
||||
except (NoSchemaFoundInWildMode, AttributeError):
|
||||
|
@ -188,11 +187,11 @@ class RecipeScraperPackage(ABCScraperStrategy):
|
|||
self.logger.debug(f"Recipe Scraper [Package] was unable to extract a recipe from {self.url}")
|
||||
return None
|
||||
|
||||
def parse(self):
|
||||
async def parse(self):
|
||||
"""
|
||||
Parse a recipe from a given url.
|
||||
"""
|
||||
scraped_data = self.scrape_url()
|
||||
scraped_data = await self.scrape_url()
|
||||
|
||||
if scraped_data is None:
|
||||
return None
|
||||
|
@ -205,8 +204,8 @@ class RecipeScraperOpenGraph(ABCScraperStrategy):
|
|||
Abstract class for all recipe parsers.
|
||||
"""
|
||||
|
||||
def get_html(self) -> str:
|
||||
return safe_scrape_html(self.url)
|
||||
async def get_html(self) -> str:
|
||||
return await safe_scrape_html(self.url)
|
||||
|
||||
def get_recipe_fields(self, html) -> dict | None:
|
||||
"""
|
||||
|
@ -242,11 +241,11 @@ class RecipeScraperOpenGraph(ABCScraperStrategy):
|
|||
"extras": [],
|
||||
}
|
||||
|
||||
def parse(self):
|
||||
async def parse(self):
|
||||
"""
|
||||
Parse a recipe from a given url.
|
||||
"""
|
||||
html = self.get_html()
|
||||
html = await self.get_html()
|
||||
|
||||
og_data = self.get_recipe_fields(html)
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ bcrypt = "^4.0.1"
|
|||
extruct = "^0.14.0"
|
||||
fastapi = "^0.89.0"
|
||||
gunicorn = "^20.1.0"
|
||||
httpx = "^0.23.1"
|
||||
lxml = "^4.7.1"
|
||||
orjson = "^3.8.0"
|
||||
passlib = "^1.7.4"
|
||||
|
@ -40,7 +41,6 @@ recipe-scrapers = "^14.26.0"
|
|||
requests = "^2.25.1"
|
||||
tzdata = "^2022.7"
|
||||
uvicorn = {extras = ["standard"], version = "^0.20.0"}
|
||||
httpx = "^0.23.1"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
black = "^21.12b0"
|
||||
|
@ -53,6 +53,7 @@ pre-commit = "^2.20.0"
|
|||
pydantic-to-typescript = "^1.0.7"
|
||||
pylint = "^2.6.0"
|
||||
pytest = "^7.2.0"
|
||||
pytest-asyncio = "^0.20.3"
|
||||
rich = "^13.0.0"
|
||||
ruff = "^0.0.221"
|
||||
types-PyYAML = "^6.0.4"
|
||||
|
@ -61,6 +62,7 @@ types-python-slugify = "^6.0.0"
|
|||
types-requests = "^2.27.12"
|
||||
types-urllib3 = "^1.26.11"
|
||||
|
||||
|
||||
[build-system]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
|
|
|
@ -21,7 +21,7 @@ def test_bulk_import(api_client: TestClient, unique_user: TestUser):
|
|||
|
||||
response = api_client.post(api_routes.recipes_create_url_bulk, json=recipes, headers=unique_user.token)
|
||||
|
||||
assert response.status_code == 201
|
||||
assert response.status_code == 202
|
||||
|
||||
for slug in slugs:
|
||||
response = api_client.get(api_routes.recipes_slug(slug), headers=unique_user.token)
|
||||
|
|
|
@ -89,6 +89,16 @@ def test_create_by_url(
|
|||
assert response.status_code == 201
|
||||
assert json.loads(response.text) == recipe_data.expected_slug
|
||||
|
||||
recipe = api_client.get(api_routes.recipes_slug(recipe_data.expected_slug), headers=unique_user.token)
|
||||
|
||||
assert recipe.status_code == 200
|
||||
|
||||
recipe_dict: dict = json.loads(recipe.text)
|
||||
|
||||
assert recipe_dict["slug"] == recipe_data.expected_slug
|
||||
assert len(recipe_dict["recipeInstructions"]) == recipe_data.num_steps
|
||||
assert len(recipe_dict["recipeIngredient"]) == recipe_data.num_ingredients
|
||||
|
||||
|
||||
def test_create_by_url_with_tags(
|
||||
api_client: TestClient,
|
||||
|
|
|
@ -16,8 +16,9 @@ and then use this test case by removing the `@pytest.mark.skip` and than testing
|
|||
|
||||
@pytest.mark.skipif(True, reason="Long Running API Test - manually run when updating the parser")
|
||||
@pytest.mark.parametrize("recipe_test_data", test_cases)
|
||||
def test_recipe_parser(recipe_test_data: RecipeSiteTestCase):
|
||||
recipe, _ = scraper.create_from_url(recipe_test_data.url)
|
||||
@pytest.mark.asyncio
|
||||
async def test_recipe_parser(recipe_test_data: RecipeSiteTestCase):
|
||||
recipe, _ = await scraper.create_from_url(recipe_test_data.url)
|
||||
|
||||
assert recipe.slug == recipe_test_data.expected_slug
|
||||
assert len(recipe.recipe_instructions) == recipe_test_data.num_steps
|
||||
|
|
|
@ -45,7 +45,7 @@ def get_recipe_test_cases():
|
|||
html="jam-roly-poly-with-custard.html",
|
||||
html_file=test_data.html_jam_roly_poly_with_custard,
|
||||
expected_slug="jam-roly-poly-with-custard",
|
||||
num_ingredients=13,
|
||||
num_ingredients=11,
|
||||
num_steps=9,
|
||||
),
|
||||
RecipeSiteTestCase(
|
||||
|
@ -54,7 +54,7 @@ def get_recipe_test_cases():
|
|||
html_file=test_data.html_sous_vide_shrimp,
|
||||
expected_slug="sous-vide-shrimp",
|
||||
num_ingredients=5,
|
||||
num_steps=0,
|
||||
num_steps=1,
|
||||
),
|
||||
RecipeSiteTestCase(
|
||||
url="https://www.bonappetit.com/recipe/detroit-style-pepperoni-pizza",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue