Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serializers: Added buffer limit options #133

Merged
merged 6 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/source/api/serializers/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,11 @@ Serializers
wrappers/base64
wrappers/compressor
wrappers/encryptor

-----

.. toctree::
:caption: Miscellaneous
:maxdepth: 2

tools
7 changes: 7 additions & 0 deletions docs/source/api/serializers/tools.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*******************************
Serializer implementation tools
*******************************

.. automodule:: easynetwork.serializers.tools
:members:
:no-docstring:
2 changes: 1 addition & 1 deletion src/easynetwork/api_async/server/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ async def send_packet(self, packet: _ResponseT, /) -> None:
self.__check_closed()
self.__logger.debug("A response will be sent to %s", self.address)
producer = self.__producer
producer.queue(packet)
producer.enqueue(packet)
del packet
async with self.__send_lock:
socket = self.__check_closed()
Expand Down
25 changes: 25 additions & 0 deletions src/easynetwork/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"DatagramProtocolParseError",
"DeserializeError",
"IncrementalDeserializeError",
"LimitOverrunError",
"PacketConversionError",
"ServerAlreadyRunning",
"ServerClosedError",
Expand All @@ -34,6 +35,8 @@
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from _typeshed import ReadableBuffer

from .tools.socket import SocketAddress


Expand Down Expand Up @@ -82,6 +85,28 @@ def __init__(self, message: str, remaining_data: bytes, error_info: Any = None)
"""Unused trailing data."""


class LimitOverrunError(IncrementalDeserializeError):
    """Reached the buffer size limit while looking for a separator."""

    def __init__(self, message: str, buffer: ReadableBuffer, consumed: int, separator: bytes = b"") -> None:
        """
        Parameters:
            message: Error message.
            buffer: The buffer that grew too big.
            consumed: Total number of bytes considered consumed.
            separator: The separator that was being searched for.
        """
        # Keep only the unconsumed tail of the buffer as the "remaining data".
        # bytes.removeprefix() is a no-op when the separator is empty or does
        # not actually prefix the tail, so the guarded strip collapses to one call.
        remaining_data = memoryview(buffer)[consumed:].tobytes().removeprefix(separator)

        super().__init__(message, remaining_data, error_info=None)

        self.consumed: int = consumed
        """Total number of to be consumed bytes."""


class PacketConversionError(Exception):
"""The deserialized :term:`packet` is invalid."""

Expand Down
65 changes: 34 additions & 31 deletions src/easynetwork/serializers/base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,48 @@

from .._typevars import _DTOPacketT
from ..exceptions import DeserializeError, IncrementalDeserializeError
from ..tools.constants import _DEFAULT_LIMIT
from .abc import AbstractIncrementalPacketSerializer, AbstractPacketSerializer
from .tools import GeneratorStreamReader


class AutoSeparatedPacketSerializer(AbstractIncrementalPacketSerializer[_DTOPacketT]):
"""
Base class for stream protocols that separates sent information by a byte sequence.
"""

__slots__ = ("__separator", "__incremental_serialize_check_separator")
__slots__ = ("__separator", "__limit", "__incremental_serialize_check_separator")

def __init__(self, separator: bytes, *, incremental_serialize_check_separator: bool = True, **kwargs: Any) -> None:
def __init__(
    self,
    separator: bytes,
    *,
    incremental_serialize_check_separator: bool = True,
    limit: int = _DEFAULT_LIMIT,
    **kwargs: Any,
) -> None:
    """
    Parameters:
        separator: Byte sequence marking the end of a token.
        incremental_serialize_check_separator: If `True` (the default), verifies that
                                               the data produced by :meth:`serialize`
                                               does not embed `separator`, and trims a
                                               superfluous trailing `separator`.
        limit: Maximum buffer size.
        kwargs: Extra options forwarded to ``super().__init__()``.

    Raises:
        TypeError: Invalid arguments.
        ValueError: Empty `separator` sequence.
        ValueError: `limit` must be a positive integer.
    """
    super().__init__(**kwargs)

    # Normalize then validate with guard clauses; an empty bytes object is falsy.
    sep = bytes(separator)
    if not sep:
        raise ValueError("Empty separator")
    if limit <= 0:
        raise ValueError("limit must be a positive integer")

    self.__separator: bytes = sep
    self.__limit: int = limit
    self.__incremental_serialize_check_separator = bool(incremental_serialize_check_separator)

@abstractmethod
Expand Down Expand Up @@ -84,9 +98,15 @@ def incremental_serialize(self, packet: _DTOPacketT, /) -> Generator[bytes, None
data = data.removesuffix(separator)
if separator in data:
raise ValueError(f"{separator!r} separator found in serialized packet {packet!r} which was not at the end")
yield data
del data
yield separator
if not data:
return
if len(data) + len(separator) <= self.__limit // 2:
data += separator
yield data
else:
yield data
del data
yield separator

@abstractmethod
def deserialize(self, data: bytes, /) -> _DTOPacketT:
Expand All @@ -103,23 +123,14 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[_DTOPacketT, b
See :meth:`.AbstractIncrementalPacketSerializer.incremental_deserialize` documentation for details.

Raises:
LimitOverrunError: Reached buffer size limit.
IncrementalDeserializeError: :meth:`deserialize` raised :exc:`.DeserializeError`.
Exception: Any error raised by :meth:`deserialize`.
"""
buffer: bytes = yield
separator: bytes = self.__separator
separator_length: int = len(separator)
while True:
data, found_separator, buffer = buffer.partition(separator)
if found_separator:
del found_separator
if not data: # There was successive separators
continue
break
assert not buffer # nosec assert_used
buffer = data + (yield)
while buffer.startswith(separator): # Remove successive separators which can already be eliminated
buffer = buffer[separator_length:]
reader = GeneratorStreamReader()
data = yield from reader.read_until(self.__separator, limit=self.__limit, keep_end=False)
buffer = reader.read_all()

try:
packet = self.deserialize(data)
except DeserializeError as exc:
Expand Down Expand Up @@ -205,18 +216,10 @@ def incremental_deserialize(self) -> Generator[None, bytes, tuple[_DTOPacketT, b
IncrementalDeserializeError: :meth:`deserialize` raised :exc:`.DeserializeError`.
Exception: Any error raised by :meth:`deserialize`.
"""
buffer: bytes = yield
packet_size: int = self.__size
while (buffer_size := len(buffer)) < packet_size:
buffer += yield
reader = GeneratorStreamReader()
data = yield from reader.read_exactly(self.__size)
buffer = reader.read_all()

# Do not copy if the size is *exactly* as expected
if buffer_size == packet_size:
data = buffer
buffer = b""
else:
data = buffer[:packet_size]
buffer = buffer[packet_size:]
try:
packet = self.deserialize(data)
except DeserializeError as exc:
Expand Down
Loading