Skip to content

Commit

Permalink
feat: re-enable smart processing queue heap sorting (#491)
Browse files Browse the repository at this point in the history
* feat: re-enable smart processing queue heap sorting

but with less cpu usage than before

* Update queue.py
  • Loading branch information
BobTheBuidler authored Dec 16, 2024
1 parent 104bc0e commit 0614531
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@
"""

import asyncio
import functools
import heapq
import logging
import sys
import weakref
from asyncio import InvalidStateError, QueueEmpty, gather
from asyncio.events import _get_running_loop
from functools import cached_property, wraps
from heapq import heappop, heappush, heappushpop
from logging import getLogger

import a_sync.asyncio
from a_sync._smart import SmartFuture, create_future
from a_sync._smart import _Key as _SmartKey
from a_sync._typing import *

logger = logging.getLogger(__name__)
logger = getLogger(__name__)

if sys.version_info < (3, 9):

Expand Down Expand Up @@ -314,7 +315,7 @@ def __init__(
self._no_futs = not return_data
"""Indicates whether tasks will return data via futures."""

@functools.wraps(func)
@wraps(func)
async def _worker_coro() -> NoReturn:
"""Worker coroutine for processing tasks."""
return await self.__worker_coro()
Expand Down Expand Up @@ -373,7 +374,7 @@ def __del__(self) -> None:
context = {
"message": f"{self} was destroyed but has work pending!",
}
if loop := asyncio.events._get_running_loop():
if loop := _get_running_loop():
loop.call_exception_handler(context)

@property
Expand Down Expand Up @@ -441,7 +442,7 @@ def put_nowait(self, *args: P.args, **kwargs: P.kwargs) -> "asyncio.Future[V]":

def _create_future(self) -> "asyncio.Future[V]":
"""Creates a future for the task."""
return asyncio.events._get_running_loop().create_future()
return _get_running_loop().create_future()

def _ensure_workers(self) -> None:
"""Ensures that the worker tasks are running."""
Expand All @@ -459,7 +460,7 @@ def _ensure_workers(self) -> None:
# re-raise with clean traceback
raise exc.with_traceback(exc.__traceback__) from exc.__cause__

@functools.cached_property
@cached_property
def _workers(self) -> "asyncio.Task[NoReturn]":
"""Creates and manages the worker tasks for the queue."""
logger.debug("starting worker task for %s", self)
Expand Down Expand Up @@ -609,7 +610,7 @@ def _init(self, maxsize):
"""
self._queue: List[T] = []

def _put(self, item, heappush=heapq.heappush):
def _put(self, item, heappush=heappush):
"""
Adds an item to the priority queue based on its priority.
Expand All @@ -618,7 +619,7 @@ def _put(self, item, heappush=heapq.heappush):
"""
heappush(self._queue, item)

def _get(self, heappop=heapq.heappop):
def _get(self, heappop=heappop):
"""
Retrieves the highest priority item from the queue.
Expand Down Expand Up @@ -690,7 +691,7 @@ def put_nowait(self, priority: Any, *args: P.args, **kwargs: P.kwargs) -> "async
super().put_nowait(self, (priority, args, kwargs, fut))
return fut

def _get(self, heappop=heapq.heappop):
def _get(self, heappop=heappop):
"""
Retrieves the highest priority task from the queue.
Expand All @@ -713,7 +714,7 @@ class _VariablePriorityQueueMixin(_PriorityQueueMixin[T]):
:class:`~_PriorityQueueMixin`
"""

def _get(self, heappop=heapq.heappop):
def _get(self, heappop=heappop):
"""
Resorts the priority queue to consider any changes in priorities and retrieves the task with the highest updated priority.
Expand All @@ -727,10 +728,11 @@ def _get(self, heappop=heapq.heappop):
>>> task = queue._get()
>>> print(task)
"""
# NOTE: Since waiter priorities can change, this might not return the job with the
# most waiters if `self._queue` is not currently in order, but after calling `heappop`,
# `self._queue` will always be in proper order for next call to `self._get`.
return heappop(self._queue)
# NOTE: Since waiter priorities can change, heappop might not return the job with the
# most waiters if `self._queue` is not currently in order, but we can use `heappushpop`,
# to ensure we get the job with the most waiters.
queue = self._queue
return heappushpop(queue, heappop(queue))

def _get_key(self, *args, **kwargs) -> _SmartKey:
"""
Expand Down

0 comments on commit 0614531

Please sign in to comment.