Skip to content

Commit

Permalink
Datagram server: Fixed memory leak if the request handler have an inn…
Browse files Browse the repository at this point in the history
…er loop (#306)
  • Loading branch information
francis-clairicia authored Jun 24, 2024
1 parent 80a3ea7 commit 3dc17a0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
4 changes: 4 additions & 0 deletions benchmark_server/build_benchmark_image
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ from __future__ import annotations

import argparse
import os
import shutil
from pathlib import Path
from typing import Any, Literal

Expand All @@ -29,6 +30,8 @@ def _build_sdist_for_docker_image() -> Path:
distribution: Literal["sdist"] = "sdist"
config_settings: dict[str, Any] = {}

shutil.rmtree(output_dir, ignore_errors=True)

print("Building source distribution for docker image...")
with pyproject_build.env.DefaultIsolatedEnv() as env:
builder = pyproject_build.ProjectBuilder.from_isolated_env(env, src_dir)
Expand Down Expand Up @@ -65,6 +68,7 @@ def main() -> None:
help="Built image tag",
)
parser.add_argument(
"-p",
"--python-version",
default=SUPPORTED_PYTHON_VERSIONS[-1],
choices=list(SUPPORTED_PYTHON_VERSIONS),
Expand Down
12 changes: 8 additions & 4 deletions src/easynetwork/lowlevel/api_async/servers/datagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,11 @@ async def handler(datagram: bytes, address: _T_Address, /) -> None:
client = client_cache[address]
except KeyError:
client_cache[address] = client = _ClientToken(DatagramClientContext(address, self), _ClientData(backend))
notify = False
else:
notify = True

await client.data.push_datagram(datagram)
await client.data.push_datagram(datagram, notify=notify)

if client.data.state is None:
del datagram
Expand Down Expand Up @@ -364,10 +367,11 @@ def state(self) -> _ClientState | None:
def queue_is_empty(self) -> bool:
return not self._datagram_queue

async def push_datagram(self, datagram: bytes) -> None:
async def push_datagram(self, datagram: bytes, *, notify: bool) -> None:
self._datagram_queue.append(datagram)
async with (queue_condition := self._queue_condition):
queue_condition.notify()
if notify:
async with (queue_condition := self._queue_condition):
queue_condition.notify()

def pop_datagram_no_wait(self) -> bytes:
return self._datagram_queue.popleft()
Expand Down
8 changes: 1 addition & 7 deletions src/easynetwork/lowlevel/std_asyncio/datagram/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,9 @@ def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
class _DatagramListenerServeContext:
datagram_handler: Callable[[bytes, tuple[Any, ...]], Coroutine[Any, Any, None]]
task_group: TaskGroup
__queue: collections.deque[tuple[bytes, tuple[Any, ...]]] = dataclasses.field(init=False, default_factory=collections.deque)

def handle(self, data: bytes, addr: tuple[Any, ...]) -> None:
self.__queue.append((data, addr))
self.task_group.start_soon(self.__datagram_handler_task)

async def __datagram_handler_task(self) -> None:
data, addr = self.__queue.popleft()
await self.datagram_handler(data, addr)
self.task_group.start_soon(self.datagram_handler, data, addr)


class DatagramListenerProtocol(asyncio.DatagramProtocol):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,35 @@ def test____client_state____irregular_state_transition(
assert self.get_client_state(client_data) is _ClientState.TASK_RUNNING

@pytest.mark.asyncio
@pytest.mark.parametrize("notify", [True, False], ids=lambda p: f"notify=={p}")
async def test____datagram_queue____push_datagram(
self,
notify: bool,
client_data: _ClientData,
mocker: MockerFixture,
) -> None:
# Arrange
queue_condition = mocker.NonCallableMagicMock(
spec=client_data._queue_condition,
wraps=client_data._queue_condition,
**{
"__aenter__.side_effect": client_data._queue_condition.__aenter__,
"__aexit__.side_effect": client_data._queue_condition.__aexit__,
},
)
client_data._queue_condition = queue_condition

# Act
await client_data.push_datagram(b"datagram_1")
await client_data.push_datagram(b"datagram_2")
await client_data.push_datagram(b"datagram_3")
await client_data.push_datagram(b"datagram_1", notify=notify)
await client_data.push_datagram(b"datagram_2", notify=notify)
await client_data.push_datagram(b"datagram_3", notify=notify)

# Assert
assert list(client_data._datagram_queue) == [b"datagram_1", b"datagram_2", b"datagram_3"]
if notify:
assert queue_condition.notify.call_count == 3
else:
queue_condition.notify.assert_not_called()

@pytest.mark.asyncio
@pytest.mark.parametrize("no_wait", [False, True], ids=lambda p: f"no_wait=={p}")
Expand Down Expand Up @@ -326,7 +342,7 @@ async def test____datagram_queue____pop_datagram____wait_until_notification(
assert not pop_datagram_task.done()

# Act
await client_data.push_datagram(b"datagram_1")
await client_data.push_datagram(b"datagram_1", notify=True)

# Assert
assert (await pop_datagram_task) == b"datagram_1"

0 comments on commit 3dc17a0

Please sign in to comment.