Skip to content

Commit

Permalink
Refactor to go async (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
almarklein committed Nov 28, 2024
1 parent cf9192a commit f9b1d0e
Show file tree
Hide file tree
Showing 13 changed files with 449 additions and 401 deletions.
5 changes: 2 additions & 3 deletions rendercanvas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
from ._version import __version__, version_info
from . import _coreutils
from ._events import EventType
from .base import BaseRenderCanvas, BaseLoop, BaseTimer
from .base import BaseRenderCanvas, BaseLoop

__all__ = [
"BaseLoop",
"BaseRenderCanvas",
"EventType",
"BaseLoop",
"BaseTimer",
]
124 changes: 124 additions & 0 deletions rendercanvas/_async_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import time
import logging

from sniffio import thread_local as sniffio_thread_local


logger = logging.getLogger("rendercanvas")


class Sleeper:
def __init__(self, when):
self.when = when

def __await__(self):
yield {"wait_method": "sleep", "when": self.when}


async def sleep(delay):
await Sleeper(time.perf_counter() + delay)


class Event:
def __init__(self):
self._is_set = False
self._tasks = []

async def wait(self):
if self._is_set:
return
else:
return self # triggers __await__

def __await__(self):
return {"wait_method": "event", "event": self}

def _add_task(self, task):
self._tasks.append(task)

def set(self):
self._is_set = True
for task in self._tasks:
task.call_step_soon()
self._tasks = []


class CancelledError(BaseException):
"""Exception raised when a task is cancelled."""

pass


class Task:
def __init__(self, loop, coro, name):
self.loop = loop
self.coro = coro
self.name = name
self.cancelled = False
self._done_callbacks = []
self.call_step_soon()

def add_done_callback(self, callback):
self._done_callbacks.append(callback)

def _close(self):
self.loop = None
self.coro = None
for callback in self._done_callbacks:
try:
callback(self)
except Exception:
pass

def call_step_soon(self):
self.loop._rc_call_soon(self.step)

def call_step_at(self, when):
self.loop._rc_call_at(when, self.step)

def cancel(self):
self.cancelled = True

def step(self):
if self.coro is None:
return

result = None
stop = False

old_name, sniffio_thread_local.name = sniffio_thread_local.name, __name__
try:
if self.cancelled:
stop = True
self.coro.throw(CancelledError()) # falls through if not caught
self.coro.close() # raises GeneratorExit
else:
result = self.coro.send(None)
except CancelledError:
stop = True
except StopIteration:
stop = True
except Exception as err:
# This should not happen, because the loop catches and logs all errors. But just in case.
logger.error(f"Error in task: {err}")
stop = True
finally:
sniffio_thread_local.name = old_name

# Clean up to help gc
if stop:
return self._close()

if not (isinstance(result, dict) and result.get("wait_method", None)):
raise RuntimeError(
f"Incompatible awaitable result {result!r}. Maybe you used asyncio or trio (this does not run on either)?"
)

wait_method = result["wait_method"]

if wait_method == "sleep":
self.call_step_at(result["when"])
elif wait_method == "event":
result["event"]._add_task(self)
else:
raise RuntimeError(f"Unknown wait_method {wait_method!r}.")
15 changes: 15 additions & 0 deletions rendercanvas/_async_sniffs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import sys
import sniffio


async def sleep(delay):
libname = sniffio.current_async_library()
sleep = sys.modules[libname].sleep
await sleep(delay)


class Event:
def __new__(cls):
libname = sniffio.current_async_library()
Event = sys.modules[libname].Event # noqa
return Event()
14 changes: 9 additions & 5 deletions rendercanvas/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import time
from asyncio import iscoroutinefunction # note: is not asyncio-specific
from collections import defaultdict, deque

from ._coreutils import log_exception, BaseEnum
Expand Down Expand Up @@ -182,7 +183,7 @@ def submit(self, event):

self._pending_events.append(event)

def flush(self):
async def flush(self):
"""Dispatch all pending events.
This should generally be left to the scheduler.
Expand All @@ -192,9 +193,9 @@ def flush(self):
event = self._pending_events.popleft()
except IndexError:
break
self.emit(event)
await self.emit(event)

def emit(self, event):
async def emit(self, event):
"""Directly emit the given event.
In most cases events should be submitted, so that they are flushed
Expand All @@ -208,11 +209,14 @@ def emit(self, event):
if event.get("stop_propagation", False):
break
with log_exception(f"Error during handling {event_type} event"):
callback(event)
if iscoroutinefunction(callback):
await callback(event)
else:
callback(event)

def _rc_close(self):
"""Wrap up when the scheduler detects the canvas is closed/dead."""
# This is a little feature because detecting a widget from closing can be tricky.
if not self._closed:
self.submit({"event_type": "close"})
self.flush()
# todo: !! ??self.flush()
Loading

0 comments on commit f9b1d0e

Please sign in to comment.