Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: extract @log_broken decorator #499

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 55 additions & 50 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@ def get_multi_nowait(self, i: int, can_return_less: bool = False) -> List[T]:
return items


def log_broken(func: Callable[[Any], NoReturn]) -> Callable[[Any], NoReturn]:
@wraps(func)
async def __worker_exc_wrap(self):
try:
return await func(self)
except Exception as e:
logger.error("%s for %s is broken!!!", type(self).__name__, func)
logger.exception(e)
raise

return __worker_exc_wrap


class ProcessingQueue(_Queue[Tuple[P, "asyncio.Future[V]"]], Generic[P, V]):
"""
A queue designed for processing tasks asynchronously with multiple workers.
Expand Down Expand Up @@ -480,6 +493,7 @@ def _workers(self) -> "asyncio.Task[NoReturn]":
task._workers = workers
return task

@log_broken
async def __worker_coro(self) -> NoReturn:
"""
The coroutine executed by worker tasks to process the queue.
Expand All @@ -502,37 +516,32 @@ async def __worker_coro(self) -> NoReturn:
else:
fut: asyncio.Future[V]
while True:
args, kwargs, fut = await get_next_job()
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
result = await func(*args, **kwargs)
fut.set_result(result)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
func.__name__,
fut,
result,
)
except Exception as e:
try:
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
result = await func(*args, **kwargs)
fut.set_result(result)
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
"cannot set exception for %s %s: %s",
func.__name__,
fut,
result,
e,
)
except Exception as e:
try:
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set exception for %s %s: %s",
func.__name__,
fut,
e,
)
task_done()
except Exception as e:
logger.error("%s for %s is broken!!!", type(self).__name__, func)
logger.exception(e)
raise
task_done()


def _validate_args(i: int, can_return_less: bool) -> None:
Expand Down Expand Up @@ -889,6 +898,7 @@ def _get(self):
fut, args, kwargs = super()._get()
return args, kwargs, fut()

@log_broken
async def __worker_coro(self) -> NoReturn:
"""
Worker coroutine responsible for processing tasks in the queue.
Expand All @@ -911,35 +921,30 @@ async def __worker_coro(self) -> NoReturn:
fut: SmartFuture[V]
while True:
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
log_debug("processing %s", fut)
result = await func(*args, **kwargs)
fut.set_result(result)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
func.__name__,
fut,
result,
)
except Exception as e:
log_debug("%s: %s", type(e).__name__, e)
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
log_debug("processing %s", fut)
result = await func(*args, **kwargs)
fut.set_result(result)
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
"cannot set exception for %s %s: %s",
func.__name__,
fut,
result,
e,
)
except Exception as e:
log_debug("%s: %s", type(e).__name__, e)
try:
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set exception for %s %s: %s",
func.__name__,
fut,
e,
)
task_done()
except Exception as e:
logger.error("%s for %s is broken!!!", type(self).__name__, func)
logger.exception(e)
raise
task_done()
Loading