File tree Expand file tree Collapse file tree 1 file changed +20
-3
lines changed Expand file tree Collapse file tree 1 file changed +20
-3
lines changed Original file line number Diff line number Diff line change @@ -355,9 +355,26 @@ async def get(self, timeout=None):
355355 if self ._tainted :
356356 raise AsyncQueue .MixedSyncAsyncAPIError ()
357357
358- if timeout is None or timeout > 0 :
359- # This may raise asyncio.TimeoutError
360- await asyncio .wait_for (self ._event .wait (), timeout = timeout )
358+ # Blocking path: timeout is None (wait indefinitely)
359+ if timeout is None :
360+ # Wait indefinitely until the queue is non-empty.
361+ # It is necessary to check if the queue is empty after waking.
362+ # Because multiple waiting coroutines may be awakened simultaneously when a new item entries empty queue.
363+ # These coroutines will all pop this item from queue, and then raise IndexError.
364+ while not self ._q :
365+ await self ._event .wait ()
366+ # Blocking path: timeout > 0 (timed wait, retry with remaining time).
367+ elif timeout > 0 :
368+ # Compute the deadline; if the queue is still empty after waking, continue waiting for the remaining time.
369+ loop = asyncio .get_running_loop ()
370+ deadline = loop .time () + timeout
371+ while not self ._q :
372+ remaining = deadline - loop .time ()
373+ if remaining <= 0 :
374+ raise asyncio .TimeoutError ()
375+ # This may raise asyncio.TimeoutError.
376+ await asyncio .wait_for (self ._event .wait (), timeout = remaining )
377+ # Non-blocking path: timeout <= 0.
361378 elif not self ._q :
362379 raise asyncio .QueueEmpty ()
363380
You can’t perform that action at this time.
0 commit comments