From fa562b26da24feffc643502fdaa057875b1ac8fe Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Mon, 6 Jan 2025 05:48:45 +0000 Subject: [PATCH] chore: refactor SmartProcessingQueue error handling --- a_sync/primitives/queue.py | 53 +++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/a_sync/primitives/queue.py b/a_sync/primitives/queue.py index 0474e041..a4f4eeed 100644 --- a/a_sync/primitives/queue.py +++ b/a_sync/primitives/queue.py @@ -891,7 +891,6 @@ def _get(self): fut, args, kwargs = super()._get() return args, kwargs, fut() - @log_broken async def _worker_coro(self) -> NoReturn: """ Worker coroutine responsible for processing tasks in the queue. @@ -912,32 +911,38 @@ async def _worker_coro(self) -> NoReturn: args: P.args kwargs: P.kwargs fut: SmartFuture[V] - while True: - try: - args, kwargs, fut = await get_next_job() - if fut is None: - # the weakref was already cleaned up, we don't need to process this item - task_done() - continue - log_debug("processing %s", fut) - result = await func(*args, **kwargs) - fut.set_result(result) - except InvalidStateError: - logger.error( - "cannot set result for %s %s: %s", - func.__name__, - fut, - result, - ) - except Exception as e: - log_debug("%s: %s", type(e).__name__, e) + try: + while True: try: - fut.set_exception(e) + args, kwargs, fut = await get_next_job() + if fut is None: + # the weakref was already cleaned up, we don't need to process this item + task_done() + continue + log_debug("processing %s", fut) + result = await func(*args, **kwargs) + fut.set_result(result) except InvalidStateError: logger.error( - "cannot set exception for %s %s: %s", + "cannot set result for %s %s: %s", func.__name__, fut, - e, + result, ) - task_done() + except Exception as e: + log_debug("%s: %s", type(e).__name__, e) + try: + fut.set_exception(e) + except InvalidStateError: + logger.error( + "cannot set exception for %s %s: %s", + func.__name__, + fut, + e, + ) + task_done() + + except Exception as e: + logger.error("%s is broken!!!", self) + logger.exception(e) + raise