diff --git a/docs/api.rst b/docs/api.rst index 2a6ca6b0..1fe3c883 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -38,6 +38,7 @@ Events .. autoclass:: Event .. autoclass:: Signal +.. autoclass:: SignalQueueFull .. autofunction:: stream_events .. autofunction:: wait_event diff --git a/docs/userguide/events.rst b/docs/userguide/events.rst index f75d13a3..e7567062 100644 --- a/docs/userguide/events.rst +++ b/docs/userguide/events.rst @@ -3,15 +3,15 @@ Working with signals and events .. py:currentmodule:: asphalt.core -Events are a handy way to make your code react to changes in another part of the application. -To dispatch and listen to events, you first need to have one or more +Events are a handy way to make your code react to changes in another part of the +application. To dispatch and listen to events, you first need to have one or more :class:`Signal` instances as attributes of some class. Each signal needs to be associated with some :class:`Event` class. Then, when you dispatch a new event by calling :meth:`Signal.dispatch`, a new instance of this event class will be constructed and passed to all listener callbacks. -To listen to events dispatched from a signal, you need to have a function or any other callable -that accepts a single positional argument. You then pass this callable to +To listen to events dispatched from a signal, you need to have a function or any other +callable that accepts a single positional argument. You then pass this callable to :meth:`Signal.connect`. That's it! To disconnect the callback, simply call :meth:`Signal.disconnect` with whatever @@ -34,11 +34,11 @@ Here's how it works:: def plain_listener(event): - print('received event: %s' % event) + print(f'received event: {event}') async def coro_listener(event): - print('coroutine listeners are fine too: %s' % event) + print(f'coroutine listeners are fine too: {event}') async def some_handler(): @@ -49,16 +49,17 @@ Here's how it works:: # Dispatches an Event instance source.somesignal.dispatch() - # Dispatches a CustomEvent instance (the extra argument is passed to its constructor) + # Dispatches a CustomEvent instance (the extra argument is passed to its + # constructor) source.customsignal.dispatch('extra argument here') Exception handling ------------------ -Any exceptions raised by the listener callbacks are logged to the ``asphalt.core.event`` logger. -Additionally, the future returned by :meth:`Signal.dispatch` resolves to -``True`` if no exceptions were raised during the processing of listeners. This was meant as a -convenience for use with tests where you can just do +Any exceptions raised by the listener callbacks are logged to the ``asphalt.core.event`` +logger. Additionally, the future returned by :meth:`Signal.dispatch` resolves to +``True`` if no exceptions were raised during the processing of listeners. This was meant +as a convenience for use with tests where you can just do ``assert await thing.some_signal.dispatch('foo')``. Waiting for a single event @@ -78,15 +79,19 @@ You can even wait for the next event dispatched from any of several signals usin async def print_next_event(source1, source2, source3): - event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal) + event = await wait_event( + [source1.some_signal, source2.another_signal, source3.some_signal] + ) print(event) As a convenience, you can provide a filter callback that will cause the call to only return when the callback returns ``True``:: async def print_next_matching_event(source1, source2, source3): - event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal, - lambda event: event.myrandomproperty == 'foo') + event = await wait_event( + [source1.some_signal, source2.another_signal, source3.some_signal], + lambda event: event.myrandomproperty == 'foo' + ) print(event) Receiving events iteratively @@ -95,11 +100,8 @@ Receiving events iteratively With :meth:`Signal.stream_events`, you can even asynchronously iterate over events dispatched from a signal:: - from contextlib import aclosing # on Python < 3.10, import from async_generator or contextlib2 - - async def listen_to_events(source): - async with aclosing(source.somesignal.stream_events()) as stream: + async with source.somesignal.stream_events() as stream: async for event in stream: print(event) @@ -109,16 +111,18 @@ Using :func:`stream_events`, you can stream events from multiple signals:: async def listen_to_events(source1, source2, source3): - stream = stream_events(source1.some_signal, source2.another_signal, source3.some_signal) - async with aclosing(stream): + async with stream_events( + [source1.some_signal, source2.another_signal, source3.some_signal] + ) as stream: async for event in stream: print(event) The filtering capability of :func:`wait_event` works here too:: async def listen_to_events(source1, source2, source3): - stream = stream_events(source1.some_signal, source2.another_signal, source3.some_signal, - lambda event: event.randomproperty == 'foo') - async with aclosing(stream): + async with stream_events( + [source1.some_signal, source2.another_signal, source3.some_signal], + lambda event: event.randomproperty == 'foo' + ) as stream: async for event in stream: print(event) diff --git a/examples/tutorial2/webnotifier/detector.py b/examples/tutorial2/webnotifier/detector.py index 93517dc1..560956b2 100644 --- a/examples/tutorial2/webnotifier/detector.py +++ b/examples/tutorial2/webnotifier/detector.py @@ -66,7 +66,7 @@ def __init__(self, url: str, delay: int = 10): @context_teardown async def start(self) -> AsyncGenerator[None, Exception | None]: detector = Detector(self.url, self.delay) - await add_resource(detector) + add_resource(detector) await start_background_task(detector.run, "Web page change detector") logging.info( 'Started web page change detector for url "%s" with a delay of %d seconds', diff --git a/pyproject.toml b/pyproject.toml index 6041f262..3be57bc9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,9 @@ branch = true [tool.coverage.report] show_missing = true +exclude_also = [ + "@overload" +] [tool.tox] legacy_tox_ini = """ diff --git a/src/asphalt/core/__init__.py b/src/asphalt/core/__init__.py index b9917796..3128fe4e 100644 --- a/src/asphalt/core/__init__.py +++ b/src/asphalt/core/__init__.py @@ -23,6 +23,7 @@ from ._context import start_background_task as start_background_task from ._event import Event as Event from ._event import Signal as Signal +from ._event import SignalQueueFull as SignalQueueFull from ._event import stream_events as stream_events from ._event import wait_event as wait_event from ._exceptions import ApplicationExit as ApplicationExit diff --git a/src/asphalt/core/_context.py b/src/asphalt/core/_context.py index 7236ba2e..cb1b7318 100644 --- a/src/asphalt/core/_context.py +++ b/src/asphalt/core/_context.py @@ -297,7 +297,7 @@ async def __aexit__( finally: self._state = ContextState.closed - async def add_resource( + def add_resource( self, value: T_Resource, name: str = "default", @@ -366,9 +366,9 @@ async def add_resource( self.add_teardown_callback(teardown_callback) # Notify listeners that a new resource has been made available - await self.resource_added.dispatch(ResourceEvent(types_, name, False)) + self.resource_added.dispatch(ResourceEvent(types_, name, False)) - async def add_resource_factory( + def add_resource_factory( self, factory_callback: factory_callback_type, name: str = "default", @@ -474,7 +474,7 @@ async def add_resource_factory( self._resource_factories[(type_, name)] = resource # Notify listeners that a new resource has been made available - await self.resource_added.dispatch(ResourceEvent(resource_types, name, True)) + self.resource_added.dispatch(ResourceEvent(resource_types, name, True)) def _generate_resource_from_factory(self, factory: ResourceContainer) -> Any: retval = factory.value_or_factory() @@ -800,7 +800,7 @@ def current_context() -> Context: return ctx -async def add_resource( +def add_resource( value: T_Resource, name: str = "default", types: type | Sequence[type] = (), @@ -813,12 +813,12 @@ async def add_resource( .. seealso:: :meth:`Context.add_resource` """ - await current_context().add_resource( + current_context().add_resource( value, name, types, description=description, teardown_callback=teardown_callback ) -async def add_resource_factory( +def add_resource_factory( factory_callback: factory_callback_type, name: str = "default", *, @@ -831,7 +831,7 @@ async def add_resource_factory( .. seealso:: :meth:`Context.add_resource_factory` """ - await current_context().add_resource_factory( + current_context().add_resource_factory( factory_callback, name, types=types, description=description ) diff --git a/src/asphalt/core/_event.py b/src/asphalt/core/_event.py index 01c34db0..59e29f8f 100644 --- a/src/asphalt/core/_event.py +++ b/src/asphalt/core/_event.py @@ -12,14 +12,22 @@ from datetime import datetime, timezone from time import time as stdlib_time from typing import Any, Generic, TypeVar, overload +from warnings import warn from weakref import WeakKeyDictionary -from anyio import BrokenResourceError, create_memory_object_stream -from anyio.abc import ObjectSendStream +from anyio import BrokenResourceError, WouldBlock, create_memory_object_stream +from anyio.streams.memory import MemoryObjectSendStream from ._utils import qualified_name +class SignalQueueFull(UserWarning): + """ + Warning about signal delivery failing due to a subscriber's queue being full + because the subscriber could not receive the events quickly enough. + """ + + class Event: """ The base class for all events. @@ -60,11 +68,11 @@ class BoundSignal(Generic[T_Event]): instance: weakref.ReferenceType[Any] topic: str - _send_streams: list[ObjectSendStream[T_Event]] = field( + _send_streams: list[MemoryObjectSendStream[T_Event]] = field( init=False, default_factory=list ) - async def dispatch(self, event: T_Event) -> None: + def dispatch(self, event: T_Event) -> None: """Dispatch an event.""" if not isinstance(event, self.event_class): raise TypeError( @@ -78,12 +86,19 @@ async def dispatch(self, event: T_Event) -> None: for stream in list(self._send_streams): try: - await stream.send(event) + stream.send_nowait(event) except BrokenResourceError: pass + except WouldBlock: + warn( + f"Queue full ({stream.statistics().max_buffer_size}) when trying " + f"to send dispatched event to subscriber", + SignalQueueFull, + stacklevel=2, + ) @contextmanager - def _subscribe(self, send: ObjectSendStream[T_Event]) -> Iterator[None]: + def _subscribe(self, send: MemoryObjectSendStream[T_Event]) -> Iterator[None]: self._send_streams.append(send) yield None self._send_streams.remove(send) @@ -225,5 +240,5 @@ async def wait_event( any of the signals """ - async with stream_events(signals, filter, max_queue_size=1) as stream: + async with stream_events(signals, filter) as stream: return await stream.__anext__() diff --git a/tests/test_context.py b/tests/test_context.py index 19ae1300..73109f99 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -106,9 +106,15 @@ async def test_add_resource(self, context: Context, types): notified. """ + + async def resource_adder() -> None: + await wait_all_tasks_blocked() + context.add_resource(6, "foo", types) + async with create_task_group() as tg: - tg.start_soon(context.add_resource, 6, "foo", types) - event = await context.resource_added.wait_event() + tg.start_soon(resource_adder) + with fail_after(1): + event = await context.resource_added.wait_event() assert event.resource_types == (int,) assert event.resource_name == "foo" @@ -117,9 +123,9 @@ async def test_add_resource(self, context: Context, types): async def test_add_resource_name_conflict(self, context: Context) -> None: """Test that adding a resource won't replace any existing resources.""" - await context.add_resource(5, "foo") + context.add_resource(5, "foo") with pytest.raises(ResourceConflict) as exc: - await context.add_resource(4, "foo") + context.add_resource(4, "foo") exc.match( "this context already contains a resource of type int using the name 'foo'" @@ -128,12 +134,12 @@ async def test_add_resource_name_conflict(self, context: Context) -> None: async def test_add_resource_none_value(self, context: Context) -> None: """Test that None is not accepted as a resource value.""" with pytest.raises(ValueError, match='"value" must not be None'): - await context.add_resource(None) + context.add_resource(None) async def test_add_resource_type_conflict(self, context: Context) -> None: - await context.add_resource(5) + context.add_resource(5) with pytest.raises(ResourceConflict) as exc: - await context.add_resource(6) + context.add_resource(6) exc.match( "this context already contains a resource of type int using the name " @@ -145,7 +151,7 @@ async def test_add_resource_type_conflict(self, context: Context) -> None: ) async def test_add_resource_bad_name(self, context: Context, name: str) -> None: with pytest.raises(ValueError) as exc: - await context.add_resource(1, name) + context.add_resource(1, name) exc.match( '"name" must be a nonempty string consisting only of alphanumeric ' @@ -162,7 +168,7 @@ def factory() -> int: return next(counter) counter = count(1) - await context.add_resource_factory(factory) + context.add_resource_factory(factory) assert context.require_resource(int) == 1 assert context.require_resource(int) == 1 @@ -174,7 +180,7 @@ async def test_add_resource_factory_bad_name( self, context: Context, name: str ) -> None: with pytest.raises(ValueError) as exc: - await context.add_resource_factory(lambda: 1, name, types=[int]) + context.add_resource_factory(lambda: 1, name, types=[int]) exc.match( '"name" must be a nonempty string consisting only of alphanumeric ' @@ -183,14 +189,14 @@ async def test_add_resource_factory_bad_name( async def test_add_resource_factory_empty_types(self, context: Context) -> None: with pytest.raises(ValueError) as exc: - await context.add_resource_factory(lambda: 1, types=()) + context.add_resource_factory(lambda: 1, types=()) exc.match("no resource types were specified") async def test_add_resource_factory_type_conflict(self, context: Context) -> None: - await context.add_resource_factory(lambda: None, types=(str, int)) + context.add_resource_factory(lambda: None, types=(str, int)) with pytest.raises(ResourceConflict) as exc: - await context.add_resource_factory(lambda: None, types=[int]) + context.add_resource_factory(lambda: None, types=[int]) exc.match("this context already contains a resource factory for the type int") @@ -205,7 +211,7 @@ def factory() -> int: return next(counter) counter = count(1) - await context.add_resource_factory(factory) + context.add_resource_factory(factory) assert context.require_resource(int) == 1 assert context.require_resource(int) == 1 @@ -217,14 +223,14 @@ async def test_add_resource_return_type_single(self, context: Context) -> None: def factory() -> str: return "foo" - await context.add_resource_factory(factory) + context.add_resource_factory(factory) assert context.require_resource(str) == "foo" async def test_add_resource_return_type_union(self, context: Context) -> None: def factory() -> Union[int, float]: # noqa: UP007 return 5 - await context.add_resource_factory(factory) + context.add_resource_factory(factory) assert context.require_resource(int) == 5 assert context.require_resource(float) == 5 @@ -233,7 +239,7 @@ async def test_add_resource_return_type_uniontype(self, context: Context) -> Non def factory() -> int | float: return 5 - await context.add_resource_factory(factory) + context.add_resource_factory(factory) assert context.require_resource(int) == 5 assert context.require_resource(float) == 5 @@ -241,19 +247,19 @@ async def test_add_resource_return_type_optional(self, context: Context) -> None def factory() -> Optional[str]: # noqa: UP007 return "foo" - await context.add_resource_factory(factory) + context.add_resource_factory(factory) assert context.require_resource(str) == "foo" async def test_get_static_resources(self, context: Context) -> None: - await context.add_resource(9, "foo") - await context.add_resource_factory(lambda: 7, "bar", types=[int]) + context.add_resource(9, "foo") + context.add_resource_factory(lambda: 7, "bar", types=[int]) async with Context() as subctx: - await subctx.add_resource(1, "bar") - await subctx.add_resource(4, "foo") + subctx.add_resource(1, "bar") + subctx.add_resource(4, "foo") assert subctx.get_static_resources(int) == {1, 4, 9} async def test_require_resource(self, context: Context) -> None: - await context.add_resource(1) + context.add_resource(1) assert context.require_resource(int) == 1 async def test_require_resource_not_found(self, context: Context) -> None: @@ -400,7 +406,7 @@ async def teardown_callback() -> None: resource = require_resource(str) async with Context() as ctx: - await ctx.add_resource("blah") + ctx.add_resource("blah") ctx.add_teardown_callback(teardown_callback) assert resource == "blah" @@ -413,7 +419,7 @@ async def teardown_callback() -> None: resource = require_resource(str) async with Context() as ctx: - await ctx.add_resource_factory(lambda: "blah", types=[str]) + ctx.add_resource_factory(lambda: "blah", types=[str]) ctx.add_teardown_callback(teardown_callback) assert resource == "blah" @@ -461,14 +467,14 @@ async def test_current_context() -> None: async def test_get_resource() -> None: async with Context() as ctx: - await ctx.add_resource("foo") + ctx.add_resource("foo") assert get_resource(str) == "foo" assert get_resource(int) is None async def test_require_resource() -> None: async with Context() as ctx: - await ctx.add_resource("foo") + ctx.add_resource("foo") assert require_resource(str) == "foo" pytest.raises(ResourceNotFound, require_resource, int) @@ -482,8 +488,8 @@ async def injected( return foo, bar, baz async with Context() as ctx: - await ctx.add_resource("bar_test") - await ctx.add_resource("baz_test", "alt") + ctx.add_resource("bar_test") + ctx.add_resource("baz_test", "alt") foo, bar, baz = await injected(2) assert foo == 2 @@ -498,8 +504,8 @@ def injected( return foo, bar, baz async with Context() as ctx: - await ctx.add_resource("bar_test") - await ctx.add_resource("baz_test", "alt") + ctx.add_resource("bar_test") + ctx.add_resource("baz_test", "alt") foo, bar, baz = injected(2) assert foo == 2 @@ -574,7 +580,7 @@ async def injected( async with Context() as ctx: retval: Any = injected() if sync else (await injected()) assert retval is None - await ctx.add_resource("hello") + ctx.add_resource("hello") retval = injected() if sync else (await injected()) assert retval == "hello" diff --git a/tests/test_event.py b/tests/test_event.py index cc73faa2..5812875b 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -2,9 +2,11 @@ from datetime import datetime, timedelta, timezone import pytest -from anyio import create_task_group +from anyio import create_task_group, fail_after +from anyio.abc import TaskStatus +from anyio.lowlevel import checkpoint -from asphalt.core import Event, Signal, stream_events, wait_event +from asphalt.core import Event, Signal, SignalQueueFull, stream_events, wait_event pytestmark = pytest.mark.anyio() @@ -61,14 +63,41 @@ async def test_dispatch_event_type_mismatch(self, source): f"{__name__}.DummyEvent" ) with pytest.raises(TypeError, match=pattern): - await source.event_a.dispatch("foo") + source.event_a.dispatch("foo") async def test_dispatch_event_no_listeners(self, source): """ Test that dispatching an event when there are no listeners will still work. """ - await source.event_a.dispatch(DummyEvent()) + source.event_a.dispatch(DummyEvent()) + + async def test_dispatch_event_buffer_overflow(self, source): + """ + Test that dispatching to a subscriber that has a full queue raises the + SignalQueueFull warning. + + """ + received_events = [] + + async def receive_events(task_status: TaskStatus[None]) -> None: + async with source.event_a.stream_events(max_queue_size=1) as stream: + task_status.started() + async for event in stream: + received_events.append(event) + + async with create_task_group() as tg: + await tg.start(receive_events) + source.event_a.dispatch(DummyEvent(1)) + with pytest.warns(SignalQueueFull): + source.event_a.dispatch(DummyEvent(2)) + source.event_a.dispatch(DummyEvent(3)) + + # Give the task a chance to run, then cancel + await checkpoint() + tg.cancel_scope.cancel() + + assert len(received_events) == 1 @pytest.mark.parametrize( "filter, expected_value", @@ -80,11 +109,12 @@ async def test_dispatch_event_no_listeners(self, source): async def test_wait_event(self, source, filter, expected_value): async def dispatch_events() -> None: for i in range(1, 4): - await source.event_a.dispatch(DummyEvent(i)) + source.event_a.dispatch(DummyEvent(i)) async with create_task_group() as tg: tg.start_soon(dispatch_events) - event = await wait_event([source.event_a], filter) + with fail_after(1): + event = await wait_event([source.event_a], filter) assert event.args == (expected_value,) @@ -99,7 +129,7 @@ async def test_stream_events(self, source, filter, expected_values): values = [] async with source.event_a.stream_events(filter) as stream: for i in range(1, 4): - await source.event_a.dispatch(DummyEvent(i)) + source.event_a.dispatch(DummyEvent(i)) async for event in stream: values.append(event.args[0]) @@ -144,11 +174,12 @@ async def test_wait_event(source, filter, expected_value): async def dispatch_events() -> None: for i in range(1, 4): - await source.event_a.dispatch(DummyEvent(i)) + source.event_a.dispatch(DummyEvent(i)) async with create_task_group() as tg: tg.start_soon(dispatch_events) - event = await wait_event([source.event_a], filter) + with fail_after(1): + event = await wait_event([source.event_a], filter) assert event.args == (expected_value,) @@ -166,9 +197,9 @@ async def test_stream_events(filter, expected_values): async with stream_events([source1.event_a, source2.event_b], filter) as stream: for signal in [source1.event_a, source2.event_b]: for i in range(1, 4): - await signal.dispatch(DummyEvent(i)) + signal.dispatch(DummyEvent(i)) - await signal.dispatch(DummyEvent(None)) + signal.dispatch(DummyEvent(None)) async for event in stream: if event.args[0] is None: