Task concurrency / limiter question #132
-
What would be the best approach to the following? We have a taskiq task that fetches data from a data source. Per data source, we want to limit the number of simultaneous tasks to, for example, 2: no more than 2 tasks should be fetching from that data source at the same time. Workers are spread across different servers.

In our previous setup we had a decorator that would randomly delay the task if a Redis increment/decrement counter was at the maximum concurrency. I suppose the best way here would be to implement middleware using the `pre_execute` function together with `post_execute` / `on_error`? I was wondering if there is a better approach that is more integrated with the broker. Waiting should not be blocking, since we don't want to hold up the worker processes. The unique per-data-source key could be a label that we set when kiqing the task. We are using Redis and RabbitMQ as result backend and broker.
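For context, the increment/decrement pattern described above can be sketched with an in-memory stand-in for the Redis counter. The names `acquire`, `release`, and `MAX_CONCURRENT` are hypothetical; real code would issue atomic `INCR`/`DECR` commands against a per-data-source key.

```python
import asyncio
import random

# In-memory stand-in for the Redis counter; in production this would be
# atomic INCR/DECR calls against a key such as f"limit:{data_source}".
counters: dict = {}
MAX_CONCURRENT = 2


async def acquire(source: str) -> None:
    # Increment first, then check: with Redis, INCR is atomic, so two
    # workers can never both observe a free slot and proceed together.
    while True:
        counters[source] = counters.get(source, 0) + 1
        if counters[source] <= MAX_CONCURRENT:
            return
        counters[source] -= 1
        # Back off with a random delay instead of blocking the worker.
        await asyncio.sleep(random.uniform(0.005, 0.02))


def release(source: str) -> None:
    counters[source] -= 1
```

Because the wait is an `await asyncio.sleep`, other tasks in the same worker process keep running while a task is backing off.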
-
Hello, and thanks for asking this question. You can actually create a dependency that waits on a Redis-based semaphore. As an example:

```python
from typing import AsyncGenerator, Optional

from redis.asyncio import ConnectionPool, Redis
from taskiq import Context, TaskiqDepends


class ConcurrencyLimiter:
    def __init__(self, counter_name: Optional[str] = None, limit: int = 2):
        self.limit = limit
        self.counter_name = counter_name

    async def __call__(
        self,
        redis_pool: ConnectionPool = TaskiqDepends(get_redis_pool),
        context: Context = TaskiqDepends(),
    ) -> AsyncGenerator[None, None]:
        counter_name = self.counter_name or f"{context.message.task_name}.RATELIMIT"
        redis = Redis(connection_pool=redis_pool)
        cur_value = self.limit + 1
        while cur_value >= self.limit:
            # Here you just get the current value from Redis
            # and check that a slot is available.
            ...
        # You increase your counter before task execution.
        await redis.incr(counter_name)
        try:
            yield
        finally:
            # You decrease it after the function completes,
            # even if the task raised an error.
            await redis.decr(counter_name)
```

In your tasks, you can easily depend on this limiter:

```python
async def my_limited_task(
    limiter: None = TaskiqDepends(ConcurrencyLimiter(limit=4)),
) -> None:
    # do things.
    ...
```

I can make a full working example this evening, if you want. |
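The reason the decrement belongs in a `finally` block can be shown with a plain-asyncio sketch, no taskiq or Redis involved. The names `limiter` and `run_task` are hypothetical, and the manual `__anext__` calls stand in for what a dependency resolver does around a task.

```python
import asyncio
from typing import AsyncGenerator

counter = 0  # stand-in for the Redis counter


async def limiter() -> AsyncGenerator[None, None]:
    # Generator-style dependency: code before `yield` runs before the
    # task, code after it runs once the task finishes. Putting the
    # decrement in `finally` releases the slot even if the task raises.
    global counter
    counter += 1
    try:
        yield
    finally:
        counter -= 1


async def run_task(fail: bool) -> None:
    gen = limiter()
    await gen.__anext__()  # setup phase: slot acquired
    try:
        if fail:
            raise RuntimeError("fetch failed")
    finally:
        # Teardown phase: resume the generator so its `finally` runs.
        try:
            await gen.__anext__()
        except StopAsyncIteration:
            pass
```

Without the `finally`, a failing task would leak a slot and the counter would never return to zero.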
Hi! Sorry for such a late reply. We are proud to announce that we have implemented all the primitives needed to solve your specific case.
Here is an example project with the limiter implemented. You can adapt it and start using it.
mweq.zip
It uses the new `requeue` method of a context. Also, I disabled the logger of taskiq-dependencies, because throwing an exception in dependencies triggers the resolver to catch and print it:

```python
logging.getLogger("taskiq.dependencies.ctx").setLevel(logging.ERROR)
```
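To illustrate the requeue idea without taskiq, here is a toy single-threaded simulation (the tick model and all names are made up): when the per-source counter is at its limit, the message goes back onto the queue for later redelivery instead of a worker sleeping on it.

```python
from collections import deque

LIMIT = 2     # max concurrent fetches per data source
DURATION = 2  # ticks each simulated fetch takes

queue = deque(f"task-{i}" for i in range(5))
in_flight = []  # (name, ticks remaining)
started, requeues, max_concurrent = [], 0, 0

while queue or in_flight:
    # Work that finished this tick frees its slot.
    in_flight = [(name, t - 1) for name, t in in_flight if t > 1]
    # Try to start queued tasks; requeue the rest instead of waiting.
    for _ in range(len(queue)):
        task = queue.popleft()
        if len(in_flight) >= LIMIT:
            queue.append(task)  # the broker would redeliver it later
            requeues += 1
        else:
            in_flight.append((task, DURATION))
            started.append(task)
    max_concurrent = max(max_concurrent, len(in_flight))
```

The worker never blocks: a task that cannot acquire a slot is handed straight back to the queue, which is the behavior `context.requeue()` gives you in the example project.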