Fix: Better performances when using buffer API (#299)
francis-clairicia authored Jun 16, 2024
1 parent eed9011 commit a4a2f97
Showing 4 changed files with 29 additions and 24 deletions.
4 changes: 3 additions & 1 deletion benchmark_server/servers/easynetwork_tcp_echoserver.py
@@ -54,7 +54,8 @@ def create_deserializer_buffer(self, sizehint: int) -> bytearray:
def buffered_incremental_deserialize(self, buffer: bytearray) -> Generator[int | None, int, tuple[bytes, memoryview]]:
with memoryview(buffer) as buffer_view:
_, offset, buflen = yield from _buffered_readuntil(buffer, b"\n")
return bytes(buffer[:offset]), buffer_view[offset:buflen]
del buffer
return bytes(buffer_view[:offset]), buffer_view[offset:buflen]


class EchoRequestHandler(AsyncStreamRequestHandler[Any, Any]):
@@ -113,6 +114,7 @@ def create_tcp_server(
ssl=ssl_context,
runner_options=asyncio_options,
manual_buffer_allocation="force" if buffered else "no",
max_recv_size=65536, # Default buffer limit of asyncio streams
)


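The echo-server change above is the user-facing side of this fix: `buffer` is the `bytearray` receive buffer, so the old `bytes(buffer[:offset])` sliced the bytearray (one copy) and then converted it to `bytes` (a second copy), while the new `bytes(buffer_view[:offset])` slices the already-open `memoryview` (zero-copy) and copies only once; `del buffer` drops the generator's direct reference to the bytearray before returning. The added `max_recv_size=65536` simply aligns the benchmark server with the default 64 KiB buffer limit of asyncio streams, as the inline comment notes. Below is a small illustrative micro-benchmark of the slicing difference; it is not part of the commit and the variable names are made up.

from timeit import timeit

buffer = bytearray(64 * 1024)   # stand-in for the receive buffer
view = memoryview(buffer)
offset = 1024                   # pretend the delimiter was found here

# bytearray slice allocates an intermediate bytearray, then bytes() copies again
two_copies = timeit(lambda: bytes(buffer[:offset]), number=100_000)
# memoryview slice only creates a new view; bytes() performs the single copy
one_copy = timeit(lambda: bytes(view[:offset]), number=100_000)

print(f"bytes(bytearray[:n]):  {two_copies:.3f}s")
print(f"bytes(memoryview[:n]): {one_copy:.3f}s")
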
31 changes: 16 additions & 15 deletions src/easynetwork/lowlevel/_stream.py
@@ -121,6 +121,7 @@ class BufferedStreamDataConsumer(Generic[_T_ReceivedPacket]):
__slots__ = (
"__buffered_receiver",
"__buffer",
"__buffer_view_cache",
"__exported_write_buffer_view",
"__buffer_start",
"__already_written",
@@ -136,8 +137,9 @@ def __init__(self, protocol: StreamProtocol[Any, _T_ReceivedPacket], buffer_size
self.__buffered_receiver: BufferedStreamReceiver[_T_ReceivedPacket, WriteableBuffer] = protocol.buffered_receiver()
self.__consumer: Generator[int | None, int, tuple[_T_ReceivedPacket, ReadableBuffer]] | None = None
self.__buffer: WriteableBuffer | None = None
self.__buffer_view_cache: memoryview | None = None
self.__exported_write_buffer_view: memoryview | None = None
self.__buffer_start: int | None = None
self.__buffer_start: int = 0
self.__already_written: int = 0
self.__sizehint: int = buffer_size_hint

@@ -174,7 +176,7 @@ def next(self, nb_updated_bytes: int | None) -> _T_ReceivedPacket:
packet: _T_ReceivedPacket
remaining: ReadableBuffer
try:
self.__buffer_start = consumer.send(nb_updated_bytes)
self.__buffer_start = consumer.send(nb_updated_bytes) or 0
except StopIteration as exc:
packet, remaining = exc.value
self.__save_remainder_in_buffer(remaining)
@@ -184,7 +186,7 @@ def next(self, nb_updated_bytes: int | None) -> _T_ReceivedPacket:
raise
except Exception as exc:
# Reset buffer, since we do not know if the buffer state is still valid
self.__buffer = None
self.__buffer_view_cache = self.__buffer = None
raise RuntimeError("protocol.build_packet_from_buffer() crashed") from exc
else:
self.__consumer = consumer
@@ -198,26 +200,26 @@ def get_write_buffer(self) -> WriteableBuffer:
whole_buffer = self.__buffered_receiver.create_buffer(self.__sizehint)
self.__validate_created_buffer(whole_buffer)
self.__buffer = whole_buffer
self.__buffer_view_cache = None # Ensure buffer view is reset

if self.__consumer is None:
consumer = self.__buffered_receiver.build_packet_from_buffer(self.__buffer)
try:
self.__buffer_start = next(consumer)
self.__buffer_start = next(consumer) or 0
except StopIteration:
raise RuntimeError("protocol.build_packet_from_buffer() did not yield") from None
except Exception as exc:
# Reset buffer, since we do not know if the buffer state is still valid
self.__buffer = None
self.__buffer_view_cache = self.__buffer = None
raise RuntimeError("protocol.build_packet_from_buffer() crashed") from exc
self.__consumer = consumer

buffer: memoryview = memoryview(self.__buffer).cast("B")
buffer: memoryview | None
if (buffer := self.__buffer_view_cache) is None:
buffer = memoryview(self.__buffer).cast("B")
self.__buffer_view_cache = buffer

match self.__buffer_start:
case None | 0:
pass
case start_idx:
buffer = buffer[start_idx:]
buffer = buffer[self.__buffer_start :]

if self.__already_written:
buffer = buffer[self.__already_written :]
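
The get_write_buffer() hunk above is where most of the speedup in this file comes from: the old code built a fresh memoryview(self.__buffer).cast("B") on every call, whereas the new code keeps the cast view in __buffer_view_cache and invalidates it whenever the buffer is created anew or discarded after an error. A minimal sketch of that caching pattern follows; the class and attribute names are illustrative, not EasyNetwork's API.

class CachedViewBuffer:
    """Hand out writable views over one bytearray, creating the memoryview only once."""

    def __init__(self, size_hint: int) -> None:
        self._buffer: bytearray | None = None
        self._view_cache: memoryview | None = None
        self._size_hint = size_hint

    def get_write_buffer(self, start: int = 0) -> memoryview:
        if self._buffer is None:
            self._buffer = bytearray(self._size_hint)
            self._view_cache = None  # any cached view would point at the old buffer

        if (view := self._view_cache) is None:
            # cast("B") once and reuse it: rebuilding a memoryview on every
            # receive call is measurable overhead in a tight network loop
            view = memoryview(self._buffer).cast("B")
            self._view_cache = view

        return view[start:]

    def clear(self) -> None:
        self._view_cache = self._buffer = None

Slicing the cached view (view[start:]) still creates a cheap new view object, but exporting the buffer and casting it to unsigned bytes now happens once per buffer lifetime instead of once per call.
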
@@ -234,17 +236,16 @@ def get_value(self, *, full: bool = False) -> bytes | None:
if full:
return bytes(self.__buffer)
buffer = memoryview(self.__buffer).cast("B")
if self.__buffer_start is None:
nbytes = self.__already_written
elif self.__buffer_start < 0:
if self.__buffer_start < 0:
nbytes = self.__buffer_start + len(buffer) + self.__already_written
else:
nbytes = self.__buffer_start + self.__already_written
return buffer[:nbytes].tobytes()

def clear(self) -> None:
self.__release_write_buffer_view()
self.__buffer = self.__buffer_start = None
self.__buffer_view_cache = self.__buffer = None
self.__buffer_start = 0
self.__already_written = 0
consumer, self.__consumer = self.__consumer, None
if consumer is not None:
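
A second, smaller change in this file is the handling of __buffer_start: the protocol's build_packet_from_buffer() generator may yield either an offset or None ("no preference"), and the attribute previously stored that None, forcing the match/"is None" special cases removed above. Appending "or 0" at the next(...) and send(...) call sites folds None into 0 immediately, so __buffer_start is always an int (defaulting to 0) and plain slicing works everywhere. A tiny illustration, not library code:

def writable_part(next_write_pos: int | None, buffer: bytearray) -> memoryview:
    start = next_write_pos or 0                  # None -> 0, real offsets pass through
    return memoryview(buffer).cast("B")[start:]  # no match / None check needed

buf = bytearray(8)
assert writable_part(None, buf).nbytes == 8   # "no preference" == start of the buffer
assert writable_part(3, buf).nbytes == 5      # explicit offset is honoured
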
14 changes: 8 additions & 6 deletions src/easynetwork/lowlevel/std_asyncio/stream/socket.py
@@ -248,8 +248,8 @@ async def receive_data(self, bufsize: int, /) -> bytes:
return b""
if bufsize < 0:
raise ValueError("'bufsize' must be a positive or null integer")
if not self.__buffer_nbytes_written and not self.__eof_reached:
await self._wait_for_data("receive_data")

await self._wait_for_data("receive_data")

nbytes_written = self.__buffer_nbytes_written
if nbytes_written:
@@ -269,10 +269,10 @@ async def receive_data_into(self, buffer: WriteableBuffer, /) -> int:
if self.__connection_lost_exception is not None:
raise self.__connection_lost_exception.with_traceback(self.__connection_lost_exception_tb)
with memoryview(buffer).cast("B") as buffer:
if not buffer.nbytes:
if not buffer:
return 0
if not self.__buffer_nbytes_written and not self.__eof_reached:
await self._wait_for_data("receive_data_into")

await self._wait_for_data("receive_data_into")

nbytes_written = self.__buffer_nbytes_written
if nbytes_written:
@@ -294,7 +294,9 @@ async def _wait_for_data(self, requester: str) -> None:
if self.__read_waiter is not None:
raise RuntimeError(f"{requester}() called while another coroutine is already waiting for incoming data")

assert not self.__eof_reached, "_wait_for_data after EOF" # nosec assert_used
if self.__buffer_nbytes_written or self.__eof_reached:
return

assert not self.__read_paused, "transport reading is paused" # nosec assert_used

if self.__transport is None:
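
In the asyncio socket adapter, the guard that was previously duplicated in front of each "await self._wait_for_data(...)" call (if not self.__buffer_nbytes_written and not self.__eof_reached) now lives inside _wait_for_data() itself, which returns immediately when data is already buffered or EOF has been reached; the assertion on EOF is gone because EOF is now a normal early-return case. The "if not buffer:" test in receive_data_into() relies on a zero-length memoryview being falsy, which for a view cast to "B" is equivalent to checking buffer.nbytes. Below is a simplified sketch of the "guard inside the waiter" shape (hypothetical names; the real class also manages the transport and read pausing):

import asyncio

class BufferedReceiver:
    def __init__(self) -> None:
        self._nbytes_buffered = 0
        self._eof = False
        self._waiter: asyncio.Future[None] | None = None

    async def receive(self) -> int:
        # Callers await unconditionally; the "is there data already?" check
        # is the waiter's job now.
        await self._wait_for_data("receive")
        nbytes, self._nbytes_buffered = self._nbytes_buffered, 0
        return nbytes

    async def _wait_for_data(self, requester: str) -> None:
        if self._waiter is not None:
            raise RuntimeError(f"{requester}() called while another coroutine is waiting")
        if self._nbytes_buffered or self._eof:
            return  # nothing to wait for: data is pending or the stream ended
        self._waiter = asyncio.get_running_loop().create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    def feed(self, nbytes: int) -> None:
        self._nbytes_buffered += nbytes
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)

    def feed_eof(self) -> None:
        self._eof = True
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)
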
4 changes: 2 additions & 2 deletions tox.ini
@@ -242,8 +242,8 @@ setenv =
readline: BENCHMARK_PATTERN = ^readline

# Report files
BENCHMARK_REPORT_JSON = {toxinidir}{/}benchmark_reports{/}json{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.json
BENCHMARK_REPORT_HTML = {toxinidir}{/}benchmark_reports{/}html{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.html
BENCHMARK_REPORT_JSON = {toxinidir}{/}benchmark_reports{/}server_benches{/}json{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.json
BENCHMARK_REPORT_HTML = {toxinidir}{/}benchmark_reports{/}server_benches{/}html{/}{envname}-{env:BENCHMARK_PYTHON_VERSION}-report.html
passenv =
BENCHMARK_PYTHON_VERSION
DOCKER_HOST
