From 3dc17a0db7fbdbe26ac59ac19a13ded7a07ad811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francis=20Clairicia-Rose-Claire-Jos=C3=A9phine?= Date: Mon, 24 Jun 2024 09:05:40 +0200 Subject: [PATCH] Datagram server: Fixed memory leak if the request handler have an inner loop (#306) --- benchmark_server/build_benchmark_image | 4 ++++ .../lowlevel/api_async/servers/datagram.py | 12 ++++++---- .../lowlevel/std_asyncio/datagram/listener.py | 8 +------ .../test_servers/test_datagram.py | 24 +++++++++++++++---- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/benchmark_server/build_benchmark_image b/benchmark_server/build_benchmark_image index 28793a77..79a6d13f 100755 --- a/benchmark_server/build_benchmark_image +++ b/benchmark_server/build_benchmark_image @@ -8,6 +8,7 @@ from __future__ import annotations import argparse import os +import shutil from pathlib import Path from typing import Any, Literal @@ -29,6 +30,8 @@ def _build_sdist_for_docker_image() -> Path: distribution: Literal["sdist"] = "sdist" config_settings: dict[str, Any] = {} + shutil.rmtree(output_dir, ignore_errors=True) + print("Building source distribution for docker image...") with pyproject_build.env.DefaultIsolatedEnv() as env: builder = pyproject_build.ProjectBuilder.from_isolated_env(env, src_dir) @@ -65,6 +68,7 @@ def main() -> None: help="Built image tag", ) parser.add_argument( + "-p", "--python-version", default=SUPPORTED_PYTHON_VERSIONS[-1], choices=list(SUPPORTED_PYTHON_VERSIONS), diff --git a/src/easynetwork/lowlevel/api_async/servers/datagram.py b/src/easynetwork/lowlevel/api_async/servers/datagram.py index acc2c073..83c17d93 100644 --- a/src/easynetwork/lowlevel/api_async/servers/datagram.py +++ b/src/easynetwork/lowlevel/api_async/servers/datagram.py @@ -185,8 +185,11 @@ async def handler(datagram: bytes, address: _T_Address, /) -> None: client = client_cache[address] except KeyError: client_cache[address] = client = _ClientToken(DatagramClientContext(address, self), _ClientData(backend)) + notify = False + else: + notify = True - await client.data.push_datagram(datagram) + await client.data.push_datagram(datagram, notify=notify) if client.data.state is None: del datagram @@ -364,10 +367,11 @@ def state(self) -> _ClientState | None: def queue_is_empty(self) -> bool: return not self._datagram_queue - async def push_datagram(self, datagram: bytes) -> None: + async def push_datagram(self, datagram: bytes, *, notify: bool) -> None: self._datagram_queue.append(datagram) - async with (queue_condition := self._queue_condition): - queue_condition.notify() + if notify: + async with (queue_condition := self._queue_condition): + queue_condition.notify() def pop_datagram_no_wait(self) -> bytes: return self._datagram_queue.popleft() diff --git a/src/easynetwork/lowlevel/std_asyncio/datagram/listener.py b/src/easynetwork/lowlevel/std_asyncio/datagram/listener.py index d6c3d2e9..3c7ecfe4 100644 --- a/src/easynetwork/lowlevel/std_asyncio/datagram/listener.py +++ b/src/easynetwork/lowlevel/std_asyncio/datagram/listener.py @@ -112,15 +112,9 @@ def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: class _DatagramListenerServeContext: datagram_handler: Callable[[bytes, tuple[Any, ...]], Coroutine[Any, Any, None]] task_group: TaskGroup - __queue: collections.deque[tuple[bytes, tuple[Any, ...]]] = dataclasses.field(init=False, default_factory=collections.deque) def handle(self, data: bytes, addr: tuple[Any, ...]) -> None: - self.__queue.append((data, addr)) - self.task_group.start_soon(self.__datagram_handler_task) - - async def __datagram_handler_task(self) -> None: - data, addr = self.__queue.popleft() - await self.datagram_handler(data, addr) + self.task_group.start_soon(self.datagram_handler, data, addr) class DatagramListenerProtocol(asyncio.DatagramProtocol): diff --git a/tests/unit_test/test_async/test_lowlevel_api/test_servers/test_datagram.py b/tests/unit_test/test_async/test_lowlevel_api/test_servers/test_datagram.py index 177625fc..78522555 100644 --- a/tests/unit_test/test_async/test_lowlevel_api/test_servers/test_datagram.py +++ b/tests/unit_test/test_async/test_lowlevel_api/test_servers/test_datagram.py @@ -268,19 +268,35 @@ def test____client_state____irregular_state_transition( assert self.get_client_state(client_data) is _ClientState.TASK_RUNNING @pytest.mark.asyncio + @pytest.mark.parametrize("notify", [True, False], ids=lambda p: f"notify=={p}") async def test____datagram_queue____push_datagram( self, + notify: bool, client_data: _ClientData, + mocker: MockerFixture, ) -> None: # Arrange + queue_condition = mocker.NonCallableMagicMock( + spec=client_data._queue_condition, + wraps=client_data._queue_condition, + **{ + "__aenter__.side_effect": client_data._queue_condition.__aenter__, + "__aexit__.side_effect": client_data._queue_condition.__aexit__, + }, + ) + client_data._queue_condition = queue_condition # Act - await client_data.push_datagram(b"datagram_1") - await client_data.push_datagram(b"datagram_2") - await client_data.push_datagram(b"datagram_3") + await client_data.push_datagram(b"datagram_1", notify=notify) + await client_data.push_datagram(b"datagram_2", notify=notify) + await client_data.push_datagram(b"datagram_3", notify=notify) # Assert assert list(client_data._datagram_queue) == [b"datagram_1", b"datagram_2", b"datagram_3"] + if notify: + assert queue_condition.notify.call_count == 3 + else: + queue_condition.notify.assert_not_called() @pytest.mark.asyncio @pytest.mark.parametrize("no_wait", [False, True], ids=lambda p: f"no_wait=={p}") @@ -326,7 +342,7 @@ async def test____datagram_queue____pop_datagram____wait_until_notification( assert not pop_datagram_task.done() # Act - await client_data.push_datagram(b"datagram_1") + await client_data.push_datagram(b"datagram_1", notify=True) # Assert assert (await pop_datagram_task) == b"datagram_1"