Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] closable run_sync_soon #1098

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
reattach_detached_coroutine_object
)

from ._entry_queue import TrioToken
from ._entry_queue import TrioToken, TrioEntryHandle

from ._parking_lot import ParkingLot

Expand Down
136 changes: 136 additions & 0 deletions trio/_core/_entry_queue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from collections import deque
import threading
import warnings

import attr

from .. import _core
from ._wakeup_socketpair import WakeupSocketpair
from .._deprecate import deprecated

__all__ = ["TrioToken"]

Expand Down Expand Up @@ -33,6 +35,9 @@ class EntryQueue:
# just to make 1 assignment, so that's atomic WRT a signal anyway.
lock = attr.ib(factory=threading.RLock)

live_handles = attr.ib(default=0)
live_handles_lock = attr.ib(factory=threading.Lock)

async def task(self):
assert _core.currently_ki_protected()
# RLock has two implementations: a signal-safe version in _thread, and
Expand Down Expand Up @@ -122,6 +127,136 @@ def run_sync_soon(self, sync_fn, *args, idempotent=False):
self.queue.append((sync_fn, args))
self.wakeup.wakeup_thread_and_signal_safe()

def open_handle(self):
with self.live_handles_lock:
self.live_handles += 1
return TrioEntryHandle(self)


class TrioEntryHandle:
def __init__(self, reentry_queue):
self._reentry_queue = reentry_queue
self._thread = threading.current_thread()
self._lock = threading.RLock()

def run_sync_soon(self, sync_fn, *args, idempotent=False):
"""Schedule a call to ``sync_fn(*args)`` to occur in the context of a
Trio task.

Most functions in Trio are only safe to call from inside the main Trio
thread, and never from signal handlers or ``__del__`` methods. But
this method is safe to call from the main thread, from other threads,
from signal handlers, and from ``__del__`` methods. This is the
fundamental primitive used to re-enter the Trio run loop from outside
of it.

The call will happen "soon", but there's no guarantee about exactly
when, and no mechanism provided for finding out when it's happened.
If you need this, you'll have to build your own.

The call is effectively run as part of a system task (see
:func:`~trio.hazmat.spawn_system_task`). In particular this means
that:

* :exc:`KeyboardInterrupt` protection is *enabled* by default. Your
function won't be interrupted by control-C.

* If ``sync_fn`` raises an exception, then it's converted into a
:exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You
should be careful that ``sync_fn`` doesn't raise an exception.

All calls with ``idempotent=False`` are processed in strict
first-in first-out order.

If ``idempotent=True``, then ``sync_fn`` and ``args`` must be
hashable, and Trio will make a best-effort attempt to discard any call
submission which is equal to an already-pending call. Trio will make
an attempt to process these in first-in first-out order, but no
guarantees. (Currently processing is FIFO on CPython 3.6+ and on PyPy,
but not CPython 3.5.)

Any ordering guarantees apply separately to ``idempotent=False``
and ``idempotent=True`` calls; there's no rule for how calls in the
different categories are ordered with respect to each other.

Raises:
trio.ClosedResourceError: If this handle has already been closed.
trio.RunFinishedError: If the associated call to :func:`trio.run`
has already exited.

"""
with self._lock:
if self._reentry_queue is None:
raise _core.ClosedResourceError
self._reentry_queue.run_sync_soon(
sync_fn, *args, idempotent=idempotent
)

def close(self):
"""Close this entry handle.

After the entry handle is closed, it cannot be used.

This method is thread-safe: you can call it from any thread at any
time. It is *not* reentrant-safe: you should *never* call it from a
signal handler or ``__del__`` method.

.. warning:: It is very important to always close your entry handles
when you are done with them! If you don't, Trio won't be able to
detect when a program deadlocks.

"""
with self._lock:
# This code can't mutate anything – see the comment below.
if self._reentry_queue is None:
return
rq = self._reentry_queue

# We have the lock, so we can't be racing with another thread
# calling close() or run_sync_soon(). And by assumption, no signal
# handler or __del__ method can call close(). So we don't need to
# worry about multiple close() calls racing with each other.
#
# BUT, a signal handler or __del__ method *could* call
# run_sync_soon at any point during this call, i.e. the
# interpreter could pause interpreting this function while it uses
# our thread to execute a call to run_sync_soon, and we need to
# handle that correctly.
#
# The next line is the barrier for any reentrant run_sync_soon
# calls. If they get called before this assignment takes effect,
# they're fine; we haven't done anything yet. If not, then they'll
# raise ClosedResourceError.
self._reentry_queue = None

with rq.live_handles_lock:
rq.live_handles -= 1
if rq.live_handles == 0:
try:
rq.run_sync_soon(lambda: None)
except _core.RunFinishedError:
pass

def __del__(self):
# Closing ourselves reliably from __del__ would be very tricky, and if
# anyone did rely on it then it would almost certainly break the
# deadlock detector. (If you rely on the GC to detect when you're
# deadlocked, that's a problem, because deadlocked programs don't tend
# to trigger the GC.) So instead we just issue a noisy warning.
if self._reentry_queue is not None:
warnings.warn(
RuntimeWarning(
"failed to close TrioEntryHandle. this is a bug!"
),
source=self,
)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()


class TrioToken:
"""An opaque object representing a single call to :func:`trio.run`.
Expand All @@ -147,6 +282,7 @@ class TrioToken:
def __init__(self, reentry_queue):
self._reentry_queue = reentry_queue

@deprecated("0.12.0", issue=1085, instead="open_trio_entry_handle")
def run_sync_soon(self, sync_fn, *args, idempotent=False):
"""Schedule a call to ``sync_fn(*args)`` to occur in the context of a
Trio task.
Expand Down
8 changes: 8 additions & 0 deletions trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,14 @@ def current_trio_token(self):
self.trio_token = TrioToken(self.entry_queue)
return self.trio_token

@_public
def open_trio_entry_handle(self):
"""Open a new `TrioEntryHandle`, to let you reenter the current call
to `trio.run` from other contexts.

"""
return self.entry_queue.open_handle()

################
# KI handling
################
Expand Down
18 changes: 9 additions & 9 deletions trio/_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ def open_signal_receiver(*signals):
"Sorry, open_signal_receiver is only possible when running in "
"Python interpreter's main thread"
)
token = trio.hazmat.current_trio_token()
queue = SignalReceiver()
with trio.hazmat.open_trio_entry_handle() as entry_handle:
queue = SignalReceiver()

def handler(signum, _):
token.run_sync_soon(queue._add, signum, idempotent=True)
def handler(signum, _):
entry_handle.run_sync_soon(queue._add, signum, idempotent=True)

try:
with _signal_handler(signals, handler):
yield queue
finally:
queue._redeliver_remaining()
try:
with _signal_handler(signals, handler):
yield queue
finally:
queue._redeliver_remaining()
3 changes: 2 additions & 1 deletion trio/hazmat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
permanently_detach_coroutine_object, reattach_detached_coroutine_object,
current_statistics, reschedule, remove_instrument, add_instrument,
current_clock, current_root_task, checkpoint_if_cancelled,
spawn_system_task, wait_readable, wait_writable, notify_closing
spawn_system_task, wait_readable, wait_writable, notify_closing,
open_trio_entry_handle, TrioEntryHandle
)

# Unix-specific symbols
Expand Down
4 changes: 2 additions & 2 deletions trio/testing/_mock_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ def _real_to_virtual(self, real):
return self._virtual_base + virtual_offset

def start_clock(self):
token = _core.current_trio_token()
token.run_sync_soon(self._maybe_spawn_autojump_task)
with _core.open_trio_entry_handle() as handle:
handle.run_sync_soon(self._maybe_spawn_autojump_task)

def current_time(self):
return self._real_to_virtual(self._real_clock())
Expand Down
4 changes: 2 additions & 2 deletions trio/tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ async def test_open_signal_receiver_conflict():
# processed.
async def wait_run_sync_soon_idempotent_queue_barrier():
ev = trio.Event()
token = _core.current_trio_token()
token.run_sync_soon(ev.set, idempotent=True)
with _core.open_trio_entry_handle() as handle:
handle.run_sync_soon(ev.set, idempotent=True)
await ev.wait()


Expand Down