Commit b7883fb

Refactor

florimondmanca committed Mar 31, 2024
1 parent c48b7a7 commit b7883fb

Showing 2 changed files with 154 additions and 112 deletions.
1 change: 1 addition & 0 deletions server/diypedals/.gitignore
@@ -1 +1,2 @@
cache/
web/static/img/build_reports/
265 changes: 153 additions & 112 deletions server/diypedals/infrastructure/webdav.py
@@ -4,7 +4,8 @@
import itertools
import xml.etree.ElementTree as ET
from base64 import b64decode, b64encode
from typing import AsyncIterator
from dataclasses import dataclass
from typing import Any, AsyncIterator

import httpx
from PIL import Image
@@ -14,20 +15,52 @@
from .cache import DiskCache


@dataclass(frozen=True)
class _Folder:
href: str
etag: str


@dataclass(frozen=True)
class _PhotoFile:
media_type: str
href: str
etag: str


class BuildReportClient:
def __init__(self, username: str, password: str, cache: DiskCache) -> None:
self._cache = cache
self._http = httpx.AsyncClient(
auth=httpx.BasicAuth(username, password),
timeout=httpx.Timeout(5, connect=15),
timeout=httpx.Timeout(5, connect=15, read=15),
)
# https://docs.nextcloud.com/server/latest/developer_manual/client_apis/WebDAV/basic.html
self._url = httpx.URL(settings.BUILD_REPORTS_WEBDAV_URL)

async def fetch_all(self) -> AsyncIterator[BuildReport]:
# https://docs.nextcloud.com/server/latest/developer_manual/client_apis/WebDAV/basic.html
folders = await self._list_folders()

for folder in folders:
entry = await self._read_entry(folder)
photos = await self._read_photos(folder)
thumbnail = _make_thumbnail(photos[0])

webdav_url = httpx.URL(settings.BUILD_REPORTS_WEBDAV_URL)
yield BuildReport(
title=entry["title"],
slug=entry["slug"],
description=entry["description"],
categories=entry["categories"],
build_date=dt.date.fromisoformat(entry["build_date"]),
status=entry["status"],
thumbnail=thumbnail,
photos=photos,
kit=Kit(**entry["kit"]) if entry.get("kit") else None,
pcb=Pcb(**entry["pcb"]) if entry.get("pcb") else None,
)

ls_content = """<?xml version="1.0" encoding="UTF-8"?>
async def _list_folders(self) -> list[_Folder]:
body = """<?xml version="1.0" encoding="UTF-8"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<d:displayname/>
Expand All @@ -36,120 +69,128 @@ async def fetch_all(self) -> AsyncIterator[BuildReport]:
</d:prop>
</d:propfind>
"""
r = await self._http.request("PROPFIND", webdav_url, content=ls_content)
response = await self._http.request("PROPFIND", self._url, content=body)
response.raise_for_status()
assert response.status_code == http.HTTPStatus.MULTI_STATUS.value

xml = ET.fromstring(response.text)

xml = ET.fromstring(r.text)
ns = {"d": "DAV:"}
folders = []

for item in itertools.islice(xml, 1, None): # First is the folder itself
if item.find(".//d:resourcetype/d:collection", ns) is None:
for i, item in enumerate(xml):
if i == 0:
# Skip the folder itself
continue

href = item.find(".//d:href", ns)
assert href is not None and href.text is not None
if item.find(".//d:resourcetype/d:collection", {"d": "DAV:"}) is None:
continue

async def _fetch_entry_json(url: str) -> dict:
r = await self._http.request("GET", url)
r.raise_for_status()
return r.json()
href_el = item.find(".//d:href", {"d": "DAV:"})
assert href_el is not None and href_el.text is not None

entry_etag = item.find(".//d:getetag", ns)
assert entry_etag is not None and entry_etag.text is not None
entry_json_url = str(webdav_url.copy_with(path=href.text + "entry.json"))
data = await self._cache.get(
entry_etag.text, lambda: _fetch_entry_json(entry_json_url)
etag_el = item.find(".//d:getetag", {"d": "DAV:"})
assert etag_el is not None and etag_el.text is not None

folder = _Folder(
href=href_el.text,
etag=etag_el.text,
)

async def _fetch_photos(url: str) -> list[dict]:
ls_photos_content = """<?xml version="1.0" encoding="UTF-8"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<d:displayname/>
<d:getcontenttype/>
<d:resourcetype/>
<d:getetag/>
</d:prop>
</d:propfind>
"""

photos_response = await self._http.request(
"PROPFIND", url, content=ls_photos_content
)

photos = []

if photos_response.status_code == http.HTTPStatus.MULTI_STATUS.value:
photos_xml = ET.fromstring(photos_response.text)

for photo_item in itertools.islice(photos_xml, 1, None):
if (
photo_item.find(".//d:resourcetype/d:collection", ns)
is not None
):
continue

content_type = photo_item.find(".//d:getcontenttype", ns)
assert (
content_type is not None and content_type.text is not None
)

if not content_type.text.startswith("image/"):
continue

async def _fetch_photo() -> dict:
# Browser won't be able to download the image as it is behind authentication.
# Need to download the image and serve it as base64.
# See: https://stackoverflow.com/a/62305417
href = photo_item.find(".//d:href", ns)
assert href is not None and href.text is not None
photo_url = webdav_url.copy_with(path=href.text)
photo_response = await self._http.request("GET", photo_url)
photo_data = b64encode(photo_response.content).decode()

photo_src = "data:%s;base64,%s" % (
photo_response.headers["content-type"],
photo_data,
)

return {
"src": photo_src,
"content_type": photo_response.headers["content-type"],
"alt": "Photo",
"data": photo_data,
}

photo_etag = photo_item.find(".//d:getetag", ns)
assert photo_etag is not None and photo_etag.text is not None
photo_attrs = await self._cache.get(
photo_etag.text, _fetch_photo
)
photos.append(photo_attrs)

return photos

entry_url = webdav_url.copy_with(path=href.text + "photos")
photos = [
Photo(**attr)
for attr in await self._cache.get(
str(entry_url), lambda: _fetch_photos(str(entry_url))
)
]
folders.append(folder)

thumbnail = _make_thumbnail(photos[0])
return folders

yield BuildReport(
title=data["title"],
slug=data["slug"],
description=data["description"],
categories=data["categories"],
build_date=dt.date.fromisoformat(data["build_date"]),
status=data["status"],
thumbnail=thumbnail,
photos=photos,
kit=Kit(**data.get("kit")) if data.get("kit") else None,
pcb=Pcb(**data.get("pcb")) if data.get("pcb") else None,
async def _read_entry(self, folder: _Folder) -> dict[str, Any]:
async def fetch() -> dict:
url = str(self._url.copy_with(path=folder.href + "entry.json"))
r = await self._http.request("GET", url)
r.raise_for_status()
return r.json()

return await self._cache.get(f"entry-{folder.etag}", lambda: fetch())

async def _list_photo_files(self, folder: _Folder) -> list[_PhotoFile]:
body = """<?xml version="1.0" encoding="UTF-8"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<d:displayname/>
<d:getcontenttype/>
<d:resourcetype/>
<d:getetag/>
</d:prop>
</d:propfind>
"""
url = str(self._url.copy_with(path=folder.href + "photos"))
response = await self._http.request("PROPFIND", url, content=body)
response.raise_for_status()
assert response.status_code == http.HTTPStatus.MULTI_STATUS.value

xml = ET.fromstring(response.text)

photo_files = []

for i, item in enumerate(xml):
if i == 0:
# Skip the folder itself
continue

if item.find(".//d:resourcetype/d:collection", {"d": "DAV:"}) is not None:
continue

media_type_el = item.find(".//d:getcontenttype", {"d": "DAV:"})
assert media_type_el is not None and media_type_el.text is not None

if not media_type_el.text.startswith("image/"):
continue

href_el = item.find(".//d:href", {"d": "DAV:"})
assert href_el is not None and href_el.text is not None

etag_el = item.find(".//d:getetag", {"d": "DAV:"})
assert etag_el is not None and etag_el.text is not None

photo_file = _PhotoFile(
media_type=media_type_el.text, href=href_el.text, etag=etag_el.text
)

photo_files.append(photo_file)

return photo_files

async def _read_photos(self, folder: _Folder) -> list[Photo]:
async def fetch() -> list[dict]:
return [
await self._read_photo_serializable(photo_file)
for photo_file in await self._list_photo_files(folder)
]

attrs = await self._cache.get(f"photos-{folder.etag}", lambda: fetch())

return [Photo(**attr) for attr in attrs]

async def _read_photo_serializable(self, photo_file: _PhotoFile) -> dict:
async def fetch() -> dict:
# Browser won't be able to download the image as it is behind authentication.
# Need to download the image and serve it as base64.
# See: https://stackoverflow.com/a/62305417
url = self._url.copy_with(path=photo_file.href)
response = await self._http.request("GET", url)
content = b64encode(response.content).decode()
src = _make_image_src(response.headers["Content-Type"], content)

return {
"src": src,
"content_type": response.headers["content-type"],
"alt": "Photo", # TODO
"data": content,
}

return await self._cache.get(photo_file.etag, lambda: fetch())


def _make_image_src(media_type: str, content: str) -> str:
return "data:%s;base64,%s" % (media_type, content)


def _make_thumbnail(photo: Photo) -> Photo:
img = Image.open(io.BytesIO(b64decode(photo.data)))
Expand All @@ -159,11 +200,11 @@ def _make_thumbnail(photo: Photo) -> Photo:
fp = io.BytesIO()
img.save(fp, format="JPEG")
fp.seek(0)
thumbnail_data = b64encode(fp.read()).decode()
content = b64encode(fp.read()).decode()

return Photo(
src=("data:%s;base64,%s" % (photo.content_type, thumbnail_data)),
content_type=photo.content_type,
alt=photo.alt,
data=thumbnail_data,
src=_make_image_src("image/jpeg", content),
content_type="image/jpeg",
alt="Thumbnail", # TODO
data=content,
)
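
For reference, the refactor keeps the client's public surface unchanged: callers still construct a BuildReportClient and iterate fetch_all(). Below is a minimal usage sketch; the DiskCache constructor argument, the credential values, and the import paths are assumptions inferred from the repository layout, not values taken from this commit.

import asyncio

from diypedals.infrastructure.cache import DiskCache
from diypedals.infrastructure.webdav import BuildReportClient


async def main() -> None:
    # Hypothetical wiring: the cache directory and credentials are
    # placeholders, not values defined anywhere in this commit.
    cache = DiskCache("cache/")
    client = BuildReportClient(
        username="nextcloud-user",  # placeholder
        password="app-password",  # placeholder
        cache=cache,
    )

    # fetch_all() is an async generator: each WebDAV folder becomes one
    # BuildReport, with its photos embedded as base64 data URLs.
    async for report in client.fetch_all():
        print(report.slug, report.build_date, len(report.photos))


if __name__ == "__main__":
    asyncio.run(main())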

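The refactored code also leans on DiskCache.get(key, factory) from .cache, which is not part of this diff. Purely as an assumption about that contract (not the project's actual implementation), a compatible cache could look roughly like this: on a miss, the zero-argument async factory is awaited and its JSON-serializable result is persisted under the key.

import json
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import Any


class DiskCache:
    """Hypothetical stand-in: persists JSON-serializable values on disk, keyed by string."""

    def __init__(self, directory: str) -> None:
        self._dir = Path(directory)
        self._dir.mkdir(parents=True, exist_ok=True)

    async def get(self, key: str, factory: Callable[[], Awaitable[Any]]) -> Any:
        # Etag-based keys may contain characters that are awkward in file
        # names; a real implementation would sanitize or hash them.
        path = self._dir / f"{key}.json"
        if path.exists():
            return json.loads(path.read_text())
        value = await factory()
        path.write_text(json.dumps(value))
        return value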