diff --git a/benchmark_server/servers/easynetwork_tcp_echoserver.py b/benchmark_server/servers/easynetwork_tcp_echoserver.py index 0d48f9d4..38e0a8c8 100755 --- a/benchmark_server/servers/easynetwork_tcp_echoserver.py +++ b/benchmark_server/servers/easynetwork_tcp_echoserver.py @@ -54,7 +54,8 @@ def create_deserializer_buffer(self, sizehint: int) -> bytearray: def buffered_incremental_deserialize(self, buffer: bytearray) -> Generator[int | None, int, tuple[bytes, memoryview]]: with memoryview(buffer) as buffer_view: _, offset, buflen = yield from _buffered_readuntil(buffer, b"\n") - return bytes(buffer[:offset]), buffer_view[offset:buflen] + del buffer + return bytes(buffer_view[:offset]), buffer_view[offset:buflen] class EchoRequestHandler(AsyncStreamRequestHandler[Any, Any]): @@ -113,6 +114,7 @@ def create_tcp_server( ssl=ssl_context, runner_options=asyncio_options, manual_buffer_allocation="force" if buffered else "no", + max_recv_size=65536, # Default buffer limit of asyncio streams ) diff --git a/src/easynetwork/lowlevel/_stream.py b/src/easynetwork/lowlevel/_stream.py index 78accaaa..209efb25 100644 --- a/src/easynetwork/lowlevel/_stream.py +++ b/src/easynetwork/lowlevel/_stream.py @@ -121,6 +121,7 @@ class BufferedStreamDataConsumer(Generic[_T_ReceivedPacket]): __slots__ = ( "__buffered_receiver", "__buffer", + "__buffer_view_cache", "__exported_write_buffer_view", "__buffer_start", "__already_written", @@ -136,8 +137,9 @@ def __init__(self, protocol: StreamProtocol[Any, _T_ReceivedPacket], buffer_size self.__buffered_receiver: BufferedStreamReceiver[_T_ReceivedPacket, WriteableBuffer] = protocol.buffered_receiver() self.__consumer: Generator[int | None, int, tuple[_T_ReceivedPacket, ReadableBuffer]] | None = None self.__buffer: WriteableBuffer | None = None + self.__buffer_view_cache: memoryview | None = None self.__exported_write_buffer_view: memoryview | None = None - self.__buffer_start: int | None = None + self.__buffer_start: int = 0 self.__already_written: int = 0 self.__sizehint: int = buffer_size_hint @@ -174,7 +176,7 @@ def next(self, nb_updated_bytes: int | None) -> _T_ReceivedPacket: packet: _T_ReceivedPacket remaining: ReadableBuffer try: - self.__buffer_start = consumer.send(nb_updated_bytes) + self.__buffer_start = consumer.send(nb_updated_bytes) or 0 except StopIteration as exc: packet, remaining = exc.value self.__save_remainder_in_buffer(remaining) @@ -184,7 +186,7 @@ def next(self, nb_updated_bytes: int | None) -> _T_ReceivedPacket: raise except Exception as exc: # Reset buffer, since we do not know if the buffer state is still valid - self.__buffer = None + self.__buffer_view_cache = self.__buffer = None raise RuntimeError("protocol.build_packet_from_buffer() crashed") from exc else: self.__consumer = consumer @@ -198,26 +200,26 @@ def get_write_buffer(self) -> WriteableBuffer: whole_buffer = self.__buffered_receiver.create_buffer(self.__sizehint) self.__validate_created_buffer(whole_buffer) self.__buffer = whole_buffer + self.__buffer_view_cache = None # Ensure buffer view is reset if self.__consumer is None: consumer = self.__buffered_receiver.build_packet_from_buffer(self.__buffer) try: - self.__buffer_start = next(consumer) + self.__buffer_start = next(consumer) or 0 except StopIteration: raise RuntimeError("protocol.build_packet_from_buffer() did not yield") from None except Exception as exc: # Reset buffer, since we do not know if the buffer state is still valid - self.__buffer = None + self.__buffer_view_cache = self.__buffer = None raise RuntimeError("protocol.build_packet_from_buffer() crashed") from exc self.__consumer = consumer - buffer: memoryview = memoryview(self.__buffer).cast("B") + buffer: memoryview | None + if (buffer := self.__buffer_view_cache) is None: + buffer = memoryview(self.__buffer).cast("B") + self.__buffer_view_cache = buffer - match self.__buffer_start: - case None | 0: - pass - case start_idx: - buffer = buffer[start_idx:] + buffer = buffer[self.__buffer_start :] if self.__already_written: buffer = buffer[self.__already_written :] @@ -234,9 +236,7 @@ def get_value(self, *, full: bool = False) -> bytes | None: if full: return bytes(self.__buffer) buffer = memoryview(self.__buffer).cast("B") - if self.__buffer_start is None: - nbytes = self.__already_written - elif self.__buffer_start < 0: + if self.__buffer_start < 0: nbytes = self.__buffer_start + len(buffer) + self.__already_written else: nbytes = self.__buffer_start + self.__already_written @@ -244,7 +244,8 @@ def get_value(self, *, full: bool = False) -> bytes | None: def clear(self) -> None: self.__release_write_buffer_view() - self.__buffer = self.__buffer_start = None + self.__buffer_view_cache = self.__buffer = None + self.__buffer_start = 0 self.__already_written = 0 consumer, self.__consumer = self.__consumer, None if consumer is not None: diff --git a/src/easynetwork/lowlevel/std_asyncio/stream/socket.py b/src/easynetwork/lowlevel/std_asyncio/stream/socket.py index c94f6900..1568bebf 100644 --- a/src/easynetwork/lowlevel/std_asyncio/stream/socket.py +++ b/src/easynetwork/lowlevel/std_asyncio/stream/socket.py @@ -248,8 +248,8 @@ async def receive_data(self, bufsize: int, /) -> bytes: return b"" if bufsize < 0: raise ValueError("'bufsize' must be a positive or null integer") - if not self.__buffer_nbytes_written and not self.__eof_reached: - await self._wait_for_data("receive_data") + + await self._wait_for_data("receive_data") nbytes_written = self.__buffer_nbytes_written if nbytes_written: @@ -269,10 +269,10 @@ async def receive_data_into(self, buffer: WriteableBuffer, /) -> int: if self.__connection_lost_exception is not None: raise self.__connection_lost_exception.with_traceback(self.__connection_lost_exception_tb) with memoryview(buffer).cast("B") as buffer: - if not buffer.nbytes: + if not buffer: return 0 - if not self.__buffer_nbytes_written and not self.__eof_reached: - await self._wait_for_data("receive_data_into") + + await self._wait_for_data("receive_data_into") nbytes_written = self.__buffer_nbytes_written if nbytes_written: @@ -294,7 +294,9 @@ async def _wait_for_data(self, requester: str) -> None: if self.__read_waiter is not None: raise RuntimeError(f"{requester}() called while another coroutine is already waiting for incoming data") - assert not self.__eof_reached, "_wait_for_data after EOF" # nosec assert_used + if self.__buffer_nbytes_written or self.__eof_reached: + return + assert not self.__read_paused, "transport reading is paused" # nosec assert_used if self.__transport is None: diff --git a/tox.ini b/tox.ini index cf561af6..8a96d7b6 100644 --- a/tox.ini +++ b/tox.ini @@ -242,8 +242,8 @@ setenv = readline: BENCHMARK_PATTERN = ^readline # Report files - BENCHMARK_REPORT_JSON = {toxinidir}{/}benchmark_reports{/}json{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.json - BENCHMARK_REPORT_HTML = {toxinidir}{/}benchmark_reports{/}html{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.html + BENCHMARK_REPORT_JSON = {toxinidir}{/}benchmark_reports{/}server_benches{/}json{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.json + BENCHMARK_REPORT_HTML = {toxinidir}{/}benchmark_reports{/}server_benches{/}html{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.html passenv = BENCHMARK_PYTHON_VERSION DOCKER_HOST