Skip to content

Commit

Permalink
MessagePackSerializer can be used with StreamProtocol and `Buffer…
Browse files Browse the repository at this point in the history
…edStreamProtocol` (#339)
  • Loading branch information
francis-clairicia authored Aug 4, 2024
1 parent 3bfc45c commit b1fdf37
Show file tree
Hide file tree
Showing 20 changed files with 551 additions and 88 deletions.
2 changes: 2 additions & 0 deletions docs/source/api/serializers/cbor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Configuration

.. autoclass:: CBOREncoderConfig
:members:
:undoc-members:

.. autoclass:: CBORDecoderConfig
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/source/api/serializers/json.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Configuration

.. autoclass:: JSONEncoderConfig
:members:
:undoc-members:

.. autoclass:: JSONDecoderConfig
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/source/api/serializers/msgpack.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Configuration

.. autoclass:: MessagePackerConfig
:members:
:undoc-members:

.. autoclass:: MessageUnpackerConfig
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/source/api/serializers/pickle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Configuration

.. autoclass:: PicklerConfig
:members:
:undoc-members:

.. autoclass:: UnpicklerConfig
:members:
:undoc-members:
7 changes: 3 additions & 4 deletions micro_benchmarks/serializers/bench_cbor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Any, assert_type
from typing import TYPE_CHECKING, Any

from easynetwork.serializers.cbor import CBORSerializer

Expand Down Expand Up @@ -57,9 +57,8 @@ def bench_CBORSerializer_incremental_deserialize(

def deserialize() -> Any:
consumer = serializer.buffered_incremental_deserialize(buffer)
assert_type(next(consumer), None)
with buffer.cast("B") as view:
view[:nbytes] = cbor_data
next(consumer)
buffer[:nbytes] = cbor_data
try:
consumer.send(nbytes)
except StopIteration as exc:
Expand Down
52 changes: 52 additions & 0 deletions micro_benchmarks/serializers/bench_msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from easynetwork.serializers.msgpack import MessagePackSerializer

import pytest

if TYPE_CHECKING:
from pytest_benchmark.fixture import BenchmarkFixture

Expand All @@ -29,3 +31,53 @@ def bench_MessagePackSerializer_deserialize(
result = benchmark(serializer.deserialize, msgpack_data)

assert result == json_object


def bench_MessagePackSerializer_incremental_serialize(
    benchmark: BenchmarkFixture,
    json_object: Any,
) -> None:
    """Benchmark a full incremental serialization of `json_object`.

    Joins every chunk yielded by ``incremental_serialize()`` into one
    bytes object, mirroring what a stream endpoint would send.
    """
    serializer = MessagePackSerializer()

    def serialize_all() -> bytes:
        # Exhaust the chunk generator and concatenate its output.
        return b"".join(serializer.incremental_serialize(json_object))

    benchmark(serialize_all)


@pytest.mark.parametrize("buffered", [False, True], ids=lambda p: f"buffered=={p}")
def bench_MessagePackSerializer_incremental_deserialize(
    buffered: bool,
    benchmark: BenchmarkFixture,
    msgpack_data: bytes,
    json_object: Any,
) -> None:
    """Benchmark one incremental deserialization round-trip.

    Depending on `buffered`, exercises either the zero-copy
    ``buffered_incremental_deserialize()`` path (data written directly
    into the serializer-provided buffer) or the plain
    ``incremental_deserialize()`` path (data sent as a bytes chunk).
    """
    serializer = MessagePackSerializer()

    if buffered:
        nbytes = len(msgpack_data)
        buffer: memoryview = serializer.create_deserializer_buffer(nbytes)

        def deserialize() -> Any:
            gen = serializer.buffered_incremental_deserialize(buffer)
            # Prime the generator so it is suspended waiting for a byte count.
            next(gen)
            # Fill the exposed buffer in place, then report how many bytes were written.
            buffer[:nbytes] = msgpack_data
            try:
                gen.send(nbytes)
            except StopIteration as done:
                # The deserialized result travels on StopIteration.value.
                return done.value
            raise RuntimeError("consumer yielded")

    else:

        def deserialize() -> Any:
            gen = serializer.incremental_deserialize()
            # Prime the generator so it is suspended waiting for bytes.
            next(gen)
            try:
                gen.send(msgpack_data)
            except StopIteration as done:
                # The deserialized result travels on StopIteration.value.
                return done.value
            raise RuntimeError("consumer yielded")

    # Both paths return a (packet, remainder) pair; only the packet is checked.
    packet, _ = benchmark(deserialize)

    assert packet == json_object
2 changes: 1 addition & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ cbor = [
"cbor2>=5.5,<6",
]
msgpack = [
"msgpack>=1.0.7,<2",
"msgpack>=1.0.8,<2",
]
trio = [
"trio>=0.26,<1",
Expand Down
2 changes: 1 addition & 1 deletion src/easynetwork/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __init__(self, message: str, buffer: ReadableBuffer, consumed: int, separato
while remaining_data.nbytes and remaining_data[:seplen] != separator[: remaining_data.nbytes]:
remaining_data = remaining_data[1:]

super().__init__(message, remaining_data, error_info=None)
super().__init__(message, bytes(remaining_data), error_info=None)

self.consumed: int = consumed
"""Total number of bytes to be consumed."""
Expand Down
38 changes: 31 additions & 7 deletions src/easynetwork/serializers/base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,26 +361,33 @@ class FileBasedPacketSerializer(BufferedIncrementalPacketSerializer[_T_SentDTOPa
Base class for APIs requiring a :std:term:`file object` for serialization/deserialization.
"""

__slots__ = ("__expected_errors", "__debug")
__slots__ = ("__expected_errors", "__limit", "__debug")

def __init__(
    self,
    expected_load_error: type[Exception] | tuple[type[Exception], ...],
    *,
    limit: int = DEFAULT_SERIALIZER_LIMIT,
    debug: bool = False,
    **kwargs: Any,
) -> None:
    """
    Parameters:
        expected_load_error: Errors that can be raised by :meth:`load_from_file` implementation,
            which must be considered as deserialization errors.
        limit: Maximum buffer size.
        debug: If :data:`True`, add information to :exc:`.DeserializeError` via the ``error_info`` attribute.
        kwargs: Extra options given to ``super().__init__()``.
    """
    super().__init__(**kwargs)
    # Normalize a lone exception type into a 1-tuple so it can be used
    # directly in an ``except`` clause later.
    if not isinstance(expected_load_error, tuple):
        expected_load_error = (expected_load_error,)
    assert all(issubclass(e, Exception) for e in expected_load_error)  # nosec assert_used

    # A non-positive limit would disable buffering entirely; reject it early.
    if limit <= 0:
        raise ValueError("limit must be a positive integer")

    # Upper bound enforced on the internal deserialization buffer
    # (checked by __check_file_buffer_limit during incremental deserialize).
    self.__limit: int = limit
    self.__expected_errors: tuple[type[Exception], ...] = expected_load_error
    self.__debug: bool = bool(debug)

Expand Down Expand Up @@ -412,24 +419,26 @@ def load_from_file(self, file: IO[bytes], /) -> _T_ReceivedDTOPacket:
"""
raise NotImplementedError

@final
def serialize(self, packet: _T_SentDTOPacket, /) -> bytes:
"""
Calls :meth:`dump_to_file` and returns the result.
Returns the byte representation of the Python object `packet`.
By default, this method uses :meth:`dump_to_file`, but it can be overridden for specific usage.
See :meth:`.AbstractPacketSerializer.serialize` documentation for details.
Raises:
Exception: Any error raised by :meth:`dump_to_file`.
Exception: Any error raised by :meth:`dump_to_file`.
"""
with BytesIO() as buffer:
self.dump_to_file(packet, buffer)
return buffer.getvalue()

@final
def deserialize(self, data: bytes, /) -> _T_ReceivedDTOPacket:
"""
Calls :meth:`load_from_file` and returns the result.
Creates a Python object representing the raw :term:`packet` from `data`.
By default, this method uses :meth:`load_from_file`, but it can be overridden for specific usage.
See :meth:`.AbstractPacketSerializer.deserialize` documentation for details.
Expand Down Expand Up @@ -499,6 +508,7 @@ def create_deserializer_buffer(self, sizehint: int, /) -> memoryview:
"""
See :meth:`.BufferedIncrementalPacketSerializer.create_deserializer_buffer` documentation for details.
"""
sizehint = min(sizehint, self.__limit)
return memoryview(bytearray(sizehint))

@final
Expand Down Expand Up @@ -528,10 +538,11 @@ def __generic_incremental_deserialize(self) -> Generator[None, ReadableBuffer, t
if not initial:
buffer.write((yield))
buffer.seek(0)
self.__check_file_buffer_limit(buffer)
try:
packet: _T_ReceivedDTOPacket = self.load_from_file(buffer)
except EOFError:
continue
pass
except self.__expected_errors as exc:
msg = f"Deserialize error: {exc}"
if self.debug:
Expand All @@ -546,6 +557,11 @@ def __generic_incremental_deserialize(self) -> Generator[None, ReadableBuffer, t
finally:
initial = False

def __check_file_buffer_limit(self, file: BytesIO) -> None:
    # Raise LimitOverrunError when the accumulated byte buffer exceeds the
    # configured limit, so a peer cannot make the buffer grow without bound
    # while a complete packet is awaited.
    with file.getbuffer() as buffer_view:
        if buffer_view.nbytes > self.__limit:
            raise LimitOverrunError("chunk exceeded buffer limit", buffer_view, consumed=buffer_view.nbytes)

@property
@final
def debug(self) -> bool:
Expand All @@ -554,6 +570,14 @@ def debug(self) -> bool:
"""
return self.__debug

@property
@final
def buffer_limit(self) -> int:
    """
    Maximum buffer size, in bytes. Read-only attribute.

    Incremental deserialization fails with :exc:`.LimitOverrunError` once the
    internal buffer grows past this value.
    """
    return self.__limit


def _wrap_generic_incremental_deserialize(
func: Callable[[], Generator[None, ReadableBuffer, tuple[_T_ReceivedDTOPacket, ReadableBuffer]]],
Expand Down
5 changes: 4 additions & 1 deletion src/easynetwork/serializers/cbor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from typing import IO, TYPE_CHECKING, Any, final

from ..lowlevel import _utils
from ..lowlevel.constants import DEFAULT_SERIALIZER_LIMIT
from .base_stream import FileBasedPacketSerializer

if TYPE_CHECKING:
Expand Down Expand Up @@ -86,20 +87,22 @@ def __init__(
encoder_config: CBOREncoderConfig | None = None,
decoder_config: CBORDecoderConfig | None = None,
*,
limit: int = DEFAULT_SERIALIZER_LIMIT,
debug: bool = False,
) -> None:
"""
Parameters:
encoder_config: Parameter object to configure the :class:`~cbor.encoder.CBOREncoder`.
decoder_config: Parameter object to configure the :class:`~cbor.decoder.CBORDecoder`.
limit: Maximum buffer size. Used in incremental serialization context.
debug: If :data:`True`, add information to :exc:`.DeserializeError` via the ``error_info`` attribute.
"""
try:
import cbor2
except ModuleNotFoundError as exc:
raise _utils.missing_extra_deps("cbor") from exc

super().__init__(expected_load_error=(cbor2.CBORDecodeError, UnicodeError), debug=debug)
super().__init__(expected_load_error=(cbor2.CBORDecodeError, UnicodeError), limit=limit, debug=debug)
self.__encoder_cls: Callable[[IO[bytes]], cbor2.CBOREncoder]
self.__decoder_cls: Callable[[IO[bytes]], cbor2.CBORDecoder]

Expand Down
Loading

0 comments on commit b1fdf37

Please sign in to comment.