[UPDATE] JSONSerializer: Added buffer limit
francis-clairicia committed Sep 30, 2023
1 parent 6e41a0c commit 9f0d006
Showing 10 changed files with 724 additions and 86 deletions.
8 changes: 8 additions & 0 deletions docs/source/api/serializers/index.rst
@@ -30,3 +30,11 @@ Serializers
wrappers/base64
wrappers/compressor
wrappers/encryptor

-----

.. toctree::
:caption: Miscellaneous
:maxdepth: 2

tools
7 changes: 7 additions & 0 deletions docs/source/api/serializers/tools.rst
@@ -0,0 +1,7 @@
*******************************
Serializer implementation tools
*******************************

.. automodule:: easynetwork.serializers.tools
:members:
:no-docstring:
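The new easynetwork.serializers.tools module documented above provides the GeneratorStreamReader helper used throughout this commit. A minimal sketch of how such a reader generator is driven (the 65536-byte limit and the read_one_line wrapper are illustrative, not part of the commit):

from collections.abc import Generator

from easynetwork.serializers.tools import GeneratorStreamReader


def read_one_line() -> Generator[None, bytes, tuple[bytes, bytes]]:
    # Accumulate chunks until b"\n" is found, then split frame / remainder.
    reader = GeneratorStreamReader()
    line = yield from reader.read_until(b"\n", limit=65536, keep_end=False)
    return line, reader.read_all()


# Drive the generator by hand, as incremental_deserialize() consumers do:
gen = read_one_line()
next(gen)
gen.send(b"hello ")  # no separator yet: the reader keeps buffering
try:
    gen.send(b"world\nextra")
except StopIteration as exc:
    line, remainder = exc.value  # expected: (b"hello world", b"extra")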
4 changes: 4 additions & 0 deletions src/easynetwork/exceptions.py
@@ -25,6 +25,7 @@
"DatagramProtocolParseError",
"DeserializeError",
"IncrementalDeserializeError",
"LimitOverrunError",
"PacketConversionError",
"ServerAlreadyRunning",
"ServerClosedError",
@@ -102,6 +103,9 @@ def __init__(self, message: str, buffer: ReadableBuffer, consumed: int, separato

super().__init__(message, remaining_data, error_info=None)

self.consumed: int = consumed
"""Total number of to be consumed bytes."""


class PacketConversionError(Exception):
"""The deserialized :term:`packet` is invalid."""
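The new LimitOverrunError.consumed attribute can be inspected by callers when a stream read overruns the configured limit. A hedged sketch (the 1 KiB limit and the oversized payload are illustrative; the exact point at which the overrun is detected depends on the reader):

from easynetwork.exceptions import LimitOverrunError
from easynetwork.serializers.json import JSONSerializer

serializer = JSONSerializer(limit=1024)
consumer = serializer.incremental_deserialize()
next(consumer)
try:
    # An unterminated JSON string larger than the limit is expected to trigger the overrun.
    consumer.send(b'"' + b"x" * 2048)
except LimitOverrunError as exc:
    print(exc.consumed)  # number of bytes to be consumed at the time of the overrun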
48 changes: 9 additions & 39 deletions src/easynetwork/serializers/base_stream.py
@@ -28,9 +28,10 @@
from typing import IO, Any, final

from .._typevars import _DTOPacketT
from ..exceptions import DeserializeError, IncrementalDeserializeError, LimitOverrunError
from ..exceptions import DeserializeError, IncrementalDeserializeError
from ..tools.constants import _DEFAULT_LIMIT
from .abc import AbstractIncrementalPacketSerializer, AbstractPacketSerializer
from .tools import GeneratorStreamReader


class AutoSeparatedPacketSerializer(AbstractIncrementalPacketSerializer[_DTOPacketT]):
@@ -122,36 +123,13 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[_DTOPacketT, b
See :meth:`.AbstractIncrementalPacketSerializer.incremental_deserialize` documentation for details.
Raises:
LimitOverrunError: Reached buffer size limit.
IncrementalDeserializeError: :meth:`deserialize` raised :exc:`.DeserializeError`.
Exception: Any error raised by :meth:`deserialize`.
"""
buffer: bytes = yield
separator: bytes = self.__separator
seplen: int = len(separator)
limit: int = self.__limit
offset: int = 0
sepidx: int = -1

while True:
buflen = len(buffer)

if buflen - offset >= seplen:
sepidx = buffer.find(separator, offset)

if sepidx != -1:
break

offset = buflen + 1 - seplen
if offset > limit:
raise LimitOverrunError("Separator is not found, and chunk exceed the limit", buffer, offset, separator)

buffer += yield

if sepidx > limit:
raise LimitOverrunError("Separator is found, but chunk is longer than limit", buffer, sepidx, separator)

data = buffer[:sepidx]
buffer = buffer[sepidx + seplen :]
reader = GeneratorStreamReader()
data = yield from reader.read_until(self.__separator, limit=self.__limit, keep_end=False)
buffer = reader.read_all()

try:
packet = self.deserialize(data)
@@ -238,18 +216,10 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[_DTOPacketT, b
IncrementalDeserializeError: :meth:`deserialize` raised :exc:`.DeserializeError`.
Exception: Any error raised by :meth:`deserialize`.
"""
buffer: bytes = yield
packet_size: int = self.__size
while (buffer_size := len(buffer)) < packet_size:
buffer += yield
reader = GeneratorStreamReader()
data = yield from reader.read_exactly(self.__size)
buffer = reader.read_all()

# Do not copy if the size is *exactly* as expected
if buffer_size == packet_size:
data = buffer
buffer = b""
else:
data = buffer[:packet_size]
buffer = buffer[packet_size:]
try:
packet = self.deserialize(data)
except DeserializeError as exc:
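Both serializers above now delegate their buffering to GeneratorStreamReader. For the fixed-size path, a sketch of the equivalent read_exactly()/read_all() behaviour (the 4-byte size and the chunks are illustrative):

from collections.abc import Generator

from easynetwork.serializers.tools import GeneratorStreamReader


def read_fixed(size: int) -> Generator[None, bytes, tuple[bytes, bytes]]:
    reader = GeneratorStreamReader()
    data = yield from reader.read_exactly(size)  # exactly `size` bytes
    return data, reader.read_all()               # anything received past them


gen = read_fixed(4)
next(gen)
gen.send(b"ab")  # not enough data yet
try:
    gen.send(b"cdef")
except StopIteration as exc:
    data, remainder = exc.value  # expected: (b"abcd", b"ef")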
112 changes: 88 additions & 24 deletions src/easynetwork/serializers/json.py
@@ -22,16 +22,19 @@
"JSONSerializer",
]


import re
import string
from collections import Counter
from collections.abc import Callable, Generator
from dataclasses import asdict as dataclass_asdict, dataclass
from typing import Any, final

from ..exceptions import DeserializeError, IncrementalDeserializeError
from ..exceptions import DeserializeError, IncrementalDeserializeError, LimitOverrunError
from ..tools._utils import iter_bytes
from ..tools.constants import _DEFAULT_LIMIT
from .abc import AbstractIncrementalPacketSerializer
from .tools import GeneratorStreamReader


@dataclass(kw_only=True)
@@ -70,7 +73,15 @@ class JSONSerializer(AbstractIncrementalPacketSerializer[Any]):
A :term:`serializer` built on top of the :mod:`json` module.
"""

__slots__ = ("__encoder", "__decoder", "__decoder_error_cls", "__encoding", "__unicode_errors")
__slots__ = (
"__encoder",
"__decoder",
"__decoder_error_cls",
"__encoding",
"__unicode_errors",
"__limit",
"__use_lines",
)

def __init__(
self,
@@ -79,13 +90,17 @@ def __init__(
*,
encoding: str = "utf-8",
unicode_errors: str = "strict",
limit: int = _DEFAULT_LIMIT,
use_lines: bool = True,
) -> None:
"""
Parameters:
encoder_config: Parameter object to configure the :class:`~json.JSONEncoder`.
decoder_config: Parameter object to configure the :class:`~json.JSONDecoder`.
encoding: String encoding.
unicode_errors: Controls how encoding errors are handled.
limit: Maximum buffer size. Used in incremental serialization context.
use_lines: If :data:`True` (the default), each ASCII line is considered a JSON object.
See Also:
:ref:`standard-encodings` and :ref:`error-handlers`.
Expand All @@ -107,13 +122,19 @@ def __init__(
elif not isinstance(decoder_config, JSONDecoderConfig):
raise TypeError(f"Invalid decoder config: expected {JSONDecoderConfig.__name__}, got {type(decoder_config).__name__}")

if limit <= 0:
raise ValueError("limit must be a positive integer")

self.__encoder = JSONEncoder(**dataclass_asdict(encoder_config), indent=None, separators=(",", ":"))
self.__decoder = JSONDecoder(**dataclass_asdict(decoder_config))
self.__decoder_error_cls = JSONDecodeError

self.__encoding: str = encoding
self.__unicode_errors: str = unicode_errors

self.__limit: int = limit
self.__use_lines: bool = bool(use_lines)

@final
def serialize(self, packet: Any) -> bytes:
"""
@@ -153,8 +174,22 @@ def incremental_serialize(self, packet: Any) -> Generator[bytes, None, None]:
Yields:
all the parts of the JSON :term:`packet`.
"""
yield self.__encoder.encode(packet).encode(self.__encoding, self.__unicode_errors)
yield b"\n"
data = self.__encoder.encode(packet).encode(self.__encoding, self.__unicode_errors)
newline = b"\n"
if not data.startswith((b"{", b"[", b'"')):
data += newline
yield data
return
if not self.__use_lines:
yield data
return
if len(data) + len(newline) <= self.__limit // 2:
data += newline
yield data
else:
yield data
del data
yield newline

@final
def deserialize(self, data: bytes) -> Any:
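The framing logic above distinguishes plain JSON values from object/array/string frames. A sketch of the observable behaviour (outputs are expectations derived from the branches above, not authoritative):

from easynetwork.serializers.json import JSONSerializer

s = JSONSerializer(use_lines=False)
b"".join(s.incremental_serialize(42))        # expected: b'42\n' (plain value: always newline-terminated)
b"".join(s.incremental_serialize({"a": 1}))  # expected: b'{"a":1}' (no newline when use_lines=False)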
@@ -206,7 +241,7 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[Any, bytes]]:
Creates a Python object representing the raw JSON :term:`packet`.
Example:
>>> s = JSONSerializer()
>>> s = JSONSerializer(use_lines=False)
>>> consumer = s.incremental_deserialize()
>>> next(consumer)
>>> consumer.send(b'{"key":[1,2,3]')
@@ -219,17 +254,21 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[Any, bytes]]:
:data:`None` until the whole :term:`packet` has been deserialized.
Raises:
LimitOverrunError: Reached buffer size limit.
IncrementalDeserializeError: A :class:`UnicodeError` or :class:`~json.JSONDecodeError` has been raised.
Returns:
a tuple with the deserialized Python object and the unused trailing data.
"""
complete_document, remaining_data = yield from _JSONParser.raw_parse()

if not complete_document:
# If this condition is verified, decoder.decode() will most likely raise JSONDecodeError
complete_document = remaining_data
remaining_data = b""
complete_document: bytes
remaining_data: bytes
if self.__use_lines:
reader = GeneratorStreamReader()
complete_document = yield from reader.read_until(b"\n", limit=self.__limit)
remaining_data = reader.read_all()
del reader
else:
complete_document, remaining_data = yield from _JSONParser.raw_parse(limit=self.__limit)

packet: Any
try:
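With the default use_lines=True, the frame ends at the first newline and anything received after it is returned as trailing data. A hedged sketch (expected values only):

from easynetwork.serializers.json import JSONSerializer

s = JSONSerializer()  # use_lines=True by default
consumer = s.incremental_deserialize()
next(consumer)
try:
    consumer.send(b'{"key":[1,2,3]}\n{"next":')
except StopIteration as exc:
    packet, remaining = exc.value  # expected: ({'key': [1, 2, 3]}, b'{"next":')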
@@ -279,19 +318,21 @@ def _escaped(partial_document_view: memoryview) -> bool:
return escaped

@staticmethod
def raw_parse() -> Generator[None, bytes, tuple[bytes, bytes]]:
def raw_parse(*, limit: int) -> Generator[None, bytes, tuple[bytes, bytes]]:
if limit <= 0:
raise ValueError("limit must be a positive integer")
escaped = _JSONParser._escaped
split_partial_document = _JSONParser._split_partial_document
enclosure_counter: Counter[bytes] = Counter()
partial_document: bytes = yield
first_enclosure: bytes = b""
start: int = 0
try:
offset: int = 0
while True:
with memoryview(partial_document) as partial_document_view:
for nb_chars, char in enumerate(iter_bytes(partial_document_view[start:]), start=start + 1):
for offset, char in enumerate(iter_bytes(partial_document_view[offset:]), start=offset):
match char:
case b'"' if not escaped(partial_document_view[: nb_chars - 1]):
case b'"' if not escaped(partial_document_view[:offset]):
enclosure_counter[b'"'] = 0 if enclosure_counter[b'"'] == 1 else 1
case _ if enclosure_counter[b'"'] > 0: # We are within a JSON string, move on.
continue
@@ -304,19 +345,25 @@ def raw_parse() -> Generator[None, bytes, tuple[bytes, bytes]]:
case b" " | b"\t" | b"\n" | b"\r": # Optimization: Skip spaces
continue
case _ if len(enclosure_counter) == 0: # No enclosure, only value
partial_document = partial_document[nb_chars - 1 :] if nb_chars > 1 else partial_document
del char, nb_chars
partial_document = partial_document[offset:] if offset > 0 else partial_document
del char, offset
raise _JSONParser._PlainValueError
case _: # JSON character, quickly go to next character
continue
assert len(enclosure_counter) > 0 # nosec assert_used
if not first_enclosure:
first_enclosure = next(iter(enclosure_counter))
if enclosure_counter[first_enclosure] <= 0: # 1st found is closed
return split_partial_document(partial_document, nb_chars)
return split_partial_document(partial_document, offset + 1, limit)

# partial_document not complete
start = partial_document_view.nbytes
offset = partial_document_view.nbytes
if offset > limit:
raise LimitOverrunError(
"JSON object's end frame is not found, and chunk exceed the limit",
partial_document,
offset,
)

# yield outside view scope
partial_document += yield
Expand All @@ -331,15 +378,32 @@ def raw_parse() -> Generator[None, bytes, tuple[bytes, bytes]]:
_JSON_VALUE_BYTES = _JSONParser._JSON_VALUE_BYTES

while (nprint_idx := next((idx for idx, byte in enumerate(partial_document) if byte not in _JSON_VALUE_BYTES), -1)) < 0:
if len(partial_document) > limit:
raise LimitOverrunError(
"JSON object's end frame is not found, and chunk exceed the limit",
partial_document,
nprint_idx,
)
partial_document += yield

return split_partial_document(partial_document, nprint_idx)
return split_partial_document(partial_document, nprint_idx, limit)

@staticmethod
def _split_partial_document(partial_document: bytes, index: int) -> tuple[bytes, bytes]:
index = _JSONParser._whitespaces_match(partial_document, index).end()
if index == len(partial_document):
def _split_partial_document(partial_document: bytes, consumed: int, limit: int) -> tuple[bytes, bytes]:
if consumed > limit:
raise LimitOverrunError(
"JSON object's end frame is found, but chunk is longer than limit",
partial_document,
consumed,
)
consumed = _JSONParser._whitespaces_match(partial_document, consumed).end()
if consumed == len(partial_document):
# The following bytes are only spaces
# Do not slice the document, the trailing spaces will be ignored by JSONDecoder
return partial_document, b""
return partial_document[:index], partial_document[index:]
complete_document, partial_document = partial_document[:consumed], partial_document[consumed:]
if not complete_document:
# If this condition is verified, decoder.decode() will most likely raise JSONDecodeError
complete_document = partial_document
partial_document = b""
return complete_document, partial_document
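With use_lines=False, _JSONParser.raw_parse() above tracks matching braces, brackets and quotes instead of waiting for a newline, and now enforces the same limit. A sketch mirroring the docstring example (values are illustrative):

from easynetwork.serializers.json import JSONSerializer

s = JSONSerializer(use_lines=False, limit=1024)
consumer = s.incremental_deserialize()
next(consumer)
consumer.send(b'{"key":[1,2,3]')  # incomplete document: the parser keeps buffering
try:
    consumer.send(b"}")
except StopIteration as exc:
    packet, remaining = exc.value  # expected: ({'key': [1, 2, 3]}, b'')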