Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made multiple methods synchronous #113

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Events

.. autoclass:: Event
.. autoclass:: Signal
.. autoclass:: SignalQueueFull
.. autofunction:: stream_events
.. autofunction:: wait_event

Expand Down
50 changes: 27 additions & 23 deletions docs/userguide/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ Working with signals and events

.. py:currentmodule:: asphalt.core

Events are a handy way to make your code react to changes in another part of the application.
To dispatch and listen to events, you first need to have one or more
Events are a handy way to make your code react to changes in another part of the
application. To dispatch and listen to events, you first need to have one or more
:class:`Signal` instances as attributes of some class. Each signal needs to be
associated with some :class:`Event` class. Then, when you dispatch a new event
by calling :meth:`Signal.dispatch`, a new instance of this event class will be
constructed and passed to all listener callbacks.

To listen to events dispatched from a signal, you need to have a function or any other callable
that accepts a single positional argument. You then pass this callable to
To listen to events dispatched from a signal, you need to have a function or any other
callable that accepts a single positional argument. You then pass this callable to
:meth:`Signal.connect`. That's it!

To disconnect the callback, simply call :meth:`Signal.disconnect` with whatever
Expand All @@ -34,11 +34,11 @@ Here's how it works::


def plain_listener(event):
print('received event: %s' % event)
print(f'received event: {event}')


async def coro_listener(event):
print('coroutine listeners are fine too: %s' % event)
print(f'coroutine listeners are fine too: {event}')


async def some_handler():
Expand All @@ -49,16 +49,17 @@ Here's how it works::
# Dispatches an Event instance
source.somesignal.dispatch()

# Dispatches a CustomEvent instance (the extra argument is passed to its constructor)
# Dispatches a CustomEvent instance (the extra argument is passed to its
# constructor)
source.customsignal.dispatch('extra argument here')

Exception handling
------------------

Any exceptions raised by the listener callbacks are logged to the ``asphalt.core.event`` logger.
Additionally, the future returned by :meth:`Signal.dispatch` resolves to
``True`` if no exceptions were raised during the processing of listeners. This was meant as a
convenience for use with tests where you can just do
Any exceptions raised by the listener callbacks are logged to the ``asphalt.core.event``
logger. Additionally, the future returned by :meth:`Signal.dispatch` resolves to
``True`` if no exceptions were raised during the processing of listeners. This was meant
as a convenience for use with tests where you can just do
``assert await thing.some_signal.dispatch('foo')``.

Waiting for a single event
Expand All @@ -78,15 +79,19 @@ You can even wait for the next event dispatched from any of several signals usin


async def print_next_event(source1, source2, source3):
event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal)
event = await wait_event(
[source1.some_signal, source2.another_signal, source3.some_signal]
)
print(event)

As a convenience, you can provide a filter callback that will cause the call to only return when
the callback returns ``True``::

async def print_next_matching_event(source1, source2, source3):
event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal,
lambda event: event.myrandomproperty == 'foo')
event = await wait_event(
[source1.some_signal, source2.another_signal, source3.some_signal],
lambda event: event.myrandomproperty == 'foo'
)
print(event)

Receiving events iteratively
Expand All @@ -95,11 +100,8 @@ Receiving events iteratively
With :meth:`Signal.stream_events`, you can even asynchronously iterate over
events dispatched from a signal::

from contextlib import aclosing # on Python < 3.10, import from async_generator or contextlib2


async def listen_to_events(source):
async with aclosing(source.somesignal.stream_events()) as stream:
async with source.somesignal.stream_events() as stream:
async for event in stream:
print(event)

Expand All @@ -109,16 +111,18 @@ Using :func:`stream_events`, you can stream events from multiple signals::


async def listen_to_events(source1, source2, source3):
stream = stream_events(source1.some_signal, source2.another_signal, source3.some_signal)
async with aclosing(stream):
async with stream_events(
[source1.some_signal, source2.another_signal, source3.some_signal]
) as stream:
async for event in stream:
print(event)

The filtering capability of :func:`wait_event` works here too::

async def listen_to_events(source1, source2, source3):
stream = stream_events(source1.some_signal, source2.another_signal, source3.some_signal,
lambda event: event.randomproperty == 'foo')
async with aclosing(stream):
async with stream_events(
[source1.some_signal, source2.another_signal, source3.some_signal],
lambda event: event.randomproperty == 'foo'
) as stream:
async for event in stream:
print(event)
2 changes: 1 addition & 1 deletion examples/tutorial2/webnotifier/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self, url: str, delay: int = 10):
@context_teardown
async def start(self) -> AsyncGenerator[None, Exception | None]:
detector = Detector(self.url, self.delay)
await add_resource(detector)
add_resource(detector)
await start_background_task(detector.run, "Web page change detector")
logging.info(
'Started web page change detector for url "%s" with a delay of %d seconds',
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ branch = true

[tool.coverage.report]
show_missing = true
exclude_also = [
"@overload"
]

[tool.tox]
legacy_tox_ini = """
Expand Down
1 change: 1 addition & 0 deletions src/asphalt/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ._context import start_background_task as start_background_task
from ._event import Event as Event
from ._event import Signal as Signal
from ._event import SignalQueueFull as SignalQueueFull
from ._event import stream_events as stream_events
from ._event import wait_event as wait_event
from ._exceptions import ApplicationExit as ApplicationExit
Expand Down
16 changes: 8 additions & 8 deletions src/asphalt/core/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async def __aexit__(
finally:
self._state = ContextState.closed

async def add_resource(
def add_resource(
self,
value: T_Resource,
name: str = "default",
Expand Down Expand Up @@ -366,9 +366,9 @@ async def add_resource(
self.add_teardown_callback(teardown_callback)

# Notify listeners that a new resource has been made available
await self.resource_added.dispatch(ResourceEvent(types_, name, False))
self.resource_added.dispatch(ResourceEvent(types_, name, False))

async def add_resource_factory(
def add_resource_factory(
self,
factory_callback: factory_callback_type,
name: str = "default",
Expand Down Expand Up @@ -474,7 +474,7 @@ async def add_resource_factory(
self._resource_factories[(type_, name)] = resource

# Notify listeners that a new resource has been made available
await self.resource_added.dispatch(ResourceEvent(resource_types, name, True))
self.resource_added.dispatch(ResourceEvent(resource_types, name, True))

def _generate_resource_from_factory(self, factory: ResourceContainer) -> Any:
retval = factory.value_or_factory()
Expand Down Expand Up @@ -800,7 +800,7 @@ def current_context() -> Context:
return ctx


async def add_resource(
def add_resource(
value: T_Resource,
name: str = "default",
types: type | Sequence[type] = (),
Expand All @@ -813,12 +813,12 @@ async def add_resource(

.. seealso:: :meth:`Context.add_resource`
"""
await current_context().add_resource(
current_context().add_resource(
value, name, types, description=description, teardown_callback=teardown_callback
)


async def add_resource_factory(
def add_resource_factory(
factory_callback: factory_callback_type,
name: str = "default",
*,
Expand All @@ -831,7 +831,7 @@ async def add_resource_factory(
.. seealso:: :meth:`Context.add_resource_factory`

"""
await current_context().add_resource_factory(
current_context().add_resource_factory(
factory_callback, name, types=types, description=description
)

Expand Down
29 changes: 22 additions & 7 deletions src/asphalt/core/_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@
from datetime import datetime, timezone
from time import time as stdlib_time
from typing import Any, Generic, TypeVar, overload
from warnings import warn
from weakref import WeakKeyDictionary

from anyio import BrokenResourceError, create_memory_object_stream
from anyio.abc import ObjectSendStream
from anyio import BrokenResourceError, WouldBlock, create_memory_object_stream
from anyio.streams.memory import MemoryObjectSendStream

from ._utils import qualified_name


class SignalQueueFull(UserWarning):
"""
Warning about signal delivery failing due to a subscriber's queue being full
because the subscriber could not receive the events quickly enough.
"""


class Event:
"""
The base class for all events.
Expand Down Expand Up @@ -60,11 +68,11 @@ class BoundSignal(Generic[T_Event]):
instance: weakref.ReferenceType[Any]
topic: str

_send_streams: list[ObjectSendStream[T_Event]] = field(
_send_streams: list[MemoryObjectSendStream[T_Event]] = field(
init=False, default_factory=list
)

async def dispatch(self, event: T_Event) -> None:
def dispatch(self, event: T_Event) -> None:
"""Dispatch an event."""
if not isinstance(event, self.event_class):
raise TypeError(
Expand All @@ -78,12 +86,19 @@ async def dispatch(self, event: T_Event) -> None:

for stream in list(self._send_streams):
try:
await stream.send(event)
stream.send_nowait(event)
except BrokenResourceError:
pass
except WouldBlock:
warn(
f"Queue full ({stream.statistics().max_buffer_size}) when trying "
f"to send dispatched event to subscriber",
SignalQueueFull,
stacklevel=2,
)

@contextmanager
def _subscribe(self, send: ObjectSendStream[T_Event]) -> Iterator[None]:
def _subscribe(self, send: MemoryObjectSendStream[T_Event]) -> Iterator[None]:
self._send_streams.append(send)
yield None
self._send_streams.remove(send)
Expand Down Expand Up @@ -225,5 +240,5 @@ async def wait_event(
any of the signals

"""
async with stream_events(signals, filter, max_queue_size=1) as stream:
async with stream_events(signals, filter) as stream:
return await stream.__anext__()
Loading