From b7883fbe6938d2a81a52b1b2cc7c653e86693911 Mon Sep 17 00:00:00 2001 From: florimondmanca Date: Mon, 1 Apr 2024 00:47:45 +0200 Subject: [PATCH] Refactor --- server/diypedals/.gitignore | 1 + server/diypedals/infrastructure/webdav.py | 265 +++++++++++++--------- 2 files changed, 154 insertions(+), 112 deletions(-) diff --git a/server/diypedals/.gitignore b/server/diypedals/.gitignore index e934adf..d7e1b6b 100644 --- a/server/diypedals/.gitignore +++ b/server/diypedals/.gitignore @@ -1 +1,2 @@ cache/ +web/static/img/build_reports/ diff --git a/server/diypedals/infrastructure/webdav.py b/server/diypedals/infrastructure/webdav.py index 2fa8ec4..8dde5e1 100644 --- a/server/diypedals/infrastructure/webdav.py +++ b/server/diypedals/infrastructure/webdav.py @@ -4,7 +4,8 @@ import itertools import xml.etree.ElementTree as ET from base64 import b64decode, b64encode -from typing import AsyncIterator +from dataclasses import dataclass +from typing import Any, AsyncIterator import httpx from PIL import Image @@ -14,20 +15,52 @@ from .cache import DiskCache +@dataclass(frozen=True) +class _Folder: + href: str + etag: str + + +@dataclass(frozen=True) +class _PhotoFile: + media_type: str + href: str + etag: str + + class BuildReportClient: def __init__(self, username: str, password: str, cache: DiskCache) -> None: self._cache = cache self._http = httpx.AsyncClient( auth=httpx.BasicAuth(username, password), - timeout=httpx.Timeout(5, connect=15), + timeout=httpx.Timeout(5, connect=15, read=15), ) + # https://docs.nextcloud.com/server/latest/developer_manual/client_apis/WebDAV/basic.html + self._url = httpx.URL(settings.BUILD_REPORTS_WEBDAV_URL) async def fetch_all(self) -> AsyncIterator[BuildReport]: - # https://docs.nextcloud.com/server/latest/developer_manual/client_apis/WebDAV/basic.html + folders = await self._list_folders() + + for folder in folders: + entry = await self._read_entry(folder) + photos = await self._read_photos(folder) + thumbnail = _make_thumbnail(photos[0]) - webdav_url = httpx.URL(settings.BUILD_REPORTS_WEBDAV_URL) + yield BuildReport( + title=entry["title"], + slug=entry["slug"], + description=entry["description"], + categories=entry["categories"], + build_date=dt.date.fromisoformat(entry["build_date"]), + status=entry["status"], + thumbnail=thumbnail, + photos=photos, + kit=Kit(**entry["kit"]) if entry.get("kit") else None, + pcb=Pcb(**entry["pcb"]) if entry.get("pcb") else None, + ) - ls_content = """ + async def _list_folders(self) -> list[_Folder]: + body = """ @@ -36,120 +69,128 @@ async def fetch_all(self) -> AsyncIterator[BuildReport]: """ - r = await self._http.request("PROPFIND", webdav_url, content=ls_content) + response = await self._http.request("PROPFIND", self._url, content=body) + response.raise_for_status() + assert response.status_code == http.HTTPStatus.MULTI_STATUS.value + + xml = ET.fromstring(response.text) - xml = ET.fromstring(r.text) - ns = {"d": "DAV:"} + folders = [] - for item in itertools.islice(xml, 1, None): # First is the folder itself - if item.find(".//d:resourcetype/d:collection", ns) is None: + for i, item in enumerate(xml): + if i == 0: + # Skip the folder itself continue - href = item.find(".//d:href", ns) - assert href is not None and href.text is not None + if item.find(".//d:resourcetype/d:collection", {"d": "DAV:"}) is None: + continue - async def _fetch_entry_json(url: str) -> dict: - r = await self._http.request("GET", url) - r.raise_for_status() - return r.json() + href_el = item.find(".//d:href", {"d": "DAV:"}) + assert href_el is not None and href_el.text is not None - entry_etag = item.find(".//d:getetag", ns) - assert entry_etag is not None and entry_etag.text is not None - entry_json_url = str(webdav_url.copy_with(path=href.text + "entry.json")) - data = await self._cache.get( - entry_etag.text, lambda: _fetch_entry_json(entry_json_url) + etag_el = item.find(".//d:getetag", {"d": "DAV:"}) + assert etag_el is not None and etag_el.text is not None + + folder = _Folder( + href=href_el.text, + etag=etag_el.text, ) - async def _fetch_photos(url: str) -> list[dict]: - ls_photos_content = """ - - - - - - - - - """ - - photos_response = await self._http.request( - "PROPFIND", url, content=ls_photos_content - ) - - photos = [] - - if photos_response.status_code == http.HTTPStatus.MULTI_STATUS.value: - photos_xml = ET.fromstring(photos_response.text) - - for photo_item in itertools.islice(photos_xml, 1, None): - if ( - photo_item.find(".//d:resourcetype/d:collection", ns) - is not None - ): - continue - - content_type = photo_item.find(".//d:getcontenttype", ns) - assert ( - content_type is not None and content_type.text is not None - ) - - if not content_type.text.startswith("image/"): - continue - - async def _fetch_photo() -> dict: - # Browser won't be able to download the image as it is behind authentication. - # Need to download the image and serve it as base64. - # See: https://stackoverflow.com/a/62305417 - href = photo_item.find(".//d:href", ns) - assert href is not None and href.text is not None - photo_url = webdav_url.copy_with(path=href.text) - photo_response = await self._http.request("GET", photo_url) - photo_data = b64encode(photo_response.content).decode() - - photo_src = "data:%s;base64,%s" % ( - photo_response.headers["content-type"], - photo_data, - ) - - return { - "src": photo_src, - "content_type": photo_response.headers["content-type"], - "alt": "Photo", - "data": photo_data, - } - - photo_etag = photo_item.find(".//d:getetag", ns) - assert photo_etag is not None and photo_etag.text is not None - photo_attrs = await self._cache.get( - photo_etag.text, _fetch_photo - ) - photos.append(photo_attrs) - - return photos - - entry_url = webdav_url.copy_with(path=href.text + "photos") - photos = [ - Photo(**attr) - for attr in await self._cache.get( - str(entry_url), lambda: _fetch_photos(str(entry_url)) - ) - ] + folders.append(folder) - thumbnail = _make_thumbnail(photos[0]) + return folders - yield BuildReport( - title=data["title"], - slug=data["slug"], - description=data["description"], - categories=data["categories"], - build_date=dt.date.fromisoformat(data["build_date"]), - status=data["status"], - thumbnail=thumbnail, - photos=photos, - kit=Kit(**data.get("kit")) if data.get("kit") else None, - pcb=Pcb(**data.get("pcb")) if data.get("pcb") else None, + async def _read_entry(self, folder: _Folder) -> dict[str, Any]: + async def fetch() -> dict: + url = str(self._url.copy_with(path=folder.href + "entry.json")) + r = await self._http.request("GET", url) + r.raise_for_status() + return r.json() + + return await self._cache.get(f"entry-{folder.etag}", lambda: fetch()) + + async def _list_photo_files(self, folder: _Folder) -> list[_PhotoFile]: + body = """ + + + + + + + + + """ + url = str(self._url.copy_with(path=folder.href + "photos")) + response = await self._http.request("PROPFIND", url, content=body) + response.raise_for_status() + assert response.status_code == http.HTTPStatus.MULTI_STATUS.value + + xml = ET.fromstring(response.text) + + photo_files = [] + + for i, item in enumerate(xml): + if i == 0: + # Skip the folder itself + continue + + if item.find(".//d:resourcetype/d:collection", {"d": "DAV:"}) is not None: + continue + + media_type_el = item.find(".//d:getcontenttype", {"d": "DAV:"}) + assert media_type_el is not None and media_type_el.text is not None + + if not media_type_el.text.startswith("image/"): + continue + + href_el = item.find(".//d:href", {"d": "DAV:"}) + assert href_el is not None and href_el.text is not None + + etag_el = item.find(".//d:getetag", {"d": "DAV:"}) + assert etag_el is not None and etag_el.text is not None + + photo_file = _PhotoFile( + media_type=media_type_el.text, href=href_el.text, etag=etag_el.text ) + photo_files.append(photo_file) + + return photo_files + + async def _read_photos(self, folder: _Folder) -> list[Photo]: + async def fetch() -> list[dict]: + return [ + await self._read_photo_serializable(photo_file) + for photo_file in await self._list_photo_files(folder) + ] + + attrs = await self._cache.get(f"photos-{folder.etag}", lambda: fetch()) + + return [Photo(**attr) for attr in attrs] + + async def _read_photo_serializable(self, photo_file: _PhotoFile) -> dict: + async def fetch() -> dict: + # Browser won't be able to download the image as it is behind authentication. + # Need to download the image and serve it as base64. + # See: https://stackoverflow.com/a/62305417 + url = self._url.copy_with(path=photo_file.href) + response = await self._http.request("GET", url) + content = b64encode(response.content).decode() + src = _make_image_src(response.headers["Content-Type"], content) + + return { + "src": src, + "content_type": response.headers["content-type"], + "alt": "Photo", # TODO + "data": content, + } + + return await self._cache.get(photo_file.etag, lambda: fetch()) + + +def _make_image_src(media_type: str, content: str) -> str: + return "data:%s;base64,%s" % (media_type, content) + def _make_thumbnail(photo: Photo) -> Photo: img = Image.open(io.BytesIO(b64decode(photo.data))) @@ -159,11 +200,11 @@ def _make_thumbnail(photo: Photo) -> Photo: fp = io.BytesIO() img.save(fp, format="JPEG") fp.seek(0) - thumbnail_data = b64encode(fp.read()).decode() + content = b64encode(fp.read()).decode() return Photo( - src=("data:%s;base64,%s" % (photo.content_type, thumbnail_data)), - content_type=photo.content_type, - alt=photo.alt, - data=thumbnail_data, + src=_make_image_src("image/jpeg", content), + content_type="image/jpeg", + alt="Thumbnail", # TODO + data=content, )