Replies: 1 comment 1 reply
-
Hi. Thanks for interest in the library. In this particular case, I would go with another approach. Since you demand strong atomacity and releasing lock is a time-based operation. You can make a simple distributed lock entirely on redis. import asyncio
from typing import AsyncGenerator
from redis.asyncio import ConnectionPool, Redis
from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState
from taskiq_redis import ListQueueBroker
broker = ListQueueBroker("redis://localhost/0")
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState):
state.redis_pool = ConnectionPool.from_url("redis://localhost/1")
def get_redis_pool(state: TaskiqState = TaskiqDepends()) -> ConnectionPool:
return state.redis_pool
async def get_redis_connection(
pool: ConnectionPool = TaskiqDepends(get_redis_pool),
) -> AsyncGenerator[Redis, None]:
async with Redis(connection_pool=pool) as conn:
yield conn
class RateLimiter:
ATOMIC_INCR = """
local key = KEYS[1]
local requests = tonumber(redis.call('GET', key) or '-1')
local max_requests = tonumber(ARGV[1])
local expiry = tonumber(ARGV[2])
if (requests == -1) or (requests < max_requests) then
redis.call('INCR', key)
redis.call('EXPIRE', key, expiry)
return true
else
return false
end
"""
def __init__(self, counter_name: str, limit: int = 2, expiry: int = 60):
self.limit = str(limit)
self.counter_name = counter_name
self.expiry = str(expiry)
async def __call__(
self,
redis: Redis = TaskiqDepends(get_redis_connection),
context: Context = TaskiqDepends(),
) -> None:
res = await redis.eval( # type: ignore
self.ATOMIC_INCR,
1, # Num of args
self.counter_name, # Name of the key to use
self.limit, # Limit of requests
self.expiry, # Expiration for the key
)
if res is None:
await asyncio.sleep(5)
await context.requeue()
rtask_limiter = RateLimiter("rated_task", limit=5)
@broker.task
async def rated_task(_: None = TaskiqDepends(rtask_limiter)):
print("I'm a task")
async def main():
await broker.startup()
for _ in range(10):
await rated_task.kiq()
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main()) Here's an example of such. As you can see, I have a simple lua script that does the trick. Basically redis guarantee atomacity for lua scripts even in cluster mode. You can make use of that in this your scenario. |
Beta Was this translation helpful? Give feedback.
1 reply
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hi everyone,
I'm currently trying to create a rate limiter based on requests per minute, like a token bucket algorithm.
I tried building a similar class as suggested in #132, just without the decrease and a scheduled task that resets the limit every minute.
However, I've not been able to get consistent results, as you can see in the logs. The increase from the Redis Asyncio connection does not seem to update the variable properly yet.
Would you maybe have an idea of how to fix this, or a better approach?
Thank you for your time!
Beta Was this translation helpful? Give feedback.
All reactions