diff --git a/a_sync/primitives/locks/counter.pyx b/a_sync/primitives/locks/counter.pyx index 9032c46f..959d9717 100644 --- a/a_sync/primitives/locks/counter.pyx +++ b/a_sync/primitives/locks/counter.pyx @@ -4,8 +4,8 @@ This module provides two specialized async flow management classes, :class:`Coun These primitives manage synchronization of tasks that must wait for an internal counter to reach a specific value. """ -import asyncio -import heapq +from asyncio import gather, sleep +from heapq import heappop, heappush from libc.string cimport strcpy from libc.stdlib cimport malloc, free from libc.time cimport time @@ -82,7 +82,8 @@ cdef class CounterLock(_DebugDaemonMixin): >>> repr(counter) '' """ - cdef dict[long long, Py_ssize_t] waiters = {v: len(self._events[v]._waiters) for v in sorted(self._events)} + cdef dict[long long, Event] events = self._events + cdef dict[long long, Py_ssize_t] waiters = {v: len((events[v])._waiters) for v in self._heap} return "".format(self.__name.decode("utf-8"), self._value, waiters) cpdef bint is_ready(self, long long v): @@ -113,7 +114,7 @@ cdef class CounterLock(_DebugDaemonMixin): if event is None: event = Event() self._events[value] = event - heapq.heappush(self._heap, value) + heappush(self._heap, value) self._c_ensure_debug_daemon((),{}) await (event).c_wait() return True @@ -181,11 +182,11 @@ cdef class CounterLock(_DebugDaemonMixin): if value > self._value: self._value = value while self._heap: - key = heapq.heappop(self._heap) + key = heappop(self._heap) if key <= self._value: (self._events.pop(key)).c_set() else: - heapq.heappush(self._heap, key) + heappush(self._heap, key) return elif value < self._value: raise ValueError("You cannot decrease the value.") @@ -207,7 +208,7 @@ cdef class CounterLock(_DebugDaemonMixin): self.get_logger().debug( "%s is still locked after %sm", self, round(now - start / 60, 2) ) - await asyncio.sleep(300) + await sleep(300) class CounterLockCluster: @@ -249,7 +250,7 @@ class CounterLockCluster: >>> cluster = CounterLockCluster([lock1, lock2]) >>> await cluster.wait_for(5) # This will block until all locks have value >= 5 """ - await asyncio.gather( + await gather( *[counter_lock.wait_for(value) for counter_lock in self.locks] ) return True diff --git a/a_sync/primitives/locks/event.pyx b/a_sync/primitives/locks/event.pyx index 0a216832..cebb1c6e 100644 --- a/a_sync/primitives/locks/event.pyx +++ b/a_sync/primitives/locks/event.pyx @@ -85,14 +85,8 @@ cdef class CythonEvent(_DebugDaemonMixin): cdef str status = "set" if self._value else "unset" - try: - waiters = self._waiters - except AttributeError: - # TODO: debug how this error is possible since _waiters is set in __init__ - pass - else: - if len_waiters := len(waiters): - status += ", waiters:{}".format(len_waiters) + if len_waiters := len(self._waiters): + status += ", waiters:{}".format(len_waiters) return "<{}.{} {} at {} [{}]>".format( self.__class__.__module__, diff --git a/tests/primitives/test_counter.py b/tests/primitives/test_counter.py index 79979205..e899fc59 100644 --- a/tests/primitives/test_counter.py +++ b/tests/primitives/test_counter.py @@ -6,8 +6,16 @@ @pytest.mark.asyncio_cooperative async def test_counter_lock(): counter = CounterLock(name="test") - assert await counter.wait_for(0) assert counter._name == "test" + assert repr(counter) == "" + coro = counter.wait_for(1) + task = asyncio.create_task(coro) + await asyncio.sleep(0) + assert repr(counter) == "" + counter.set(1) + await asyncio.sleep(0) + assert task.done() and task.result() is True + assert repr(counter) == "" @pytest.mark.asyncio_cooperative