Way to wait until all tasks are handled #350
Unanswered
rafalkrupinski
asked this question in
Q&A
Replies: 1 comment
-
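```python
import asyncio
from typing import Any, Coroutine, Union

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class FinishEventMiddleware(TaskiqMiddleware):
    def __init__(self):
        super().__init__()
        self._count = 0  # tasks that have started but not yet finished
        self.done = asyncio.Event()

    def pre_execute(self, message: "TaskiqMessage") -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
        # Refuse new work once the broker has been declared finished.
        if self.done.is_set():
            raise ValueError('Broker is done')
        self._count += 1
        return message

    def post_execute(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
        self._count -= 1
        # Signal completion as soon as no started task is still running.
        if not self._count:
            self.done.set()
```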
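One caveat with the middleware above: the counter only goes up in `pre_execute`, that is, when a task starts running, so it could reach zero while other tasks are still queued. A minimal sketch of the same idea that counts work at submission time instead, written in plain asyncio rather than taskiq. `InFlightTracker`, `submit`, and `wait_idle` are hypothetical names, not part of taskiq, and `asyncio.create_task` stands in for kicking a task to the broker.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InFlightTracker:
    """Hypothetical helper: counts work from submission until completion."""

    def __init__(self) -> None:
        self._count = 0
        self._idle = asyncio.Event()
        self._idle.set()  # nothing submitted yet, so the tracker starts idle

    def submit(self, func: Callable[..., Awaitable[Any]], *args: Any) -> "asyncio.Task[Any]":
        # Count the work before it is scheduled, so queued work keeps us busy.
        self._count += 1
        self._idle.clear()
        task = asyncio.create_task(func(*args))
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, _task: "asyncio.Task[Any]") -> None:
        self._count -= 1
        if self._count == 0:
            self._idle.set()

    async def wait_idle(self) -> None:
        await self._idle.wait()


async def demo() -> list:
    tracker = InFlightTracker()
    processed = []

    async def process(page: int) -> None:
        await asyncio.sleep(0.01)  # stand-in for real I/O
        processed.append(page)

    async def main_task() -> None:
        # Spawns processing work for each "page" without awaiting it.
        for page in range(5):
            tracker.submit(process, page)

    tracker.submit(main_task)
    await tracker.wait_idle()  # returns only after main_task and all children finish
    return sorted(processed)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the main task is itself counted and it submits its children before it finishes, the count cannot drop to zero until every child has completed.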
-
I'm writing a small script that does a lot of I/O with an async library, so I run it via taskiq to parallelize it, and it works great!
My main task fetches pages of data in a loop and starts new tasks to process them, but it doesn't wait for them.
How can I detect that all the tasks are done, so that I can shut down the broker and safely end the script? I can't just await the main task, as it will finish before the other tasks.
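One way to frame the problem is to have the main task hand back handles to the work it started, so the caller can wait on those rather than on the main task alone. A minimal sketch in plain asyncio, not taskiq itself: `process`, `main_task`, and `run_all` are hypothetical names, and `asyncio.create_task` stands in for kicking a task. In taskiq, the handle returned by `kiq()` and its `wait_result()` method may play the same role, assuming a result backend is configured.

```python
import asyncio


async def process(page: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return page * 2


async def main_task(pages: int) -> list:
    # Start one child per page and return the handles instead of dropping them.
    return [asyncio.create_task(process(page)) for page in range(pages)]


async def run_all() -> list:
    handles = await main_task(3)
    # Waiting on the handles covers the children, not just the main task.
    return list(await asyncio.gather(*handles))


if __name__ == "__main__":
    print(asyncio.run(run_all()))
```

Once `run_all` returns, every child has finished, so it is safe to shut down and exit.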