Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PrioritySemaphore name-mangling issue #447

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions a_sync/primitives/locks/counter.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,6 @@ cdef class CounterLock(_DebugDaemonMixin):
)
await asyncio.sleep(300)

def __dealloc__(self):
# Free the memory allocated for __name
if self.__name is not NULL:
free(self.__name)


class CounterLockCluster:
"""
Expand Down
1 change: 0 additions & 1 deletion a_sync/primitives/locks/prio_semaphore.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
cdef list _potential_lost_waiters
cdef object _top_priority
cdef object _context_manager_class
cdef list[_AbstractPrioritySemaphoreContextManager] __waiters
cdef object c_getitem(self, object priority)
cdef dict[object, int] _count_waiters(self)

Expand Down
63 changes: 36 additions & 27 deletions a_sync/primitives/locks/prio_semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
self._context_managers = {}
"""A dictionary mapping priorities to their context managers."""

self.__waiters = []
self._Semaphore__waiters = []
"""A heap queue of context managers, sorted by priority."""

# NOTE: This should (hopefully) be temporary
Expand Down Expand Up @@ -144,7 +144,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
context_manager = self._context_manager_class(
self, priority, name=self.name
)
heappush(self.__waiters, context_manager) # type: ignore [misc]
heappush(self._Semaphore__waiters, context_manager) # type: ignore [misc]
self._context_managers[priority] = context_manager
return self._context_managers[priority]

Expand All @@ -163,14 +163,17 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
cdef bint c_locked(self):
cdef _AbstractPrioritySemaphoreContextManager cm
cdef object w
return self._Semaphore__value == 0 or (
any(
cm._waiters and any(not w.cancelled() for w in cm._waiters)
for cm in (self._context_managers.values() or ())
)
if self._Semaphore__value == 0:
return True
cdef dict[object, _AbstractPrioritySemaphoreContextManager] cms = self._context_managers
if not cms:
return False
return any(
cm._Semaphore__waiters and any(not w.cancelled() for w in cm._Semaphore__waiters)
for cm in cms.values()
)

cdef dict[object, int] _count_waiters(self):
cdef dict[object, Py_ssize_t] _count_waiters(self):
"""Counts the number of waiters for each priority.

Returns:
Expand All @@ -181,10 +184,9 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
>>> semaphore._count_waiters()
"""
cdef _AbstractPrioritySemaphoreContextManager manager
return {
manager._priority: len(manager._waiters)
for manager in sorted(self.__waiters, key=lambda m: m._priority)
}
cdef list[_AbstractPrioritySemaphoreContextManager] waiters = self._Semaphore__waiters
return {manager._priority: len(manager._Semaphore__waiters) for manager in sorted(waiters)}


def _wake_up_next(self) -> None:
"""Wakes up the next waiter in line.
Expand All @@ -199,10 +201,16 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()
"""
self._c_wake_up_next()

cdef void _c_wake_up_next(self):
cdef _AbstractPrioritySemaphoreContextManager manager
cdef Py_ssize_t start_len, end_len
cdef bint woke_up

cdef bint debug_logs = c_logger.isEnabledFor(DEBUG)
while self.__waiters:
manager = heappop(self.__waiters)
while <list>self._Semaphore__waiters:
manager = heappop(<list>self._Semaphore__waiters)
if len(manager) == 0:
# There are no more waiters, get rid of the empty manager
if debug_logs:
Expand All @@ -219,11 +227,11 @@ cdef class _AbstractPrioritySemaphore(Semaphore):

if debug_logs:
c_logger._log(DEBUG, "waking up next for %s", (manager._c_repr_no_parent_(), ))
if not manager._waiters:
c_logger._log(DEBUG, "not manager._waiters")
if not manager._Semaphore__waiters:
c_logger._log(DEBUG, "not manager._Semaphore__waiters")

while manager._waiters:
waiter = manager._waiters.popleft()
while manager._Semaphore__waiters:
waiter = manager._Semaphore__waiters.popleft()
self._potential_lost_waiters.remove(waiter)
if not waiter.done():
waiter.set_result(None)
Expand All @@ -242,7 +250,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):

if end_len:
# There are still waiters, put the manager back
heappush(self.__waiters, manager) # type: ignore [misc]
heappush(<list>self._Semaphore__waiters, manager) # type: ignore [misc]
else:
# There are no more waiters, get rid of the empty manager
self._context_managers.pop(manager._priority)
Expand All @@ -255,6 +263,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
if not waiter.done():
waiter.set_result(None)
return
return

while self._potential_lost_waiters:
waiter = self._potential_lost_waiters.pop(0)
Expand Down Expand Up @@ -293,14 +302,14 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent_semaphore, priority=1)
"""

super().__init__(0, name=name)

self._parent = parent
"""The parent semaphore."""

self._priority = priority
"""The priority associated with this context manager."""

super().__init__(0, name=name)

def __repr__(self) -> str:
"""Returns a string representation of the context manager."""
return f"<{self.__class__.__name__} parent={self._parent} {self._priority_name}={self._priority} waiters={len(self)}>"
Expand Down Expand Up @@ -361,10 +370,10 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
async def __acquire(self) -> Literal[True]:
cdef object loop, fut
while self._parent._Semaphore__value <= 0:
if self._AbstractPrioritySemaphoreContextManager__waiters is None:
self._AbstractPrioritySemaphoreContextManager__waiters = deque()
if self._Semaphore__waiters is None:
self._Semaphore__waiters = deque()
fut = self._c_get_loop().create_future()
self._AbstractPrioritySemaphoreContextManager__waiters.append(fut)
self._Semaphore__waiters.append(fut)
self._parent._potential_lost_waiters.append(fut)
try:
await fut
Expand Down Expand Up @@ -406,7 +415,7 @@ cdef class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextMan
def __cinit__(self):
self._priority_name = "priority"
# Semaphore.__cinit__(self)
self._AbstractPrioritySemaphoreContextManager__waiters = deque()
self._Semaphore__waiters = deque()
self._decorated: Set[str] = set()

def __lt__(self, _PrioritySemaphoreContextManager other) -> bool:
Expand Down Expand Up @@ -460,7 +469,7 @@ cdef class PrioritySemaphore(_AbstractPrioritySemaphore): # type: ignore [type-
self._context_managers = {}
"""A dictionary mapping priorities to their context managers."""

self.__waiters = []
self._Semaphore__waiters = []
"""A heap queue of context managers, sorted by priority."""

# NOTE: This should (hopefully) be temporary
Expand Down Expand Up @@ -503,7 +512,7 @@ cdef class PrioritySemaphore(_AbstractPrioritySemaphore): # type: ignore [type-
if <int>priority not in context_managers:
context_manager = _PrioritySemaphoreContextManager(self, <int>priority, name=self.name)
heappush(
<list[_PrioritySemaphoreContextManager]>self.__waiters,
<list[_PrioritySemaphoreContextManager]>self._Semaphore__waiters,
context_manager,
) # type: ignore [misc]
context_managers[<int>priority] = context_manager
Expand Down
5 changes: 3 additions & 2 deletions a_sync/primitives/locks/semaphore.pxd
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from a_sync.primitives._debug cimport _DebugDaemonMixin

cdef class Semaphore(_DebugDaemonMixin):
cdef str _name
cdef unsigned long long __value
cdef object _waiters
cdef object __waiters
cdef char* _name
cdef str decode_name(self)
cdef set _decorated
cdef dict __dict__
cpdef bint locked(self)
Expand Down
Loading
Loading