Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessagePackSerializer can be used with StreamProtocol and BufferedStreamProtocol #339

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/api/serializers/cbor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Configuration

.. autoclass:: CBOREncoderConfig
:members:
:undoc-members:

.. autoclass:: CBORDecoderConfig
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/source/api/serializers/json.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Configuration

.. autoclass:: JSONEncoderConfig
:members:
:undoc-members:

.. autoclass:: JSONDecoderConfig
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/source/api/serializers/msgpack.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Configuration

.. autoclass:: MessagePackerConfig
:members:
:undoc-members:

.. autoclass:: MessageUnpackerConfig
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/source/api/serializers/pickle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Configuration

.. autoclass:: PicklerConfig
:members:
:undoc-members:

.. autoclass:: UnpicklerConfig
:members:
:undoc-members:
7 changes: 3 additions & 4 deletions micro_benchmarks/serializers/bench_cbor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Any, assert_type
from typing import TYPE_CHECKING, Any

from easynetwork.serializers.cbor import CBORSerializer

Expand Down Expand Up @@ -57,9 +57,8 @@ def bench_CBORSerializer_incremental_deserialize(

def deserialize() -> Any:
consumer = serializer.buffered_incremental_deserialize(buffer)
assert_type(next(consumer), None)
with buffer.cast("B") as view:
view[:nbytes] = cbor_data
next(consumer)
buffer[:nbytes] = cbor_data
try:
consumer.send(nbytes)
except StopIteration as exc:
Expand Down
52 changes: 52 additions & 0 deletions micro_benchmarks/serializers/bench_msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from easynetwork.serializers.msgpack import MessagePackSerializer

import pytest

if TYPE_CHECKING:
from pytest_benchmark.fixture import BenchmarkFixture

Expand All @@ -29,3 +31,53 @@ def bench_MessagePackSerializer_deserialize(
result = benchmark(serializer.deserialize, msgpack_data)

assert result == json_object


def bench_MessagePackSerializer_incremental_serialize(
    benchmark: BenchmarkFixture,
    json_object: Any,
) -> None:
    """Benchmark the incremental (chunked) serialization path of MessagePackSerializer."""
    serializer = MessagePackSerializer()

    def serialize_all() -> bytes:
        # Consume the generator of chunks and reassemble the full payload.
        return b"".join(serializer.incremental_serialize(json_object))

    benchmark(serialize_all)


@pytest.mark.parametrize("buffered", [False, True], ids=lambda p: f"buffered=={p}")
def bench_MessagePackSerializer_incremental_deserialize(
    buffered: bool,
    benchmark: BenchmarkFixture,
    msgpack_data: bytes,
    json_object: Any,
) -> None:
    """Benchmark incremental deserialization, with and without the buffered (zero-copy) API."""
    serializer = MessagePackSerializer()

    def finish(consumer: Any, to_send: Any) -> Any:
        # Feed the consumer once; a complete packet is reported via StopIteration.value.
        try:
            consumer.send(to_send)
        except StopIteration as exc:
            return exc.value
        raise RuntimeError("consumer yielded")

    if buffered:
        nbytes = len(msgpack_data)
        buffer: memoryview = serializer.create_deserializer_buffer(nbytes)

        def deserialize() -> Any:
            consumer = serializer.buffered_incremental_deserialize(buffer)
            next(consumer)  # prime the generator up to its first yield
            buffer[:nbytes] = msgpack_data  # write payload directly into the shared buffer
            return finish(consumer, nbytes)

    else:

        def deserialize() -> Any:
            consumer = serializer.incremental_deserialize()
            next(consumer)  # prime the generator up to its first yield
            return finish(consumer, msgpack_data)

    # incremental_deserialize returns (packet, remaining_data); only the packet is checked.
    result, _ = benchmark(deserialize)

    assert result == json_object
2 changes: 1 addition & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ cbor = [
"cbor2>=5.5,<6",
]
msgpack = [
"msgpack>=1.0.7,<2",
"msgpack>=1.0.8,<2",
]
trio = [
"trio>=0.26,<1",
Expand Down
2 changes: 1 addition & 1 deletion src/easynetwork/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __init__(self, message: str, buffer: ReadableBuffer, consumed: int, separato
while remaining_data.nbytes and remaining_data[:seplen] != separator[: remaining_data.nbytes]:
remaining_data = remaining_data[1:]

super().__init__(message, remaining_data, error_info=None)
super().__init__(message, bytes(remaining_data), error_info=None)

self.consumed: int = consumed
"""Total number of bytes to be consumed."""
Expand Down
38 changes: 31 additions & 7 deletions src/easynetwork/serializers/base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,26 +361,33 @@ class FileBasedPacketSerializer(BufferedIncrementalPacketSerializer[_T_SentDTOPa
Base class for APIs requiring a :std:term:`file object` for serialization/deserialization.
"""

__slots__ = ("__expected_errors", "__debug")
__slots__ = ("__expected_errors", "__limit", "__debug")

def __init__(
self,
expected_load_error: type[Exception] | tuple[type[Exception], ...],
*,
limit: int = DEFAULT_SERIALIZER_LIMIT,
debug: bool = False,
**kwargs: Any,
) -> None:
"""
Parameters:
expected_load_error: Errors that can be raised by :meth:`load_from_file` implementation,
which must be considered as deserialization errors.
limit: Maximum buffer size.
debug: If :data:`True`, add information to :exc:`.DeserializeError` via the ``error_info`` attribute.
kwargs: Extra options given to ``super().__init__()``.
"""
super().__init__(**kwargs)
if not isinstance(expected_load_error, tuple):
expected_load_error = (expected_load_error,)
assert all(issubclass(e, Exception) for e in expected_load_error) # nosec assert_used

if limit <= 0:
raise ValueError("limit must be a positive integer")

self.__limit: int = limit
self.__expected_errors: tuple[type[Exception], ...] = expected_load_error
self.__debug: bool = bool(debug)

Expand Down Expand Up @@ -412,24 +419,26 @@ def load_from_file(self, file: IO[bytes], /) -> _T_ReceivedDTOPacket:
"""
raise NotImplementedError

@final
def serialize(self, packet: _T_SentDTOPacket, /) -> bytes:
"""
Calls :meth:`dump_to_file` and returns the result.
Returns the byte representation of the Python object `packet`.

By default, this method uses :meth:`dump_to_file`, but it can be overridden for specific usage.

See :meth:`.AbstractPacketSerializer.serialize` documentation for details.

Raises:
Exception: Any error raised by :meth:`dump_to_file`.
Exception: Any error raised by :meth:`dump_to_file`.
"""
with BytesIO() as buffer:
self.dump_to_file(packet, buffer)
return buffer.getvalue()

@final
def deserialize(self, data: bytes, /) -> _T_ReceivedDTOPacket:
"""
Calls :meth:`load_from_file` and returns the result.
Creates a Python object representing the raw :term:`packet` from `data`.

By default, this method uses :meth:`load_from_file`, but it can be overridden for specific usage.

See :meth:`.AbstractPacketSerializer.deserialize` documentation for details.

Expand Down Expand Up @@ -499,6 +508,7 @@ def create_deserializer_buffer(self, sizehint: int, /) -> memoryview:
"""
See :meth:`.BufferedIncrementalPacketSerializer.create_deserializer_buffer` documentation for details.
"""
sizehint = min(sizehint, self.__limit)
return memoryview(bytearray(sizehint))

@final
Expand Down Expand Up @@ -528,10 +538,11 @@ def __generic_incremental_deserialize(self) -> Generator[None, ReadableBuffer, t
if not initial:
buffer.write((yield))
buffer.seek(0)
self.__check_file_buffer_limit(buffer)
try:
packet: _T_ReceivedDTOPacket = self.load_from_file(buffer)
except EOFError:
continue
pass
except self.__expected_errors as exc:
msg = f"Deserialize error: {exc}"
if self.debug:
Expand All @@ -546,6 +557,11 @@ def __generic_incremental_deserialize(self) -> Generator[None, ReadableBuffer, t
finally:
initial = False

def __check_file_buffer_limit(self, file: BytesIO) -> None:
    """Raise :exc:`LimitOverrunError` when the accumulated buffer exceeds the configured limit.

    Parameters:
        file: In-memory buffer holding all bytes received so far.
    """
    with file.getbuffer() as view:
        size = view.nbytes
        if size > self.__limit:
            raise LimitOverrunError("chunk exceeded buffer limit", view, consumed=size)

@property
@final
def debug(self) -> bool:
Expand All @@ -554,6 +570,14 @@ def debug(self) -> bool:
"""
return self.__debug

@property
@final
def buffer_limit(self) -> int:
    """
    Maximum buffer size, in bytes. Read-only attribute.
    """
    return self.__limit


def _wrap_generic_incremental_deserialize(
func: Callable[[], Generator[None, ReadableBuffer, tuple[_T_ReceivedDTOPacket, ReadableBuffer]]],
Expand Down
5 changes: 4 additions & 1 deletion src/easynetwork/serializers/cbor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from typing import IO, TYPE_CHECKING, Any, final

from ..lowlevel import _utils
from ..lowlevel.constants import DEFAULT_SERIALIZER_LIMIT
from .base_stream import FileBasedPacketSerializer

if TYPE_CHECKING:
Expand Down Expand Up @@ -86,20 +87,22 @@ def __init__(
encoder_config: CBOREncoderConfig | None = None,
decoder_config: CBORDecoderConfig | None = None,
*,
limit: int = DEFAULT_SERIALIZER_LIMIT,
debug: bool = False,
) -> None:
"""
Parameters:
encoder_config: Parameter object to configure the :class:`~cbor.encoder.CBOREncoder`.
decoder_config: Parameter object to configure the :class:`~cbor.decoder.CBORDecoder`.
limit: Maximum buffer size. Used in incremental serialization context.
debug: If :data:`True`, add information to :exc:`.DeserializeError` via the ``error_info`` attribute.
"""
try:
import cbor2
except ModuleNotFoundError as exc:
raise _utils.missing_extra_deps("cbor") from exc

super().__init__(expected_load_error=(cbor2.CBORDecodeError, UnicodeError), debug=debug)
super().__init__(expected_load_error=(cbor2.CBORDecodeError, UnicodeError), limit=limit, debug=debug)
self.__encoder_cls: Callable[[IO[bytes]], cbor2.CBOREncoder]
self.__decoder_cls: Callable[[IO[bytes]], cbor2.CBORDecoder]

Expand Down
Loading