Getting all running tasks #235
-
I'd like to get all the tasks that are running. I'm currently using Redis as my broker and result backend. Is that possible to do out of the box? I'm still pretty new to taskiq, but it looks like this wouldn't be possible, because the task is removed from the Redis queue once a worker picks it up. Do I need to create a middleware to temporarily store running tasks? Thanks!
-
Here's a quick update. I'm not sure if this is the best way of doing this, but I wrote a middleware that tracks the task IDs in Redis:

```python
from typing import Any, Coroutine

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
from taskiq_redis.redis_broker import BaseRedisBroker, Redis


class RunningTasksMiddleware(TaskiqMiddleware):
    def __init__(self, list_name: str = "taskiq_running") -> None:
        super().__init__()
        self.list_name = list_name

    async def startup(self) -> Coroutine[Any, Any, None] | None:
        broker: BaseRedisBroker = self.broker
        if broker.is_worker_process:
            # Clear any stale entries left over from a previous run.
            async with Redis(connection_pool=broker.connection_pool) as redis_conn:
                await redis_conn.delete(self.list_name)
        return super().startup()

    async def get_running_task_ids(self) -> list[str]:
        """Get a list of running task IDs.

        Returns:
            list[str]: list of task IDs. Note that entries come back as
            bytes unless the pool was created with decode_responses=True.
        """
        broker: BaseRedisBroker = self.broker
        async with Redis(connection_pool=broker.connection_pool) as redis_conn:
            return await redis_conn.lrange(self.list_name, 0, -1)

    async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
        # Push the task ID when the message is kicked to the broker.
        broker: BaseRedisBroker = self.broker
        async with Redis(connection_pool=broker.connection_pool) as redis_conn:
            await redis_conn.lpush(self.list_name, message.task_id)
        return super().pre_send(message)

    async def post_save(self, message: TaskiqMessage, result: TaskiqResult[Any]) -> Coroutine[Any, Any, None] | None:
        # Remove the task ID once the result has been saved.
        broker: BaseRedisBroker = self.broker
        async with Redis(connection_pool=broker.connection_pool) as redis_conn:
            await redis_conn.lrem(self.list_name, 0, message.task_id)
        return super().post_save(message, result)
```

I might end up serializing the TaskiqMessages and sticking those in Redis instead of just their task_ids, though that might be a mistake. 🤔
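A minimal usage sketch, assuming taskiq_redis's `ListQueueBroker` (the Redis URL and the task below are placeholders, not part of the middleware itself):

```python
import asyncio

from taskiq_redis import ListQueueBroker

# Keep a reference to the middleware so we can query it later.
running_tasks = RunningTasksMiddleware()
broker = ListQueueBroker(url="redis://localhost:6379").with_middlewares(running_tasks)


@broker.task
async def my_task() -> None:  # hypothetical task for illustration
    await asyncio.sleep(10)


async def main() -> None:
    await broker.startup()
    await my_task.kiq()
    # Lists IDs pushed by pre_send and not yet removed by post_save.
    print(await running_tasks.get_running_task_ids())
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

One caveat of this design: `pre_send` fires when the message is kicked, so an ID appears in the list as soon as the task is queued, not only while a worker is actually executing it.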
-
Is there a way to get all the failed tasks? @s3rius
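One possible approach, sketched below (not a built-in taskiq feature; the class name and the `taskiq_failed` key are made up for illustration): reuse the same middleware pattern and record the ID in `post_save` only when `result.is_err` is set.

```python
from typing import Any

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
from taskiq_redis.redis_broker import BaseRedisBroker, Redis


class FailedTasksMiddleware(TaskiqMiddleware):
    """Hypothetical middleware keeping IDs of failed tasks in a Redis list."""

    def __init__(self, list_name: str = "taskiq_failed") -> None:
        super().__init__()
        self.list_name = list_name

    async def post_save(self, message: TaskiqMessage, result: TaskiqResult[Any]) -> None:
        # Record the task ID only if the task raised an exception.
        if result.is_err:
            broker: BaseRedisBroker = self.broker
            async with Redis(connection_pool=broker.connection_pool) as redis_conn:
                await redis_conn.lpush(self.list_name, message.task_id)
```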
Actually, not a bad implementation. But there's a small flaw in the design: `LREM` scans the list, so you perform O(N) work every time you delete a task. That's fine for systems that aren't under heavy load, but it may result in poor performance.
I'd suggest using `scan` for keys and batch operations, like `mget`.
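A rough sketch of that suggestion (the `taskiq_running:` key prefix is an assumed naming scheme): give each running task its own key, so removal is a single O(1) `DEL`, and gather the IDs with `SCAN` plus batched `MGET`.

```python
from redis.asyncio import Redis

PREFIX = "taskiq_running:"  # assumed prefix: one key per running task


async def mark_running(redis: Redis, task_id: str) -> None:
    await redis.set(PREFIX + task_id, task_id)


async def mark_done(redis: Redis, task_id: str) -> None:
    # Deleting a single key is O(1), unlike LREM on one shared list.
    await redis.delete(PREFIX + task_id)


async def get_running_task_ids(redis: Redis) -> list[str]:
    ids: list[str] = []
    cursor = 0
    while True:
        # SCAN walks the keyspace incrementally without blocking Redis.
        cursor, keys = await redis.scan(cursor, match=PREFIX + "*", count=100)
        if keys:
            # MGET fetches the whole batch in a single round trip.
            values = await redis.mget(keys)
            ids.extend(v.decode() for v in values if v is not None)
        if cursor == 0:
            break
    return ids
```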