Skip to content

Commit

Permalink
Handle groups, rosters, and attachments
Browse files Browse the repository at this point in the history
  • Loading branch information
sergey-misuk-im committed Dec 19, 2024
1 parent 5cfe8b0 commit 6cd54b3
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 40 deletions.
77 changes: 52 additions & 25 deletions src/country_workspace/contrib/kobo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,75 @@
from typing import cast

from black.linegen import partial
from requests import Session
from requests import Session, Response

from country_workspace.contrib.kobo.auth import Auth
from country_workspace.contrib.kobo.data import Submission, Asset, Question
from country_workspace.contrib.kobo.raw.common import ListResponse
from country_workspace.contrib.kobo.raw import asset as raw_asset, asset_list as raw_asset_list
from country_workspace.contrib.kobo.raw import asset as raw_asset, asset_list as raw_asset_list, common as raw_common
from country_workspace.contrib.kobo.raw import submission_list as raw_submission_list


def handle_paginated_response[T, U](session: Session,
url: str,
collection_mapper: Callable[[ListResponse], list[T]],
item_mapper: Callable[[T], U]) -> Generator[U, None, None]:
DataGetter = Callable[[str], Response]


def _handle_paginated_response[T, U](data_getter: DataGetter,
url: str,
collection_mapper: Callable[[ListResponse], list[T]],
item_mapper: Callable[[T], U]) -> Generator[U, None, None]:
while url:
response = session.get(url)
response = data_getter(url)
response.raise_for_status()
data: ListResponse = response.json()
yield from map(item_mapper, collection_mapper(data))
url = data["next"]


def _get_raw_asset_list(data: raw_common.ListResponse) -> list[raw_asset_list.Asset]:
return [
datum for datum in
cast(raw_asset_list.AssetList, data)["results"]
if datum["asset_type"] == "survey"
]


def _get_raw_submission_list(data: raw_common.ListResponse) -> list[raw_submission_list.Submission]:
return cast(raw_submission_list.SubmissionList, data)["results"]


def _get_asset_list(data_getter: DataGetter, url: str) -> Generator[Asset, None, None]:
return _handle_paginated_response(data_getter,
url,
_get_raw_asset_list,
partial(_get_asset, data_getter))

def _get_submission_list(data_getter: DataGetter, url: str, questions: list[Question]) -> Generator[Submission, None, None]:
return _handle_paginated_response(
data_getter,
url,
_get_raw_submission_list,
partial(Submission, data_getter, questions)
)


def _get_asset(data_getter: DataGetter, raw: raw_asset_list.Asset) -> Asset:
response = data_getter(raw["url"])
response.raise_for_status()
data: raw_asset.Asset = response.json()
return Asset(data, partial(_get_submission_list, data_getter, raw["data"]))


def _get_submission() -> Submission:
pass


class Client:
def __init__(self, *, base_url: str, token: str) -> None:
self.base_url = base_url
self.session = Session()
self.session.auth = Auth(token)
session = Session()
session.auth = Auth(token)
self.data_getter: DataGetter = session.get

@property
def assets(self) -> Generator[Asset, None, None]:
yield from handle_paginated_response(self.session,
f"{self.base_url}/api/v2/assets.json",
lambda i: cast(raw_asset_list.AssetList, i)["results"],
self._get_asset_data)

def _get_asset_data(self, raw: raw_asset_list.Asset) -> Asset:
response = self.session.get(raw["url"])
response.raise_for_status()
data: raw_asset.Asset = response.json()
return Asset(data, self._get_asset_submissions(raw["data"]))

def _get_asset_submissions(self, url: str) -> Generator[Callable[[list[Question]], Submission], None, None]:
return handle_paginated_response(self.session,
url,
lambda i: cast(raw_submission_list.SubmissionList, i)["results"],
lambda i: partial(Submission, i))
yield from _get_asset_list(self.data_getter, f"{self.base_url}/api/v2/assets.json")
97 changes: 89 additions & 8 deletions src/country_workspace/contrib/kobo/data.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,102 @@
from base64 import b64encode
from collections import UserDict
from collections.abc import Callable, Generator
from enum import StrEnum, auto
from functools import cached_property, reduce
from requests import Response
from typing import Any
from uuid import UUID

from country_workspace.contrib.kobo.raw import asset as raw_asset
from country_workspace.contrib.kobo.raw import submission_list as raw_submission_list


class SurveyItemType(StrEnum):
START = auto()
END = auto()
BEGIN_GROUP = auto()
END_GROUP = auto()
BEGIN_REPEAT = auto()
END_REPEAT = auto()



class Raw[T]:
def __init__(self, raw: T) -> None:
self._raw = raw


class Question(Raw[raw_asset.Question]):
class Question(Raw[raw_asset.SurveyItem]):
def __init__(self, raw: raw_asset.SurveyItem, in_group: bool, in_roster: bool) -> None:
super().__init__(raw)
assert not (in_group and in_roster), "Cannot be both in group and roster"

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
self._in_group = in_group
self._in_roster = in_roster

def extract_answer(self, in_: raw_submission_list.Submission, out: dict[str, Any]) -> None:
if self._in_roster:
roster_key, _ = self.key.split("/")
roster = out.get(roster_key, [])
if self.key in in_:
if roster:
roster[0][self.key] = in_.get(self.key)
else:
roster.append({self.key: in_.get(self.key)})
else:
if roster_key in in_:
for i, item in enumerate(in_[roster_key]):
if len(roster) < i + 1:
roster.append({})
roster[i][self.key] = item[self.key]
out[roster_key] = roster
else:
out[self.key] = in_.get(self.key)

@property
def key(self) -> str:
return self._raw["$autoname"]
return self._raw["$xpath"]

@property
def labels(self) -> list[str]:
return self._raw["label"]


InAndOut = tuple[raw_submission_list.Submission, dict[str, Any]]


def _extract_answer(in_and_out: InAndOut, question: Question) -> InAndOut:
question.extract_answer(*in_and_out)
return in_and_out


def _download_attachments(data_getter: Callable[[str], Response], raw: raw_submission_list.Submission) -> None:
for attachment in raw["_attachments"]:
content = b64encode(data_getter(attachment["download_url"]).content).decode()
value = f"data:{attachment['mimetype']};base64,{content}"
key = attachment["question_xpath"]
if key in raw:
raw[key] = value
else:
parent, key = key.split("/")
parent, index = parent.split("[")
index = int(index.rstrip("]")) - 1
raw[parent][index][f"{parent}/{key}"] = value


class Submission(Raw[raw_submission_list.Submission], UserDict):
def __init__(self, raw: raw_submission_list.Submission, questions: list[Question]) -> None:
def __init__(self, data_getter: Callable[[str], Response], questions: list[Question], raw: raw_submission_list.Submission) -> None:
Raw.__init__(self, raw)
UserDict.__init__(self, {question.key: raw[question.key] for question in questions})
_download_attachments(data_getter, self._raw)
_, answers = reduce(_extract_answer, questions, (raw, {}))
UserDict.__init__(self, answers)

@property
def uuid(self) -> UUID:
return UUID(self._raw["_uuid"])


class Asset(Raw[raw_asset.Asset]):
def __init__(self, raw: raw_asset.Asset, submissions: Generator[Callable[[list[Question]], Submission], None, None]) -> None:
def __init__(self, raw: raw_asset.Asset, submissions: Callable[[list[Question]], Generator[Submission, None, None]]) -> None:
super().__init__(raw)
self._submissions = submissions

Expand All @@ -44,10 +108,27 @@ def uid(self) -> str:
def name(self) -> str:
return self._raw["name"]

@property
@cached_property
def questions(self) -> list[Question]:
return [Question(raw_question) for raw_question in self._raw["content"]["survey"] if "label" in raw_question]
in_group = False
in_roster = False
questions = []
for raw_question in self._raw["content"]["survey"]:
match raw_question["type"]:
case SurveyItemType.START | SurveyItemType.END:
pass
case SurveyItemType.BEGIN_GROUP:
in_group = True
case SurveyItemType.END_GROUP:
in_group = False
case SurveyItemType.BEGIN_REPEAT:
in_roster = True
case SurveyItemType.END_REPEAT:
in_roster = False
case _:
questions.append(Question(raw_question, in_group, in_roster))
return questions

@property
def submissions(self) -> Generator[Submission, None, None]:
return (submission(self.questions) for submission in self._submissions)
yield from self._submissions(self.questions)
7 changes: 3 additions & 4 deletions src/country_workspace/contrib/kobo/raw/asset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from typing import TypedDict


class Question(TypedDict("Question", {"$autoname": str})):
label: list[str]
class SurveyItem(TypedDict("SurveyItem", {"$xpath": str})):
type: str


class Content(TypedDict):
survey: list[Question]
survey: list[SurveyItem]


class Asset(TypedDict):
Expand Down
1 change: 1 addition & 0 deletions src/country_workspace/contrib/kobo/raw/asset_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class Asset(TypedDict):
data: str
url: str
asset_type: str


class AssetList(ListResponse):
Expand Down
7 changes: 7 additions & 0 deletions src/country_workspace/contrib/kobo/raw/submission_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@
from country_workspace.contrib.kobo.raw.common import ListResponse


class Attachment(TypedDict):
download_url: str
mimetype: str
question_xpath: str


class Submission(TypedDict):
_uuid: str
_attachments: list[Attachment]


class SubmissionList(ListResponse):
Expand Down
6 changes: 3 additions & 3 deletions tests/contrib/kobo/test_kobo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from requests.sessions import Session
from requests.exceptions import Timeout

from country_workspace.contrib.kobo.client import handle_paginated_response
from country_workspace.contrib.kobo.client import _handle_paginated_response


SAMPLE_URL = "https://example.com"
Expand All @@ -32,7 +32,7 @@ def test_all_data_is_fetched() -> None:
session.get.return_value.json.side_effect = responses
assert (
tuple(
handle_paginated_response(
_handle_paginated_response(
session, urls[0], lambda x: x["results"], identity
)
)
Expand All @@ -44,4 +44,4 @@ def test_error_is_propagated() -> None:
session = Mock(spec=Session)
session.get.return_value.raise_for_status.side_effect = Timeout
with raises(Timeout):
tuple(handle_paginated_response(session, SAMPLE_URL, identity, identity))
tuple(_handle_paginated_response(session, SAMPLE_URL, identity, identity))

0 comments on commit 6cd54b3

Please sign in to comment.