Skip to content

Commit

Permalink
[FIX] AsyncBackend: Removed Runner interface
Browse files Browse the repository at this point in the history
  • Loading branch information
francis-clairicia committed Sep 23, 2023
1 parent 4a62d4e commit 52a396e
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 193 deletions.
6 changes: 0 additions & 6 deletions docs/source/api/async/backend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ Runners

.. automethod:: AsyncBackend.bootstrap

.. automethod:: AsyncBackend.new_runner

.. autoclass:: Runner
:members:
:special-members: __enter__, __exit__

Coroutines And Tasks
--------------------

Expand Down
91 changes: 25 additions & 66 deletions src/easynetwork/api_async/backend/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import contextvars
import math
from abc import ABCMeta, abstractmethod
from collections.abc import Awaitable, Callable, Coroutine, Iterable, Sequence
from collections.abc import Awaitable, Callable, Coroutine, Iterable, Mapping, Sequence
from contextlib import AbstractAsyncContextManager
from typing import TYPE_CHECKING, Any, Generic, NoReturn, ParamSpec, Protocol, Self, TypeVar

Expand Down Expand Up @@ -186,55 +186,6 @@ async def wait(self) -> Any: # pragma: no cover
...


class Runner(metaclass=ABCMeta):
"""
A :term:`context manager` that simplifies `multiple` async function calls in the same context.
Sometimes several top-level async functions should be called in the same event loop and :class:`contextvars.Context`.
"""

__slots__ = ("__weakref__",)

def __enter__(self) -> Self:
return self

def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
"""Calls :meth:`close`."""
self.close()

@abstractmethod
def close(self) -> None:
"""
Closes the runner.
"""
raise NotImplementedError

@abstractmethod
def run(self, coro_func: Callable[..., Coroutine[Any, Any, _T]], *args: Any) -> _T:
"""
Runs an async function, and returns the result.
Calling::
runner.run(coro_func, *args)
is equivalent to::
await coro_func(*args)
except that :meth:`run` can (and must) be called from a synchronous context.
Parameters:
coro_func: An async function.
args: Positional arguments to be passed to `coro_func`. If you need to pass keyword arguments,
then use :func:`functools.partial`.
Returns:
Whatever `coro_func` returns.
"""
raise NotImplementedError


class Task(Generic[_T_co], metaclass=ABCMeta):
"""
A :class:`Task` object represents a concurrent "thread" of execution.
Expand Down Expand Up @@ -791,36 +742,44 @@ class AsyncBackend(metaclass=ABCMeta):
__slots__ = ("__weakref__",)

@abstractmethod
def new_runner(self) -> Runner:
def bootstrap(
self,
coro_func: Callable[..., Coroutine[Any, Any, _T]],
*args: Any,
runner_options: Mapping[str, Any] | None = ...,
) -> _T:
"""
Returns an asynchronous function runner.
Runs an async function, and returns the result.
Returns:
A :class:`Runner` context.
"""
raise NotImplementedError
Calling::
def bootstrap(self, coro_func: Callable[..., Coroutine[Any, Any, _T]], *args: Any) -> _T:
"""
Runs an async function, and returns the result.
backend.bootstrap(coro_func, *args)
Equivalent to::
is equivalent to::
with backend.new_runner() as runner:
return runner.run(coro_func, *args)
await coro_func(*args)
except that :meth:`bootstrap` can (and must) be called from a synchronous context.
`runner_options` can be used to give additional parameters to the backend runner. For example::
See :meth:`Runner.run` documentation for details.
backend.bootstrap(coro_func, *args, runner_options={"loop_factory": uvloop.new_event_loop})
would act as the following for :mod:`asyncio`::
with asyncio.Runner(loop_factory=uvloop.new_event_loop):
runner.run(coro_func(*args))
Parameters:
coro_func: An async function.
args: Positional arguments to be passed to `coro_func`. If you need to pass keyword arguments,
then use :func:`functools.partial`.
runner_options: Options for backend's runner.
Returns:
Whatever `coro_func` returns.
Whatever ``await coro_func(*args)`` returns.
"""
with self.new_runner() as runner:
return runner.run(coro_func, *args)
raise NotImplementedError

@abstractmethod
async def coro_yield(self) -> None:
Expand Down
47 changes: 27 additions & 20 deletions src/easynetwork/api_sync/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import contextlib as _contextlib
import threading as _threading
import time
from typing import TYPE_CHECKING, Self
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

from ...api_async.backend.abc import ThreadsPortal
from ...api_async.server.abc import SupportsEventSet
Expand All @@ -31,18 +32,17 @@
from .abc import AbstractNetworkServer

if TYPE_CHECKING:
from ...api_async.backend.abc import Runner
from ...api_async.server.abc import AbstractAsyncNetworkServer


class BaseStandaloneNetworkServerImpl(AbstractNetworkServer):
__slots__ = (
"__server",
"__runner",
"__close_lock",
"__bootstrap_lock",
"__threads_portal",
"__is_shutdown",
"__is_closed",
)

def __init__(self, server: AbstractAsyncNetworkServer) -> None:
Expand All @@ -51,15 +51,10 @@ def __init__(self, server: AbstractAsyncNetworkServer) -> None:
self.__threads_portal: ThreadsPortal | None = None
self.__is_shutdown = _threading.Event()
self.__is_shutdown.set()
self.__runner: Runner | None = self.__server.get_backend().new_runner()
self.__is_closed = _threading.Event()
self.__close_lock = ForkSafeLock()
self.__bootstrap_lock = ForkSafeLock()

def __enter__(self) -> Self:
assert self.__runner is not None, "Server is entered twice" # nosec assert_used
self.__runner.__enter__()
return super().__enter__()

def is_serving(self) -> bool:
if (portal := self._portal) is not None:
with _contextlib.suppress(RuntimeError):
Expand All @@ -74,12 +69,10 @@ def server_close(self) -> None:
with _contextlib.suppress(concurrent.futures.CancelledError):
portal.run_coroutine(self.__server.server_close)
else:
runner, self.__runner = self.__runner, None
if runner is None:
return
stack.push(runner)
stack.callback(self.__is_closed.set)
self.__is_shutdown.wait() # Ensure we are not in the interval between the server shutdown and the scheduler shutdown
runner.run(self.__server.server_close)
backend = self.__server.get_backend()
backend.bootstrap(self.__server.server_close)

server_close.__doc__ = AbstractNetworkServer.server_close.__doc__

Expand All @@ -106,7 +99,24 @@ async def __do_shutdown_with_timeout(self, timeout_delay: float) -> None:
async with backend.move_on_after(timeout_delay):
await self.__server.shutdown()

def serve_forever(self, *, is_up_event: SupportsEventSet | None = None) -> None:
def serve_forever(
self,
*,
is_up_event: SupportsEventSet | None = None,
runner_options: Mapping[str, Any] | None = None,
) -> None:
"""
Starts the server's main loop.
Parameters:
is_up_event: If given, will be triggered when the server is ready to accept new clients.
runner_options: Options to pass to the :meth:`~AsyncBackend.bootstrap` method.
Raises:
ServerClosedError: The server is closed.
ServerAlreadyRunning: Another task already called :meth:`serve_forever`.
"""

backend = self.__server.get_backend()
with _contextlib.ExitStack() as server_exit_stack, _contextlib.suppress(backend.get_cancelled_exc_class()):
if is_up_event is not None:
Expand All @@ -119,8 +129,7 @@ def serve_forever(self, *, is_up_event: SupportsEventSet | None = None) -> None:
locks_stack.enter_context(self.__close_lock.get())
locks_stack.enter_context(self.__bootstrap_lock.get())

runner = self.__runner
if runner is None:
if self.__is_closed.is_set():
raise ServerClosedError("Closed server")

if not self.__is_shutdown.is_set():
Expand All @@ -145,9 +154,7 @@ def acquire_bootstrap_lock() -> None:

await self.__server.serve_forever(is_up_event=is_up_event)

runner.run(serve_forever)

serve_forever.__doc__ = AbstractNetworkServer.serve_forever.__doc__
backend.bootstrap(serve_forever, runner_options=runner_options)

@property
def _server(self) -> AbstractAsyncNetworkServer:
Expand Down
21 changes: 13 additions & 8 deletions src/easynetwork_asyncio/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import os
import socket as _socket
import sys
from collections.abc import Callable, Coroutine, Sequence
from contextlib import AbstractAsyncContextManager as AsyncContextManager
from collections.abc import Callable, Coroutine, Mapping, Sequence
from contextlib import AbstractAsyncContextManager as AsyncContextManager, closing
from typing import TYPE_CHECKING, Any, NoReturn, ParamSpec, TypeVar

try:
Expand All @@ -45,7 +45,6 @@
from ._utils import create_connection, create_datagram_socket, ensure_resolved, open_listener_sockets_from_getaddrinfo_result
from .datagram.endpoint import create_datagram_endpoint
from .datagram.socket import AsyncioTransportDatagramSocketAdapter, RawDatagramSocketAdapter
from .runner import AsyncioRunner
from .stream.listener import AcceptedSocket, AcceptedSSLSocket, ListenerSocketAdapter
from .stream.socket import AsyncioTransportStreamSocketAdapter, RawStreamSocketAdapter
from .tasks import SystemTask, TaskGroup, TaskUtils, TimeoutHandle
Expand All @@ -63,14 +62,20 @@


class AsyncioBackend(AbstractAsyncBackend):
__slots__ = ("__use_asyncio_transport", "__asyncio_runner_factory")
__slots__ = ("__use_asyncio_transport",)

def __init__(self, *, transport: bool = True, runner_factory: Callable[[], asyncio.Runner] | None = None) -> None:
def __init__(self, *, transport: bool = True) -> None:
self.__use_asyncio_transport: bool = bool(transport)
self.__asyncio_runner_factory: Callable[[], asyncio.Runner] = runner_factory or asyncio.Runner

def new_runner(self) -> AsyncioRunner:
return AsyncioRunner(self.__asyncio_runner_factory())
def bootstrap(
self,
coro_func: Callable[..., Coroutine[Any, Any, _T]],
*args: Any,
runner_options: Mapping[str, Any] | None = None,
) -> _T:
# Avoid ResourceWarning by always closing the coroutine
with asyncio.Runner(**(runner_options or {})) as runner, closing(coro_func(*args)) as coro:
return runner.run(coro)

async def coro_yield(self) -> None:
await asyncio.sleep(0)
Expand Down
49 changes: 0 additions & 49 deletions src/easynetwork_asyncio/runner.py

This file was deleted.

Loading

0 comments on commit 52a396e

Please sign in to comment.