Skip to content

Commit

Permalink
update: adds option to specify loop type
Browse files Browse the repository at this point in the history
- Adds handling to prevent non-ideal configuration
  • Loading branch information
namsnath committed Nov 24, 2024
1 parent 0616c49 commit 3b32d32
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 21 deletions.
4 changes: 4 additions & 0 deletions supertokens_python/async_to_sync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@


def sync(co: Coroutine[Any, Any, _T]) -> _T:
"""
Convert async function calls to sync calls using the specified `_AsyncHandler`
"""
# Disabling cyclic import since the import is lazy, and will not cause issues
from supertokens_python import supertokens # pylint: disable=cyclic-import

st = supertokens.Supertokens.get_instance()
Expand Down
125 changes: 104 additions & 21 deletions supertokens_python/async_to_sync/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,47 @@ class AsyncType(Enum):


class _AsyncHandler(ABC):
"""
Abstract class to handle async-to-sync in various environments.
"""

async_type: AsyncType
"""The type of async handling to use"""

create_loop_thread: bool
"""Whether a thread needs to be created to run an event loop"""

loop: Optional[asyncio.AbstractEventLoop]
"""The event loop to use for async-to-sync conversions"""

is_loop_threaded: bool
"""Whether the passed loop is running in a thread"""

def __init__(
self,
create_loop_thread: bool,
loop: Optional[asyncio.AbstractEventLoop],
is_loop_threaded: bool,
):
# TODO: Add checks on the socket to see if it's patched by Gevent/Eventlet
# TODO: Consider setting the type of loop (thread/normal) and base sync implementation on that

if loop is not None:
if create_loop_thread:
raise ValueError("Pass either `loop` or `create_loop_thread`, not both")
# Either the user passes in a loop or tells us to create a thread, not both
# If neither is passed, we use the default event loop handling
if loop is not None and create_loop_thread:
raise ValueError("Pass either `loop` or `create_loop_thread`, not both")

if is_loop_threaded:
if loop is None and not create_loop_thread:
raise ValueError(
"Loop cannot be marked as threaded without passing in `loop` or `create_loop_thread`"
)

if create_loop_thread:
is_loop_threaded = True

self.loop = loop
self.create_loop_thread = create_loop_thread
self.is_loop_threaded = is_loop_threaded
self._create_loop_thread()
self._register_loop()

Expand Down Expand Up @@ -75,56 +98,102 @@ def _default_run_as_sync(
coroutine: Coroutine[Any, Any, _T],
loop: Optional[asyncio.AbstractEventLoop],
) -> _T:
# Event loop running in separate thread
if self.is_loop_threaded:
if self.loop is None:
raise ValueError(
"Expected `loop` to not be `None` when `is_loop_threaded` is True"
)

future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
return future.result()

# Normal event loop in the current thread
if loop is None:
loop = create_or_get_event_loop()

return loop.run_until_complete(coroutine)


class DefaultHandler(_AsyncHandler):
"""
Default async handler for Asyncio-based apps.
"""
async_type = AsyncType.asyncio

def __init__(self):
super().__init__(create_loop_thread=False, loop=None)
super().__init__(create_loop_thread=False, loop=None, is_loop_threaded=False)

def run_as_sync(self, coroutine: Coroutine[Any, Any, _T]) -> _T:
return super()._default_run_as_sync(coroutine, self.loop)


class AsyncioHandler(_AsyncHandler):
"""
Async handler specific to Asyncio-based apps.
Only meant for cases where existing event loops need to be re-used, or new
threaded-loops need to be created.
For normal use-cases, prefer the `DefaultHandler`.
"""
async_type = AsyncType.asyncio

def __init__(
self,
create_loop_thread: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
is_loop_threaded: bool = False,
):
# NOTE: Creating a non-threaded loop and storing it causes asyncio context issues.
# Handles missing loops similar to `DefaultHandler`
super().__init__(create_loop_thread=create_loop_thread, loop=loop)
if loop is not None and not is_loop_threaded:
raise ValueError(
"For existing, non-threaded loops in asyncio, prefer using DefaultHandler"
)

def run_as_sync(self, coroutine: Coroutine[Any, Any, _T]) -> _T:
if self.loop is None:
return super()._default_run_as_sync(coroutine, self.loop)
super().__init__(
create_loop_thread=create_loop_thread,
loop=loop,
is_loop_threaded=is_loop_threaded,
)

future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
return future.result()
def run_as_sync(self, coroutine: Coroutine[Any, Any, _T]) -> _T:
return super()._default_run_as_sync(coroutine, self.loop)


class GeventHandler(_AsyncHandler):
"""
Async handler specific to Gevent-based apps.
Does not work optimally with event loops on the same thread, will drop requests.
Requires a separate thread for the event loop to work well.
"""
async_type = AsyncType.gevent

def __init__(
self,
create_loop_thread: bool = True,
loop: Optional[asyncio.AbstractEventLoop] = None,
is_loop_threaded: bool = True,
):
super().__init__(create_loop_thread=create_loop_thread, loop=loop)
if not create_loop_thread:
if not is_loop_threaded:
raise ValueError(
"Non-Threaded gevent loops result in stuck requests, use a threaded loop instead"
)

super().__init__(
create_loop_thread=create_loop_thread,
loop=loop,
is_loop_threaded=is_loop_threaded,
)

def run_as_sync(self, coroutine: Coroutine[Any, Any, _T]) -> _T:
if self.loop is None:
# When a loop isn't declared or is not in a thread, handle as usual
if self.loop is None or not self.is_loop_threaded:
return super()._default_run_as_sync(coroutine, self.loop)

# When loop is in a thread, we can optimize using Events
from gevent.event import Event # type: ignore

future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
Expand All @@ -135,28 +204,42 @@ def run_as_sync(self, coroutine: Coroutine[Any, Any, _T]) -> _T:


class EventletHandler(_AsyncHandler):
"""
Async handler specific to Eventlet-based apps.
Does not work with event loops on the same thread.
Requires a separate thread for the event loop.
"""
async_type = AsyncType.eventlet

def __init__(
self,
create_loop_thread: bool = True,
loop: Optional[asyncio.AbstractEventLoop] = None,
is_loop_threaded: bool = True,
):
if not create_loop_thread:
raise ValueError(
"Cannot use eventlet with Supertokens without a dedicated event loop thread. "
"Please set `create_loop_thread=True`."
)

super().__init__(create_loop_thread=create_loop_thread, loop=loop)
if loop is None or not is_loop_threaded:
raise ValueError(
"Cannot use eventlet with Supertokens without a dedicated event loop thread. "
"Please set `create_loop_thread=True` or pass in a threaded event loop."
)

super().__init__(
create_loop_thread=create_loop_thread,
loop=loop,
is_loop_threaded=is_loop_threaded,
)

def run_as_sync(self, coroutine: Coroutine[Any, Any, _T]) -> _T:
if self.loop is None:
# Eventlet only works well when the event loop is in a different thread
if self.loop is None or not self.is_loop_threaded:
raise ValueError(
"Cannot use eventlet with Supertokens without a dedicated event loop thread. "
"Please set `create_loop_thread=True`."
"Please set `create_loop_thread=True` or pass in a threaded event loop."
)

# Use Events to handle loop callbacks
from eventlet.event import Event # type: ignore

future = asyncio.run_coroutine_threadsafe(coroutine, loop=self.loop)
Expand Down

0 comments on commit 3b32d32

Please sign in to comment.