diff --git a/docs/source/api/async/backend.rst b/docs/source/api/async/backend.rst index b49f3834..84943d1d 100644 --- a/docs/source/api/async/backend.rst +++ b/docs/source/api/async/backend.rst @@ -32,12 +32,6 @@ Runners .. automethod:: AsyncBackend.bootstrap -.. automethod:: AsyncBackend.new_runner - -.. autoclass:: Runner - :members: - :special-members: __enter__, __exit__ - Coroutines And Tasks -------------------- diff --git a/src/easynetwork/api_async/backend/abc.py b/src/easynetwork/api_async/backend/abc.py index 952da403..724214db 100644 --- a/src/easynetwork/api_async/backend/abc.py +++ b/src/easynetwork/api_async/backend/abc.py @@ -35,7 +35,7 @@ import contextvars import math from abc import ABCMeta, abstractmethod -from collections.abc import Awaitable, Callable, Coroutine, Iterable, Sequence +from collections.abc import Awaitable, Callable, Coroutine, Iterable, Mapping, Sequence from contextlib import AbstractAsyncContextManager from typing import TYPE_CHECKING, Any, Generic, NoReturn, ParamSpec, Protocol, Self, TypeVar @@ -186,55 +186,6 @@ async def wait(self) -> Any: # pragma: no cover ... -class Runner(metaclass=ABCMeta): - """ - A :term:`context manager` that simplifies `multiple` async function calls in the same context. - - Sometimes several top-level async functions should be called in the same event loop and :class:`contextvars.Context`. - """ - - __slots__ = ("__weakref__",) - - def __enter__(self) -> Self: - return self - - def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None: - """Calls :meth:`close`.""" - self.close() - - @abstractmethod - def close(self) -> None: - """ - Closes the runner. - """ - raise NotImplementedError - - @abstractmethod - def run(self, coro_func: Callable[..., Coroutine[Any, Any, _T]], *args: Any) -> _T: - """ - Runs an async function, and returns the result. - - Calling:: - - runner.run(coro_func, *args) - - is equivalent to:: - - await coro_func(*args) - - except that :meth:`run` can (and must) be called from a synchronous context. - - Parameters: - coro_func: An async function. - args: Positional arguments to be passed to `coro_func`. If you need to pass keyword arguments, - then use :func:`functools.partial`. - - Returns: - Whatever `coro_func` returns. - """ - raise NotImplementedError - - class Task(Generic[_T_co], metaclass=ABCMeta): """ A :class:`Task` object represents a concurrent "thread" of execution. @@ -791,36 +742,44 @@ class AsyncBackend(metaclass=ABCMeta): __slots__ = ("__weakref__",) @abstractmethod - def new_runner(self) -> Runner: + def bootstrap( + self, + coro_func: Callable[..., Coroutine[Any, Any, _T]], + *args: Any, + runner_options: Mapping[str, Any] | None = ..., + ) -> _T: """ - Returns an asynchronous function runner. + Runs an async function, and returns the result. - Returns: - A :class:`Runner` context. - """ - raise NotImplementedError + Calling:: - def bootstrap(self, coro_func: Callable[..., Coroutine[Any, Any, _T]], *args: Any) -> _T: - """ - Runs an async function, and returns the result. + backend.bootstrap(coro_func, *args) - Equivalent to:: + is equivalent to:: - with backend.new_runner() as runner: - return runner.run(coro_func, *args) + await coro_func(*args) + + except that :meth:`bootstrap` can (and must) be called from a synchronous context. + + `runner_options` can be used to give additional parameters to the backend runner. For example:: - See :meth:`Runner.run` documentation for details. + backend.bootstrap(coro_func, *args, runner_options={"loop_factory": uvloop.new_event_loop}) + + would act as the following for :mod:`asyncio`:: + + with asyncio.Runner(loop_factory=uvloop.new_event_loop): + runner.run(coro_func(*args)) Parameters: coro_func: An async function. args: Positional arguments to be passed to `coro_func`. If you need to pass keyword arguments, then use :func:`functools.partial`. + runner_options: Options for backend's runner. Returns: - Whatever `coro_func` returns. + Whatever ``await coro_func(*args)`` returns. """ - with self.new_runner() as runner: - return runner.run(coro_func, *args) + raise NotImplementedError @abstractmethod async def coro_yield(self) -> None: diff --git a/src/easynetwork/api_sync/server/_base.py b/src/easynetwork/api_sync/server/_base.py index 8424774d..b8b244af 100644 --- a/src/easynetwork/api_sync/server/_base.py +++ b/src/easynetwork/api_sync/server/_base.py @@ -22,7 +22,8 @@ import contextlib as _contextlib import threading as _threading import time -from typing import TYPE_CHECKING, Self +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any from ...api_async.backend.abc import ThreadsPortal from ...api_async.server.abc import SupportsEventSet @@ -31,18 +32,17 @@ from .abc import AbstractNetworkServer if TYPE_CHECKING: - from ...api_async.backend.abc import Runner from ...api_async.server.abc import AbstractAsyncNetworkServer class BaseStandaloneNetworkServerImpl(AbstractNetworkServer): __slots__ = ( "__server", - "__runner", "__close_lock", "__bootstrap_lock", "__threads_portal", "__is_shutdown", + "__is_closed", ) def __init__(self, server: AbstractAsyncNetworkServer) -> None: @@ -51,15 +51,10 @@ def __init__(self, server: AbstractAsyncNetworkServer) -> None: self.__threads_portal: ThreadsPortal | None = None self.__is_shutdown = _threading.Event() self.__is_shutdown.set() - self.__runner: Runner | None = self.__server.get_backend().new_runner() + self.__is_closed = _threading.Event() self.__close_lock = ForkSafeLock() self.__bootstrap_lock = ForkSafeLock() - def __enter__(self) -> Self: - assert self.__runner is not None, "Server is entered twice" # nosec assert_used - self.__runner.__enter__() - return super().__enter__() - def is_serving(self) -> bool: if (portal := self._portal) is not None: with _contextlib.suppress(RuntimeError): @@ -74,12 +69,10 @@ def server_close(self) -> None: with _contextlib.suppress(concurrent.futures.CancelledError): portal.run_coroutine(self.__server.server_close) else: - runner, self.__runner = self.__runner, None - if runner is None: - return - stack.push(runner) + stack.callback(self.__is_closed.set) self.__is_shutdown.wait() # Ensure we are not in the interval between the server shutdown and the scheduler shutdown - runner.run(self.__server.server_close) + backend = self.__server.get_backend() + backend.bootstrap(self.__server.server_close) server_close.__doc__ = AbstractNetworkServer.server_close.__doc__ @@ -106,7 +99,24 @@ async def __do_shutdown_with_timeout(self, timeout_delay: float) -> None: async with backend.move_on_after(timeout_delay): await self.__server.shutdown() - def serve_forever(self, *, is_up_event: SupportsEventSet | None = None) -> None: + def serve_forever( + self, + *, + is_up_event: SupportsEventSet | None = None, + runner_options: Mapping[str, Any] | None = None, + ) -> None: + """ + Starts the server's main loop. + + Parameters: + is_up_event: If given, will be triggered when the server is ready to accept new clients. + runner_options: Options to pass to the :meth:`~AsyncBackend.bootstrap` method. + + Raises: + ServerClosedError: The server is closed. + ServerAlreadyRunning: Another task already called :meth:`serve_forever`. + """ + backend = self.__server.get_backend() with _contextlib.ExitStack() as server_exit_stack, _contextlib.suppress(backend.get_cancelled_exc_class()): if is_up_event is not None: @@ -119,8 +129,7 @@ def serve_forever(self, *, is_up_event: SupportsEventSet | None = None) -> None: locks_stack.enter_context(self.__close_lock.get()) locks_stack.enter_context(self.__bootstrap_lock.get()) - runner = self.__runner - if runner is None: + if self.__is_closed.is_set(): raise ServerClosedError("Closed server") if not self.__is_shutdown.is_set(): @@ -145,9 +154,7 @@ def acquire_bootstrap_lock() -> None: await self.__server.serve_forever(is_up_event=is_up_event) - runner.run(serve_forever) - - serve_forever.__doc__ = AbstractNetworkServer.serve_forever.__doc__ + backend.bootstrap(serve_forever, runner_options=runner_options) @property def _server(self) -> AbstractAsyncNetworkServer: diff --git a/src/easynetwork_asyncio/backend.py b/src/easynetwork_asyncio/backend.py index bccf3baa..20aa176f 100644 --- a/src/easynetwork_asyncio/backend.py +++ b/src/easynetwork_asyncio/backend.py @@ -27,8 +27,8 @@ import os import socket as _socket import sys -from collections.abc import Callable, Coroutine, Sequence -from contextlib import AbstractAsyncContextManager as AsyncContextManager +from collections.abc import Callable, Coroutine, Mapping, Sequence +from contextlib import AbstractAsyncContextManager as AsyncContextManager, closing from typing import TYPE_CHECKING, Any, NoReturn, ParamSpec, TypeVar try: @@ -45,7 +45,6 @@ from ._utils import create_connection, create_datagram_socket, ensure_resolved, open_listener_sockets_from_getaddrinfo_result from .datagram.endpoint import create_datagram_endpoint from .datagram.socket import AsyncioTransportDatagramSocketAdapter, RawDatagramSocketAdapter -from .runner import AsyncioRunner from .stream.listener import AcceptedSocket, AcceptedSSLSocket, ListenerSocketAdapter from .stream.socket import AsyncioTransportStreamSocketAdapter, RawStreamSocketAdapter from .tasks import SystemTask, TaskGroup, TaskUtils, TimeoutHandle @@ -63,14 +62,20 @@ class AsyncioBackend(AbstractAsyncBackend): - __slots__ = ("__use_asyncio_transport", "__asyncio_runner_factory") + __slots__ = ("__use_asyncio_transport",) - def __init__(self, *, transport: bool = True, runner_factory: Callable[[], asyncio.Runner] | None = None) -> None: + def __init__(self, *, transport: bool = True) -> None: self.__use_asyncio_transport: bool = bool(transport) - self.__asyncio_runner_factory: Callable[[], asyncio.Runner] = runner_factory or asyncio.Runner - def new_runner(self) -> AsyncioRunner: - return AsyncioRunner(self.__asyncio_runner_factory()) + def bootstrap( + self, + coro_func: Callable[..., Coroutine[Any, Any, _T]], + *args: Any, + runner_options: Mapping[str, Any] | None = None, + ) -> _T: + # Avoid ResourceWarning by always closing the coroutine + with asyncio.Runner(**(runner_options or {})) as runner, closing(coro_func(*args)) as coro: + return runner.run(coro) async def coro_yield(self) -> None: await asyncio.sleep(0) diff --git a/src/easynetwork_asyncio/runner.py b/src/easynetwork_asyncio/runner.py deleted file mode 100644 index b3077351..00000000 --- a/src/easynetwork_asyncio/runner.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright 2021-2023, Francis Clairicia-Rose-Claire-Josephine -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -"""asyncio engine for easynetwork.api_async -""" - -from __future__ import annotations - -__all__ = ["AsyncioRunner"] - -import asyncio -import contextlib -from collections.abc import Callable, Coroutine -from typing import Any, Self, TypeVar - -from easynetwork.api_async.backend.abc import Runner as AbstractRunner - -_T = TypeVar("_T") - - -class AsyncioRunner(AbstractRunner): - __slots__ = ("__runner",) - - def __init__(self, runner: asyncio.Runner) -> None: - super().__init__() - - self.__runner: asyncio.Runner = runner - - def __enter__(self) -> Self: - self.__runner.__enter__() - return super().__enter__() - - def close(self) -> None: - return self.__runner.close() - - def run(self, coro_func: Callable[..., Coroutine[Any, Any, _T]], *args: Any) -> _T: - with contextlib.closing(coro_func(*args)) as coro: # Avoid ResourceWarning by always closing the coroutine - return self.__runner.run(coro) diff --git a/tests/functional_test/test_communication/test_sync/test_server/test_standalone.py b/tests/functional_test/test_communication/test_sync/test_server/test_standalone.py index 6ecd0ca1..dbdc25d8 100644 --- a/tests/functional_test/test_communication/test_sync/test_server/test_standalone.py +++ b/tests/functional_test/test_communication/test_sync/test_server/test_standalone.py @@ -1,10 +1,9 @@ from __future__ import annotations -import asyncio import contextlib import threading import time -from collections.abc import AsyncGenerator, Callable, Iterator +from collections.abc import AsyncGenerator, Iterator from easynetwork.api_async.server.abc import AbstractAsyncNetworkServer from easynetwork.api_async.server.handler import AsyncBaseClientInterface, AsyncDatagramRequestHandler, AsyncStreamRequestHandler @@ -97,29 +96,11 @@ def test____server_thread____several_join( start_server.join() -def custom_asyncio_runner() -> asyncio.Runner: - return asyncio.Runner(loop_factory=asyncio.new_event_loop) - - class TestStandaloneTCPNetworkServer(BaseTestStandaloneNetworkServer): - @pytest.fixture(params=[None, custom_asyncio_runner]) - @staticmethod - def runner_factory(request: pytest.FixtureRequest) -> Callable[[], asyncio.Runner] | None: - return getattr(request, "param", None) - @pytest.fixture @staticmethod - def server( - stream_protocol: StreamProtocol[str, str], - runner_factory: Callable[[], asyncio.Runner] | None, - ) -> StandaloneTCPNetworkServer[str, str]: - return StandaloneTCPNetworkServer( - None, - 0, - stream_protocol, - EchoRequestHandler(), - backend_kwargs={"runner_factory": runner_factory}, - ) + def server(stream_protocol: StreamProtocol[str, str]) -> StandaloneTCPNetworkServer[str, str]: + return StandaloneTCPNetworkServer(None, 0, stream_protocol, EchoRequestHandler()) def test____dunder_init____invalid_backend(self, stream_protocol: StreamProtocol[str, str]) -> None: with pytest.raises(ValueError, match=r"^You must explicitly give a backend name or instance$"): @@ -131,7 +112,6 @@ def test____dunder_init____invalid_backend(self, stream_protocol: StreamProtocol backend=None, # type: ignore[arg-type] ) - @pytest.mark.parametrize("runner_factory", [None], indirect=True) def test____serve_forever____serve_several_times(self, server: StandaloneTCPNetworkServer[str, str]) -> None: with server: for _ in range(3): @@ -169,24 +149,10 @@ def test____logger_property____exposed(self, server: StandaloneTCPNetworkServer[ class TestStandaloneUDPNetworkServer(BaseTestStandaloneNetworkServer): - @pytest.fixture(params=[None, custom_asyncio_runner]) - @staticmethod - def runner_factory(request: pytest.FixtureRequest) -> Callable[[], asyncio.Runner] | None: - return getattr(request, "param", None) - @pytest.fixture @staticmethod - def server( - datagram_protocol: DatagramProtocol[str, str], - runner_factory: Callable[[], asyncio.Runner] | None, - ) -> StandaloneUDPNetworkServer[str, str]: - return StandaloneUDPNetworkServer( - "localhost", - 0, - datagram_protocol, - EchoRequestHandler(), - backend_kwargs={"runner_factory": runner_factory}, - ) + def server(datagram_protocol: DatagramProtocol[str, str]) -> StandaloneUDPNetworkServer[str, str]: + return StandaloneUDPNetworkServer("localhost", 0, datagram_protocol, EchoRequestHandler()) def test____dunder_init____invalid_backend(self, datagram_protocol: DatagramProtocol[str, str]) -> None: with pytest.raises(ValueError, match=r"^You must explicitly give a backend name or instance$"): @@ -198,7 +164,6 @@ def test____dunder_init____invalid_backend(self, datagram_protocol: DatagramProt backend=None, # type: ignore[arg-type] ) - @pytest.mark.parametrize("runner_factory", [None], indirect=True) def test____serve_forever____serve_several_times(self, server: StandaloneUDPNetworkServer[str, str]) -> None: with server: for _ in range(3): diff --git a/tests/unit_test/test_async/test_api/test_backend/_fake_backends.py b/tests/unit_test/test_async/test_api/test_backend/_fake_backends.py index 48ee61d3..1461ddf7 100644 --- a/tests/unit_test/test_async/test_api/test_backend/_fake_backends.py +++ b/tests/unit_test/test_async/test_api/test_backend/_fake_backends.py @@ -12,7 +12,6 @@ ICondition, IEvent, ILock, - Runner, SystemTask, TaskGroup, ThreadsPortal, @@ -21,7 +20,7 @@ class BaseFakeBackend(AsyncBackend): - def new_runner(self) -> Runner: + def bootstrap(self, *args: Any, **kwargs: Any) -> Any: raise NotImplementedError async def sleep(self, delay: float) -> None: diff --git a/tests/unit_test/test_async/test_asyncio_backend/test_backend.py b/tests/unit_test/test_async/test_asyncio_backend/test_backend.py index 88b4c541..6a564409 100644 --- a/tests/unit_test/test_async/test_asyncio_backend/test_backend.py +++ b/tests/unit_test/test_async/test_asyncio_backend/test_backend.py @@ -3,7 +3,7 @@ import asyncio import contextlib import contextvars -from collections.abc import Callable, Sequence +from collections.abc import Callable, Coroutine, Sequence from socket import AF_INET from typing import TYPE_CHECKING, Any, cast @@ -20,6 +20,56 @@ from pytest_mock import MockerFixture +class TestAsyncIOBackendSync: + @pytest.fixture + @staticmethod + def backend() -> AsyncioBackend: + return AsyncioBackend() + + @pytest.mark.parametrize("runner_options", [{"loop_factory": 42}, None]) + def test____bootstrap____start_new_runner( + self, + runner_options: dict[str, Any] | None, + backend: AsyncioBackend, + mocker: MockerFixture, + ) -> None: + # Arrange + mock_asyncio_runner: MagicMock = mocker.NonCallableMagicMock( + spec=asyncio.Runner, + **{"run.return_value": mocker.sentinel.Runner_ret_val}, + ) + mock_asyncio_runner.__enter__.return_value = mock_asyncio_runner + mock_asyncio_runner_cls = mocker.patch("asyncio.Runner", side_effect=[mock_asyncio_runner]) + mock_coroutine = mocker.NonCallableMagicMock(spec=Coroutine) + coro_stub = mocker.stub() + coro_stub.return_value = mock_coroutine + + # Act + ret_val = backend.bootstrap( + coro_stub, + mocker.sentinel.arg1, + mocker.sentinel.arg2, + mocker.sentinel.arg3, + runner_options=runner_options, + ) + + # Assert + if runner_options is None: + mock_asyncio_runner_cls.assert_called_once_with() + else: + mock_asyncio_runner_cls.assert_called_once_with(**runner_options) + + coro_stub.assert_called_once_with( + mocker.sentinel.arg1, + mocker.sentinel.arg2, + mocker.sentinel.arg3, + ) + + mock_asyncio_runner.run.assert_called_once_with(mock_coroutine) + mock_coroutine.close.assert_called_once_with() + assert ret_val is mocker.sentinel.Runner_ret_val + + @pytest.mark.asyncio class TestAsyncIOBackend: @pytest.fixture(params=[False, True], ids=lambda boolean: f"use_asyncio_transport=={boolean}") @@ -1328,7 +1378,6 @@ async def test____run_in_thread____use_loop_run_in_executor( async def test____create_threads_portal____returns_asyncio_portal( self, - event_loop: asyncio.AbstractEventLoop, backend: AsyncioBackend, ) -> None: # Arrange