Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

easynetwork_asyncio.ThreadsPortal: Fix future returned by run_coroutine_soon() #139

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions src/easynetwork_asyncio/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,21 @@ def schedule_task() -> concurrent.futures.Future[_T]:
future: concurrent.futures.Future[_T] = concurrent.futures.Future()

async def coroutine() -> None:
def on_fut_done(future: concurrent.futures.Future[_T]) -> None:
if future.cancelled():
try:
self.run_sync(task.cancel)
except RuntimeError:
# on_fut_done() called from coroutine()
# or the portal is already shut down
pass

task = TaskUtils.current_asyncio_task()
try:
if future.cancelled():
task.cancel()
else:
future.add_done_callback(on_fut_done)
result = await coro_func(*args, **kwargs)
except asyncio.CancelledError:
future.cancel()
Expand All @@ -102,20 +116,9 @@ async def coroutine() -> None:

task = self.__task_group.create_task(coroutine())
loop = task.get_loop()
del task
with self.__lock.get():
loop.call_soon(self.__register_waiter(self.__call_soon_waiters, loop).set_result, None)

def on_fut_done(future: concurrent.futures.Future[_T]) -> None:
if future.cancelled():
try:
self.run_sync(task.cancel)
except RuntimeError:
# on_fut_done() called from coroutine()
# or the portal is already shut down
pass

future.add_done_callback(on_fut_done)

return future

return self.run_sync(schedule_task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,43 @@ def thread() -> None:

cancellation_ignored.assert_called_once()

async def test____create_threads_portal____run_coroutine_soon____future_cancelled_before_await(
self,
event_loop: asyncio.AbstractEventLoop,
backend: AsyncIOBackend,
) -> None:
checkpoints: list[str] = []

async def coroutine() -> None:
current_task = asyncio.current_task()
assert current_task is not None

checkpoints.append(f"{current_task.cancelling()=}")
await asyncio.sleep(0)
checkpoints.append("does-not-raise-CancelledError")

def thread() -> None:
future = threads_portal.run_coroutine_soon(coroutine)
future.cancel()

wait_concurrent_futures({future}, timeout=5) # Test if future.set_running_or_notify_cancel() have been called
assert future.cancelled()

event_loop_slowdown_handle: asyncio.Handle

def event_loop_slowdown() -> None: # Drastically slow down event loop
nonlocal event_loop_slowdown_handle

time.sleep(0.5)
event_loop_slowdown_handle = event_loop.call_soon(event_loop_slowdown)

event_loop_slowdown_handle = event_loop.call_soon(event_loop_slowdown)
async with backend.create_threads_portal() as threads_portal:
await backend.run_in_thread(thread)

event_loop_slowdown_handle.cancel()
assert checkpoints == ["current_task.cancelling()=1"]

async def test____create_threads_portal____context_exit____wait_scheduled_call_soon(
self,
event_loop: asyncio.AbstractEventLoop,
Expand Down