Skip to content

Commit

Permalink
Serialize JSON with b64 encoding/decoding for data field of LatestMit…
Browse files Browse the repository at this point in the history
…mDataEntry.py
  • Loading branch information
Grennith committed Jan 21, 2024
1 parent d84d908 commit f760b1d
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import base64
from typing import Dict, List, Optional, Union

from orjson import orjson
Expand Down Expand Up @@ -36,12 +37,20 @@ async def from_json(json_data: Union[bytes, str]) -> Optional[LatestMitmDataEntr
timestamp_received: Optional[int] = loaded.get("timestamp_received")
timestamp_of_data_retrieval: Optional[int] = loaded.get("timestamp_of_data_retrieval")
# TODO: Likely data is a str which needs to be translated to bytes?
data: Union[List, Dict, bytes] = loaded.get("data")
raw_data: Optional[Union[List, Dict, bytes, str]] = loaded.get("data")
if not raw_data:
return None
elif isinstance(raw_data, str):
data: Union[List, Dict, bytes] = base64.b64decode(raw_data)
else:
data: Union[List, Dict, bytes] = raw_data
obj: LatestMitmDataEntry = LatestMitmDataEntry(location,
timestamp_received,
timestamp_of_data_retrieval,
data)
return obj

async def to_json(self) -> bytes:
if isinstance(self.data, bytes):
self.data = base64.b64encode(self.data)
return orjson.dumps(self.__dict__)

0 comments on commit f760b1d

Please sign in to comment.