From 381074fb89c054819a7ee2404c7e6f087bbea293 Mon Sep 17 00:00:00 2001 From: Francis CLAIRICIA-ROSE-CLAIRE-JOSEPHINE Date: Sat, 4 Nov 2023 16:37:29 +0100 Subject: [PATCH 1/2] [FIX] Fixed race condition in ThreadsPortal.run_coroutine_soon() --- src/easynetwork/lowlevel/asyncio/threads.py | 28 +++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/easynetwork/lowlevel/asyncio/threads.py b/src/easynetwork/lowlevel/asyncio/threads.py index 342dbb04..b88d40bb 100644 --- a/src/easynetwork/lowlevel/asyncio/threads.py +++ b/src/easynetwork/lowlevel/asyncio/threads.py @@ -21,6 +21,7 @@ import asyncio import concurrent.futures +import contextlib import contextvars import inspect from collections.abc import Awaitable, Callable @@ -85,19 +86,16 @@ def schedule_task() -> concurrent.futures.Future[_T]: async def coroutine() -> None: def on_fut_done(future: concurrent.futures.Future[_T]) -> None: if future.cancelled(): - try: + if self.__is_in_this_loop_thread(loop): + task.cancel() + return + with contextlib.suppress(RuntimeError): self.run_sync(task.cancel) - except RuntimeError: - # on_fut_done() called from coroutine() - # or the portal is already shut down - pass task = TaskUtils.current_asyncio_task() + loop = task.get_loop() try: - if future.cancelled(): - task.cancel() - else: - future.add_done_callback(on_fut_done) + future.add_done_callback(on_fut_done) result = await coro_func(*args, **kwargs) except asyncio.CancelledError: future.cancel() @@ -158,13 +156,17 @@ def __check_loop(self) -> asyncio.AbstractEventLoop: loop = self.__loop if loop is None: raise RuntimeError("ThreadsPortal not running.") + if self.__is_in_this_loop_thread(loop): + raise RuntimeError("This function must be called in a different OS thread") + return loop + + @staticmethod + def __is_in_this_loop_thread(loop: asyncio.AbstractEventLoop) -> bool: try: running_loop = asyncio.get_running_loop() except RuntimeError: - return loop - if running_loop is loop: - raise RuntimeError("This function must be called in a different OS thread") - return loop + return False + return running_loop is loop @staticmethod def __register_waiter(waiters: set[asyncio.Future[None]], loop: asyncio.AbstractEventLoop) -> asyncio.Future[None]: From 6fffec70ca8a72a3363fa8ec93f5f52c42b62339 Mon Sep 17 00:00:00 2001 From: Francis CLAIRICIA-ROSE-CLAIRE-JOSEPHINE Date: Sat, 4 Nov 2023 16:49:08 +0100 Subject: [PATCH 2/2] [FIX] Do not call task.cancel() when the coroutine itself is cancelled --- src/easynetwork/lowlevel/asyncio/threads.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/easynetwork/lowlevel/asyncio/threads.py b/src/easynetwork/lowlevel/asyncio/threads.py index b88d40bb..f863c0ef 100644 --- a/src/easynetwork/lowlevel/asyncio/threads.py +++ b/src/easynetwork/lowlevel/asyncio/threads.py @@ -87,17 +87,20 @@ async def coroutine() -> None: def on_fut_done(future: concurrent.futures.Future[_T]) -> None: if future.cancelled(): if self.__is_in_this_loop_thread(loop): - task.cancel() + if not cancelling: + task.cancel() return with contextlib.suppress(RuntimeError): self.run_sync(task.cancel) task = TaskUtils.current_asyncio_task() loop = task.get_loop() + cancelling: bool = False try: future.add_done_callback(on_fut_done) result = await coro_func(*args, **kwargs) except asyncio.CancelledError: + cancelling = True future.cancel() future.set_running_or_notify_cancel() raise