Skip to content

Commit

Permalink
Defer fix
Browse files Browse the repository at this point in the history
Fix for event loops in defer.py
  • Loading branch information
frdel committed Sep 13, 2024
1 parent 4307dce commit 62e244f
Showing 1 changed file with 38 additions and 47 deletions.
85 changes: 38 additions & 47 deletions python/helpers/defer.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,61 @@
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Callable, Optional, Coroutine

class DeferredTask:
def __init__(self, func, *args, **kwargs):
self._loop: asyncio.AbstractEventLoop = None # type: ignore
self._task = None
self._future = Future()
self._task_initialized = threading.Event()
self._start_task(func, *args, **kwargs)
class EventLoopThread:
_instance = None
_lock = threading.Lock()

def _start_task(self, func, *args, **kwargs):
def run_in_thread():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._task = self._loop.create_task(self._run(func, *args, **kwargs))
self._task_initialized.set()
self._loop.run_forever()
def __init__(self) -> None:
self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self.thread: threading.Thread = threading.Thread(target=self._run_event_loop, daemon=True)
self.thread.start()

self._thread = threading.Thread(target=run_in_thread)
self._thread.start()
def __new__(cls) -> 'EventLoopThread':
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.__init__()
return cls._instance

async def _run(self, func, *args, **kwargs):
try:
result = await func(*args, **kwargs)
self._future.set_result(result)
except Exception as e:
self._future.set_exception(e)
finally:
self._loop.call_soon_threadsafe(self._cleanup)
def _run_event_loop(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()

def _cleanup(self):
self._loop.stop()
def run_coroutine(self, coro):
return asyncio.run_coroutine_threadsafe(coro, self.loop)

def is_ready(self):
return self._future.done()
class DeferredTask:
def __init__(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any) -> None:
self._event_loop_thread = EventLoopThread()
self._future: Future[Any] = self._event_loop_thread.run_coroutine(self._run(func, *args, **kwargs))

async def _run(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any) -> Any:
return await func(*args, **kwargs)

async def result(self, timeout=None):
if not self._task_initialized.wait(timeout):
raise RuntimeError("Task was not initialized properly.")
def is_ready(self) -> bool:
return self._future.done()

async def result(self, timeout: Optional[float] = None) -> Any:
try:
return await asyncio.wait_for(asyncio.wrap_future(self._future), timeout)
except asyncio.TimeoutError:
raise TimeoutError("The task did not complete within the specified timeout.")

def result_sync(self, timeout=None):
if not self._task_initialized.wait(timeout):
raise RuntimeError("Task was not initialized properly.")

def result_sync(self, timeout: Optional[float] = None) -> Any:
try:
return self._future.result(timeout)
except TimeoutError:
raise TimeoutError("The task did not complete within the specified timeout.")

def kill(self):
if self._task and not self._task.done():
self._loop.call_soon_threadsafe(self._task.cancel)
def kill(self) -> None:
if not self._future.done():
self._future.cancel()

def is_alive(self):
return self._thread.is_alive() and not self._future.done()
def is_alive(self) -> bool:
return not self._future.done()

def __del__(self):
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._cleanup)
if self._thread and self._thread.is_alive():
self._thread.join()
if self._loop:
self._loop.close()
# Helper function to run async code
async def run_async(func, *args, **kwargs):
return await func(*args, **kwargs)

0 comments on commit 62e244f

Please sign in to comment.