diff --git a/.gitignore b/.gitignore index 690dec2..a898175 100644 --- a/.gitignore +++ b/.gitignore @@ -129,3 +129,7 @@ dmypy.json # Pyre type checker .pyre/ + + +# Project-specific +html \ No newline at end of file diff --git a/app/information_kp.py b/app/information_kp.py new file mode 100644 index 0000000..8d01276 --- /dev/null +++ b/app/information_kp.py @@ -0,0 +1,92 @@ +from pathlib import Path + +import markdown +import yaml +from openfoodfacts.types import Country, Lang + +from app.models import KnowledgeContent, KnowledgeContentItem + + +def find_kp_html_path( + root_dir: Path, tag_type: str, value_tag: str, country: Country, lang: Lang +) -> Path | None: + """Return the Path of the HTML page related to an information knowledge panel, if it exists. + + We first check that a knowledge panel exists for the provided `country`, with a fallback + to `Country.world` otherwise. + + Args: + root_dir: the root directory where HTML pages are located + tag_type: the tag type (ex: 'category', 'label',...) 
+ value_tag: the tag value (ex: `en:ab-agriculture-biologique`) + country: the Country of the information knowledge panel + lang: the language code of the information knowledge panel + + Returns: + Path: the Path of the HTML page or None if not found + """ + base_dir = root_dir / tag_type / value_tag.replace(":", "_") + if not base_dir.exists(): + return None + + html_file_paths = list(base_dir.glob("*.html")) + # file names follow the schema '{country}_{lang}.html' + # Filter by lang + html_file_paths = [p for p in html_file_paths if lang.value == p.stem.split("_")[1]] + + if not html_file_paths: + return None + + country_targets = [country] if country is Country.world else [country, Country.world] + for country_target in country_targets: + country_specific_html_files = [ + p for p in html_file_paths if country_target.value == p.stem.split("_")[0] + ] + + if country_specific_html_files: + return country_specific_html_files[0] + + return None + + +def generate_file_path(root_dir: Path, item: KnowledgeContentItem) -> Path: + """Generate a file path unique to the knowledge content item. + + The generated path depends on the `tag_type`, the `value_tag`, the + `country` and `lang`. + + Args: + root_dir: the root directory where HTML pages are located + item: the knowledge content item + + Returns: + Path: the path where the HTML page should be saved + """ + return ( + root_dir + / item.tag_type + / item.value_tag.replace(":", "_") + / f"{item.country.name}_{item.lang.name}.html" + ) + + +def build_content(root_dir: Path, file_path: Path): + """Build content as HTML pages from `file_path` (a YAML file). + + The YAML file should follow the schema of `KnowledgeContent`. + Files are saved as HTML files under `root_dir`, see + `generate_file_path` for more information about how paths + are generated. 
+ + Args: + root_dir: the root directory where HTML pages are located + file_path: the input YAML file path + """ + with file_path.open("r") as f: + data = yaml.safe_load(f) + knowledge_items = KnowledgeContent.parse_obj(data) + + for item in knowledge_items.items: + output_path = generate_file_path(root_dir, item) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(markdown.markdown(item.content)) diff --git a/app/knowledge_panels.py b/app/knowledge_panels.py index 672a372..6570b65 100644 --- a/app/knowledge_panels.py +++ b/app/knowledge_panels.py @@ -2,17 +2,12 @@ from typing import Optional, Union from urllib.parse import urlencode +from app.utils import alpha2_to_country_name, country_name_to_alpha2, pluralize + from .config import openFoodFacts, settings from .exception_wrapper import no_exception from .i18n import translate as _ -from .models import ( - HungerGameFilter, - Taxonomies, - alpha2_to_country_name, - country_name_to_alpha2, - pluralize, - singularize, -) +from .models import HungerGameFilter, Taxonomies from .off import data_quality, last_edit, wikidata_helper @@ -25,9 +20,9 @@ def __init__( sec_value: Union[str, None] = None, country: Union[str, None] = None, ): - self.facet = singularize(facet) + self.facet = facet self.value = value - self.sec_facet = singularize(sec_facet) + self.sec_facet = sec_facet self.sec_value = sec_value self.country = alpha2_to_country_name(country) diff --git a/app/main.py b/app/main.py index 8762851..6613290 100644 --- a/app/main.py +++ b/app/main.py @@ -1,19 +1,33 @@ import logging import re -from typing import Optional +from typing import Annotated import asyncer +from aiofile import async_open from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from fastapi_utils.tasks import repeat_every +from openfoodfacts.types import Country, Lang from 
prometheus_fastapi_instrumentator import Instrumentator -from .i18n import active_translation +from app.information_kp import find_kp_html_path +from app.settings import HTML_DIR + +from .i18n import DEFAULT_LANGUAGE, active_translation from .knowledge_panels import KnowledgePanels -from .models import FacetResponse, QueryData +from .models import ( + COUNTRY_QUERY, + FACET_TAG_QUERY, + LANGUAGE_CODE_QUERY, + SECONDARY_FACET_TAG_QUERY, + SECONDARY_VALUE_TAG_QUERY, + VALUE_TAG_QUERY, + FacetResponse, +) from .off import global_quality_refresh +from .utils import secure_filename, singularize tags_metadata = [ { @@ -103,74 +117,116 @@ async def hello(): @app.get("/knowledge_panel", tags=["knowledge-panel"], response_model=FacetResponse) async def knowledge_panel( request: Request, - facet_tag: str = QueryData.facet_tag_query(), - value_tag: Optional[str] = QueryData.value_tag_query(), - sec_facet_tag: Optional[str] = QueryData.secondary_facet_tag_query(), - sec_value_tag: Optional[str] = QueryData.secondary_value_tag_query(), - lang_code: Optional[str] = QueryData.language_code_query(), - country: Optional[str] = QueryData.country_query(), + facet_tag: Annotated[str, FACET_TAG_QUERY], + value_tag: Annotated[str | None, VALUE_TAG_QUERY] = None, + sec_facet_tag: Annotated[str | None, SECONDARY_FACET_TAG_QUERY] = None, + sec_value_tag: Annotated[str | None, SECONDARY_VALUE_TAG_QUERY] = None, + lang_code: Annotated[Lang, LANGUAGE_CODE_QUERY] = Lang[DEFAULT_LANGUAGE], + country: Annotated[Country, COUNTRY_QUERY] = Country.world, + add_contribution_panels: bool = True, + add_information_panels: bool = True, ): + """Return knowledge panels for a `facet_tag` and an optional `value_tag`. + `sec_facet_tag` and `sec_value_tag` are used when accessing nested facets + on Open Food Facts website. 
+ + This endpoint returns 2 types of knowledge panels (controlled by + `add_contribution_panels` and `add_information_panels` flags respectively): + + - contribution knowledge panels: knowledge panels useful for contributors (Hunger + Game links, last edits,...) + - information knowledge panels: description of the category/label... + + Information knowledge panels are country-specific and language-specific. + If no knowledge panel was found for the requested country, Country.world is + used as a fallback. + + This mechanism makes it possible, for example, to have a different knowledge panel for `en:organic` + in France and in the USA (where we will mostly talk about the `en:usda-organic` label). """ - FacetName is the model that have list of values - facet_tag are the list of values connecting to FacetName - eg:- category/beer, here beer is the value - """ - if is_crawling_bot(request): + panels = {} + facet_tag = singularize(facet_tag) + sec_facet_tag = singularize(sec_facet_tag) + + if not is_crawling_bot(request) and add_contribution_panels: # Don't return any knowledge panel if the client is a crawling bot - return {"knowledge_panels": {}} - - with active_translation(lang_code): - # creating object that will compute knowledge panels - obj_kp = KnowledgePanels( - facet=facet_tag, - value=value_tag, - sec_facet=sec_facet_tag, - sec_value=sec_value_tag, - country=country, - ) - # this will contains panels computations - soon_panels = [] - # the task_group will run these knowledge_panels async functions concurrently - async with asyncer.create_task_group() as task_group: - # launch each panels computation - soon_panels.append(task_group.soonify(obj_kp.hunger_game_kp)()) - soon_panels.append(task_group.soonify(obj_kp.data_quality_kp)()) - soon_panels.append(task_group.soonify(obj_kp.last_edits_kp)()) - soon_panels.append(task_group.soonify(obj_kp.wikidata_kp)()) - # collect panels results - panels = {} - for soon_value in soon_panels: - # Appending soon_value value in panels - # as 
soon_panels needs to access outside taskgroup. - if soon_value.value: - panels.update(soon_value.value) - return {"knowledge_panels": panels} - - -templates = Jinja2Templates(directory="template") + with active_translation(lang_code.value): + # creating object that will compute knowledge panels + + obj_kp = KnowledgePanels( + facet=facet_tag, + value=value_tag, + sec_facet=sec_facet_tag, + sec_value=sec_value_tag, + country=country.value if country is not Country.world else None, + ) + # this will contains panels computations + soon_panels = [] + # the task_group will run these knowledge_panels async functions concurrently + async with asyncer.create_task_group() as task_group: + # launch each panels computation + soon_panels.append(task_group.soonify(obj_kp.hunger_game_kp)()) + soon_panels.append(task_group.soonify(obj_kp.data_quality_kp)()) + soon_panels.append(task_group.soonify(obj_kp.last_edits_kp)()) + soon_panels.append(task_group.soonify(obj_kp.wikidata_kp)()) + # collect panels results + for soon_value in soon_panels: + # Appending soon_value value in panels + # as soon_panels needs to access outside taskgroup. 
+ if soon_value.value: + panels.update(soon_value.value) + + if add_information_panels and value_tag is not None: + # As we're using user-provided data to access filesystem, + # generate secure filename + facet_tag_safe = secure_filename(facet_tag) + value_tag_safe = secure_filename(value_tag) + + if facet_tag_safe and value_tag_safe: + file_path = find_kp_html_path( + HTML_DIR, facet_tag_safe, value_tag_safe, country, lang_code + ) + panel = None + if file_path is not None: + async with async_open(file_path, "r") as f: + html_content = await f.read() + panel = { + "elements": [{"element_type": "text", "text_element": {"html": html_content}}], + "title_element": {"title": "Description"}, + } + panels["Description"] = panel + + return {"knowledge_panels": panels} + + +templates = Jinja2Templates(directory="template", trim_blocks=True, lstrip_blocks=True) @app.get("/render-to-html", tags=["Render to HTML"], response_class=HTMLResponse) async def render_html( request: Request, - facet_tag: str = QueryData.facet_tag_query(), - value_tag: Optional[str] = QueryData.value_tag_query(), - sec_facet_tag: Optional[str] = QueryData.secondary_facet_tag_query(), - sec_value_tag: Optional[str] = QueryData.secondary_value_tag_query(), - lang_code: Optional[str] = QueryData.language_code_query(), - country: Optional[str] = QueryData.country_query(), + facet_tag: Annotated[str, FACET_TAG_QUERY], + value_tag: Annotated[str | None, VALUE_TAG_QUERY] = None, + sec_facet_tag: Annotated[str | None, SECONDARY_FACET_TAG_QUERY] = None, + sec_value_tag: Annotated[str | None, SECONDARY_VALUE_TAG_QUERY] = None, + lang_code: Annotated[Lang, LANGUAGE_CODE_QUERY] = Lang[DEFAULT_LANGUAGE], + country: Annotated[Country, COUNTRY_QUERY] = Country.world, + add_contribution_panels: bool = True, + add_information_panels: bool = True, ): """ Render item.html using jinja2 This is helper function to make thing easier while injecting facet_kp in off-server """ panels = await knowledge_panel( - request, - 
facet_tag, - value_tag, - sec_facet_tag, - sec_value_tag, - lang_code, - country, + request=request, + facet_tag=facet_tag, + value_tag=value_tag, + sec_facet_tag=sec_facet_tag, + sec_value_tag=sec_value_tag, + lang_code=lang_code, + country=country, + add_contribution_panels=add_contribution_panels, + add_information_panels=add_information_panels, ) return templates.TemplateResponse("item.html", {"request": request, "panels": panels}) diff --git a/app/models.py b/app/models.py index b0f2954..6046988 100644 --- a/app/models.py +++ b/app/models.py @@ -1,10 +1,10 @@ +from collections import Counter from enum import Enum -from typing import Optional, TypedDict +from typing import Literal, Optional, TypedDict -import inflect -import pycountry from fastapi import Query -from pydantic import BaseModel, Field +from openfoodfacts import Country, Lang +from pydantic import BaseModel, Field, constr, validator class HungerGameFilter(str, Enum): @@ -43,99 +43,40 @@ def list(): return [c.value for c in Taxonomies] -def alpha2_to_country_name(value: Optional[str]): - """ - Helper function to return country name for aplha2 code - """ - if value is not None and len(value) == 2: - country = pycountry.countries.get(alpha_2=value) - if country is not None: - return f"{country.name}" - return value +FACET_TAG_QUERY = Query( + title="Facet tag", + description="Facet tag to use", + examples=["category", "brand", "ingredient"], +) +VALUE_TAG_QUERY = Query( + title="Value tag", + description="Value tag to use", + examples=["en:beers", "carrefour"], +) -def country_name_to_alpha2(value: Optional[str]): - """ - Helper function that return alpha2 code for country name - """ - country = pycountry.countries.get(name=value) - if country is not None: - return f"{(country.alpha_2).lower()}-en" - return "world" +SECONDARY_FACET_TAG_QUERY = Query( + title="Secondary facet tag", + description="Secondary facet tag, used on Open Food Facts website on nested facet pages " + "(ex: 
/brand/[BRAND]/category/[CATEGORY]). It should be different than `facet_tag`", + examples=["category", "brand", "ingredient"], +) +SECONDARY_VALUE_TAG_QUERY = Query( + title="Secondary value tag", + description="Secondary value tag, it should be different than `value_tag`", # noqa: E501 + examples=["en:beers", "carrefour"], +) -inflectEngine = inflect.engine() +LANGUAGE_CODE_QUERY = Query( + title="language code 2-letter code", + description="To return knowledge panels in native language", +) - -def pluralize(facet: str): - """ - Return plural form of facet - """ - return facet if facet == "packaging" else inflectEngine.plural_noun(facet) - - -def singularize(facet: Optional[str]): - """ - Return singular form of facet - """ - if facet is not None: - return ( - facet if not inflectEngine.singular_noun(facet) else inflectEngine.singular_noun(facet) - ) - - -class QueryData: - """ - Helper class for handling repetition of query - """ - - def facet_tag_query(): - - query = Query( - title="Facet tag string", - description="Facet tag string for the items to search in the database eg:- `category` etc.", # noqa: E501 - ) - return query - - def value_tag_query(): - query = Query( - default=None, - title="Value tag string", - description="value tag string for the items to search in the database eg:-`en:beers` etc.", # noqa: E501 - ) - return query - - def secondary_facet_tag_query(): - query = Query( - default=None, - title="secondary facet tag string", - description="secondary facet tag string for the items to search in the database eg:-`brand` etc.", # noqa: E501 - ) - return query - - def secondary_value_tag_query(): - query = Query( - default=None, - title="secondary value tag string", - description="secondary value tag string for the items to search in the database eg:-`lidl` etc.", # noqa: E501 - ) - return query - - def language_code_query(): - query = Query( - default=None, - title="language code string", - description="To return knowledge panels in native language, 
default lang: `en`.", - ) - return query - - def country_query(): - query = Query( - default=None, - title="Country tag string", - description="To return knowledge panels for specific country, ex: `france` or `fr`.", - ) - return query +COUNTRY_QUERY = Query( + title="Country tag string", + description="To return knowledge panels for specific country, ex: `france` or `fr`.", +) # -------------------------------------------- @@ -211,10 +152,45 @@ class WikidataPanel(TypedDict, total=False): WikiData: KnowledgePanelItem -class KnowledgePanel(HungerGamePanel, DataQualityPanel, LastEditsPanel, WikidataPanel): +class InformationPanel(TypedDict, total=False): + """Panel with facet description.""" + + Description: KnowledgePanelItem + + +class KnowledgePanel( + HungerGamePanel, DataQualityPanel, LastEditsPanel, WikidataPanel, InformationPanel +): pass class FacetResponse(BaseModel): # Return facetresponse l.e, all differnt knowledge panel - knowledge_panels: Optional[KnowledgePanel] = None + knowledge_panels: KnowledgePanel | None = None + + +# Models related to information knowledge panel content + + +class KnowledgeContentItem(BaseModel): + lang: Lang + tag_type: Literal["label", "additive", "category"] + value_tag: constr(min_length=3) + content: constr(min_length=2) + country: Country + category_tag: str | None = None + + +class KnowledgeContent(BaseModel): + items: list[KnowledgeContentItem] + + @validator("items") + def unique_items(cls, v): + count = Counter( + (item.lang, item.tag_type, item.value_tag, item.country, item.category_tag) + for item in v + ) + most_common = count.most_common(1) + if most_common and most_common[0][1] > 1: + raise ValueError(f"more than 1 item with fields={most_common[0][0]}") + return v diff --git a/app/settings.py b/app/settings.py index 1513bcf..4d10ef2 100644 --- a/app/settings.py +++ b/app/settings.py @@ -2,3 +2,6 @@ PROJECT_DIR = Path(__file__).parent.parent I18N_DIR = PROJECT_DIR / "i18n" + +# Directory where the HTML pages for 
information knowledge panels are located +HTML_DIR = PROJECT_DIR / "html" diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..e719264 --- /dev/null +++ b/app/utils.py @@ -0,0 +1,77 @@ +import os +import re +import unicodedata + +import inflect +import pycountry + +# Adapted from werkzeug source code (BSD-3-Clause license) +# to allow validating without transforming value tags + +_filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.\-:]") + + +def secure_filename(filename: str) -> str: + r"""Pass it a filename and it will return a secure version of it. This + filename can then safely be stored on a regular file system and passed + to :func:`os.path.join`. The filename returned is an ASCII only string + for maximum portability. + + >>> secure_filename("My cool movie.mov") + 'My_cool_movie.mov' + >>> secure_filename("../../../etc/passwd") + 'etc_passwd' + >>> secure_filename('i contain cool \xfcml\xe4uts.txt') + 'i_contain_cool_umlauts.txt' + + The function might return an empty filename. It's your responsibility + to ensure that the filename is unique and that you abort or + generate a random filename if the function returned an empty one. 
+ + :param filename: the filename to secure + """ + filename = unicodedata.normalize("NFKD", filename) + filename = filename.encode("ascii", "ignore").decode("ascii") + + for sep in os.sep, os.path.altsep: + if sep: + filename = filename.replace(sep, " ") + filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip("._") + + return filename + + +def alpha2_to_country_name(value: str | None): + """ + Helper function that returns the country name for an alpha2 code (any other value is returned unchanged) + """ + if value is not None and len(value) == 2: + country = pycountry.countries.get(alpha_2=value) + if country is not None: + return f"{country.name}" + return value + + +def country_name_to_alpha2(value: str | None): + """ + Helper function that returns the `{alpha2}-en` code for a country name, or `world` if the name is unknown + """ + country = pycountry.countries.get(name=value) + if country is not None: + return f"{(country.alpha_2).lower()}-en" + return "world" + + +inflectEngine = inflect.engine() + + +def pluralize(facet: str): + """Return plural form of facet.""" + return facet if facet == "packaging" else inflectEngine.plural_noun(facet) + + +def singularize(facet: str | None = None): + """Return singular form of facet.""" + if facet is not None: + singular_value = inflectEngine.singular_noun(facet) + return facet if not singular_value else singular_value diff --git a/build_content.py b/build_content.py new file mode 100644 index 0000000..ab10836 --- /dev/null +++ b/build_content.py @@ -0,0 +1,16 @@ +""" +This script builds the static HTML dump of information knowledge panels +from a content YAML file. 
+""" + +import argparse +from pathlib import Path + +from app.information_kp import build_content +from app.settings import HTML_DIR + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("file_path", type=Path) + args = parser.parse_args() + build_content(HTML_DIR, args.file_path) diff --git a/requirements.txt b/requirements.txt index 436a5f3..345f4de 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/template/item.html b/template/item.html index f864566..47e0a52 100644 --- a/template/item.html +++ b/template/item.html @@ -1,27 +1,32 @@
The total number of issues are 181
-Total number of edits 1
-DATA about {content_item.value_tag} for " + f"{content_item.country.value}-{content_item.lang.value}
" + ) + yield content_item + file_path.unlink() + os.rmdir(file_path.parent) + + +def test_knowledge_panel_with_information_kp(client, knowledge_content_item: KnowledgeContentItem): + for tag_type_suffix in ("", "s"): + # test with singular and plural form of facet tag + response = client.get( + f"/knowledge_panel?facet_tag={knowledge_content_item.tag_type}{tag_type_suffix}" + f"&value_tag={knowledge_content_item.value_tag}" + f"&country={knowledge_content_item.country.value}" + f"&lang_code={knowledge_content_item.lang.value}" + "&add_contribution_panels=false" + ) + assert response.status_code == 200 + result = response.json() + assert set(result["knowledge_panels"].keys()) == {"Description"} + kp = result["knowledge_panels"]["Description"] + assert len(kp["elements"]) == 1 + element = kp["elements"][0] + assert element == { + "element_type": "text", + "text_element": {"html": "DATA about en:specific-label for it-it
"}, + } + + +def test_knowledge_panel_with_information_kp_unknown_value( + client, knowledge_content_item: KnowledgeContentItem +): + # test with singular and plural form of facet tag + response = client.get( + f"/knowledge_panel?facet_tag={knowledge_content_item.tag_type}" + f"&value_tag=en:value-without-kp" + f"&country={knowledge_content_item.country.value}" + f"&lang_code={knowledge_content_item.lang.value}" + "&add_contribution_panels=false" + ) + assert response.status_code == 200 + result = response.json() + assert result["knowledge_panels"] == {}