Skip to content

Commit

Permalink
chore: refactor SmartProcessingQueue error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Jan 6, 2025
1 parent 274577a commit fa562b2
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,6 @@ def _get(self):
fut, args, kwargs = super()._get()
return args, kwargs, fut()

@log_broken
async def _worker_coro(self) -> NoReturn:
"""
Worker coroutine responsible for processing tasks in the queue.
Expand All @@ -912,32 +911,38 @@ async def _worker_coro(self) -> NoReturn:
args: P.args
kwargs: P.kwargs
fut: SmartFuture[V]
while True:
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
log_debug("processing %s", fut)
result = await func(*args, **kwargs)
fut.set_result(result)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
func.__name__,
fut,
result,
)
except Exception as e:
log_debug("%s: %s", type(e).__name__, e)
try:
while True:
try:
fut.set_exception(e)
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
log_debug("processing %s", fut)
result = await func(*args, **kwargs)
fut.set_result(result)
except InvalidStateError:
logger.error(
"cannot set exception for %s %s: %s",
"cannot set result for %s %s: %s",
func.__name__,
fut,
e,
result,
)
task_done()
except Exception as e:
log_debug("%s: %s", type(e).__name__, e)
try:
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set exception for %s %s: %s",
func.__name__,
fut,
e,
)
task_done()

except Exception as e:
logger.error("%s is broken!!!", self)
logger.exception(e)
raise

0 comments on commit fa562b2

Please sign in to comment.