Commit 7c43827: recurrent handlers

nekufa committed Sep 18, 2023
1 parent caec11b
Showing 8 changed files with 262 additions and 33 deletions.
14 changes: 12 additions & 2 deletions README.md
@@ -158,12 +158,18 @@ There is an optional if_not_exists flag. If it is set, the request will be registered
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
```
## Deferred tasks
You can use the built-in deferred task handler to defer calls
## Deferred handlers
You can use the built-in deferred request handler to defer calls
```py
await queue.register(Housekeep, Room(402), defer=5) # numeric means seconds
await queue.register(Housekeep, Room(324), defer=timedelta(minutes=15))
```

## Recurrent handlers
You can use the built-in recurrent request handler to register requests at regular intervals
```py
await queue.register(BalanceCheck, Company('basis'), recurrent=timedelta(hours=1))
```
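For context, here is a minimal sketch of a handler suitable for recurrent registration. It assumes the `Handler`/`Route` API shown elsewhere in this diff; the `route` signature and the `Company`/`BalanceCheck` definitions are illustrative, not part of this commit:
```py
from datetime import timedelta
from typing import NamedTuple

from sharded_queue import Handler, Route


class Company(NamedTuple):
    name: str


class BalanceCheck(Handler):
    @classmethod
    async def route(cls, *requests: Company) -> list[Route]:
        # one route per request, everything on the default route
        return [Route() for _ in requests]

    async def handle(self, *requests: Company) -> None:
        for company in requests:
            ...  # check the balance, alert if it is low


# registered once; the recurrent handler re-registers it every hour
await queue.register(BalanceCheck, Company('basis'), recurrent=timedelta(hours=1))
```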
## Performance
Performance depends on many factors; we can only measure clean library overhead with in-memory storages. You can run the performance test on your hardware with `pytest -s`; with this option the performance test will print results for different cases. On an Intel i5-4670K with Ubuntu 23.04 and Python 3.11.4 it gives about `200_000` rps for batch request registration with sharding and about `600_000` rps for request handling in concurrent mode.

@@ -179,6 +185,10 @@ Deferred tasks retry delay
Lock key prefix
- `QUEUE_LOCK_TIMEOUT = 24 * 60 * 60`\
Lock key ttl
- `QUEUE_RECURRENT_CHECK_INTERVAL = 30`\
Recurrent check interval in seconds
- `QUEUE_RECURRENT_TASKS_LIMIT = 1024`\
Maximum number of recurrent tasks processed per check
- `QUEUE_TUBE_PREFIX = 'tube_'`\
Default queue prefix
- `QUEUE_WORKER_ACQUIRE_DELAY = 1`\
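Since the settings object is a pydantic `BaseSettings` with `env_prefix='queue_'` (see `sharded_queue/settings.py` below), the variables listed above can be set in the environment before the first import. A hedged sketch:
```py
import os

# must happen before sharded_queue.settings is first imported,
# because the settings instance reads the environment when created
os.environ['QUEUE_RECURRENT_CHECK_INTERVAL'] = '10'

from sharded_queue.settings import settings

assert settings.recurrent_check_interval == 10
```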
142 changes: 123 additions & 19 deletions sharded_queue/__init__.py
@@ -88,27 +88,26 @@ def __init__(
async def register(
self, handler: type[Handler], *requests: T,
defer: Optional[float | int | timedelta] = None,
if_not_exists: bool = False
if_not_exists: bool = False,
recurrent: Optional[float | int | timedelta] = None,
) -> None:
routes = await handler.route(*requests)
pipe_messages: list[tuple[str, Any]] = [
(Tube(handler, routes[n]).pipe, requests[n])
for n in range(len(routes))
]

if recurrent:
if_not_exists = True
pipe_messages = RecurrentHandler.transform(
pipe_messages, recurrent, self.serializer
)

if defer:
timestamp = DeferredRequest.calculate_timestamp(defer)
pipe_messages = [
(
Tube(DeferredHandler, Route()).pipe,
DeferredRequest(timestamp, pipe, values),
)
for (pipe, values)
in [
(pipe, self.serializer.get_values(request))
for (pipe, request) in pipe_messages
]
]
if_not_exists = True
pipe_messages = DeferredHandler.transform(
pipe_messages, defer, self.serializer
)

for pipe in set([pipe for (pipe, _) in pipe_messages]):
await self.storage.append(pipe, *[
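A hedged trace of the two new branches above (pipe names are illustrative, the real ones come from `Tube`):
```py
await queue.register(Housekeep, Room(402), recurrent=timedelta(hours=1))
# 1. recurrent is set, so if_not_exists is forced to True
# 2. RecurrentHandler.transform rewrites each (pipe, request) pair into a
#    RecurrentRequest(interval=3600.0, pipe='tube_...', msg=[402]) addressed
#    to the RecurrentHandler tube
# 3. the storage.append above therefore writes to the recurrent tube
#    instead of the Housekeep tube
```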
@@ -128,13 +127,17 @@ class Worker:
lock: Lock
queue: Queue

async def acquire_tube(self) -> Tube:
async def acquire_tube(
self, handler: Optional[type[Handler]] = None
) -> Tube:
all_pipes = False
while True:
for pipe in await self.queue.storage.pipes():
if not await self.queue.storage.length(pipe):
continue
tube: Tube = Tube.parse_pipe(pipe)
if handler and tube.handler is not handler:
continue
if tube.handler.priorities:
if tube.route.priority != tube.handler.priorities[0]:
if not all_pipes:
@@ -161,10 +164,14 @@ def page_size(self, limit: Optional[int] = None) -> int:

return min(limit, settings.worker_batch_size)

async def loop(self, limit: Optional[int] = None) -> None:
async def loop(
self,
limit: Optional[int] = None,
handler: Optional[type[Handler]] = None,
) -> None:
processed = 0
while limit is None or limit > processed:
tube = await self.acquire_tube()
tube = await self.acquire_tube(handler)
processed = processed + await self.process(tube, limit)

async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
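The new `handler` argument lets a worker dedicate itself to one handler's tubes. A usage sketch; the `Queue` and `Worker` construction signatures are assumptions based on the attribute declarations above:
```py
from sharded_queue import Queue, RecurrentHandler, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage

queue = Queue(RuntimeStorage())        # construction signature assumed
worker = Worker(RuntimeLock(), queue)  # construction signature assumed

# process one batch of recurrent bookkeeping, then return
await worker.loop(1, handler=RecurrentHandler)
```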
@@ -183,11 +190,14 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
]

async with tube.context() as instance:
if isinstance(instance, DeferredHandler):
if isinstance(instance, DeferredHandler | RecurrentHandler):
instance.queue = self.queue

while limit is None or limit > processed_counter:
page_size = self.page_size(limit)
if tube.handler is RecurrentHandler:
page_size = settings.recurrent_tasks_limit
else:
page_size = self.page_size(limit)
processed = False
for pipe in pipes:
msgs = await storage.range(pipe, page_size)
@@ -198,6 +208,13 @@
deserialize(cls, msg) for msg in msgs
])

if tube.handler is RecurrentHandler:
await self.lock.ttl(
key=tube.pipe,
ttl=settings.recurrent_check_interval
)
return len(msgs)

await storage.pop(pipe, len(msgs))

processed = True
@@ -211,7 +228,10 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
break
await sleep(settings.worker_empty_pause)

await self.lock.release(tube.pipe)
if tube.handler is DeferredHandler:
await self.lock.ttl(tube.pipe, settings.deferred_retry_delay)
else:
await self.lock.release(tube.pipe)

return processed_counter

@@ -266,3 +286,87 @@ async def handle(self, *requests: DeferredRequest) -> None:

if len(pending) and not len(todo):
await sleep(settings.deferred_retry_delay)

@classmethod
def transform(
cls,
pipe_messages: list[tuple[str, T]],
defer: float | int | timedelta,
serializer: Serializer,
) -> list[tuple[str, DeferredRequest]]:
timestamp = DeferredRequest.calculate_timestamp(defer)
return [
(
Tube(DeferredHandler, Route()).pipe,
DeferredRequest(timestamp, pipe, values),
)
for (pipe, values)
in [
(pipe, serializer.get_values(request))
for (pipe, request) in pipe_messages
]
]


class RecurrentRequest(NamedTuple):
interval: float
pipe: str
msg: list

@classmethod
def get_interval(cls, interval: int | float | timedelta) -> float:
if isinstance(interval, timedelta):
return interval.total_seconds()  # not .seconds: keep the days component

return float(interval)


class RecurrentHandler(Handler):
queue: Queue

async def handle(self, *requests: RecurrentRequest) -> None:
deferred_pipe: str = Tube(DeferredHandler, Route()).pipe
deferred_requests: list[tuple[str, list]] = [
(request.pipe, request.msg)
for request in [
self.queue.serializer.deserialize(DeferredRequest, msg)
for msg in await self.queue.storage.range(
deferred_pipe, settings.recurrent_tasks_limit
)
]
]

todo: list[DeferredRequest] = [
DeferredRequest(
DeferredRequest.calculate_timestamp(request.interval),
request.pipe,
request.msg,
)
for request in requests
if (request.pipe, request.msg) not in deferred_requests
]

if len(todo):
await self.queue.register(
DeferredHandler, *todo, if_not_exists=True
)

@classmethod
def transform(
cls,
pipe_messages: list[tuple[str, T]],
recurrent: float | int | timedelta,
serializer: Serializer,
) -> list[tuple[str, RecurrentRequest]]:
interval: float = RecurrentRequest.get_interval(recurrent)
return [
(
Tube(RecurrentHandler, Route()).pipe,
RecurrentRequest(interval, pipe, values)
)
for (pipe, values)
in [
(pipe, serializer.get_values(request))
for (pipe, request) in pipe_messages
]
]
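Putting the pieces together, a hedged timeline of one recurrent registration (`balance_pipe` is an illustrative name):
```py
# t = 0      register(BalanceCheck, Company('basis'), recurrent=3600)
#            -> RecurrentRequest(3600.0, balance_pipe, ['basis']) stays in the
#               recurrent tube; it is ranged but never popped in process()
# t = 0+     a worker handles the recurrent tube: no matching DeferredRequest
#            is pending, so DeferredRequest(now + 3600, balance_pipe, ['basis'])
#            is registered and the tube lock ttl is set to
#            recurrent_check_interval seconds
# t = 3600   DeferredHandler moves the message onto balance_pipe, where a
#            regular worker runs BalanceCheck.handle(Company('basis'))
# afterwards the recurrent tube is re-checked every recurrent_check_interval
# seconds; the pending-request check in RecurrentHandler.handle() prevents
# duplicate deferred registrations
```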
42 changes: 33 additions & 9 deletions sharded_queue/drivers.py
@@ -26,14 +26,23 @@ class RuntimeLock(Lock):
def __init__(self) -> None:
self.storage: dict[str, bool] = {}

async def acquire(self, pipe: str) -> bool:
if pipe in self.storage:
async def acquire(self, key: str) -> bool:
if key in self.storage:
return False
self.storage[pipe] = True
self.storage[key] = True
return True

async def release(self, pipe: str) -> None:
del self.storage[pipe]
async def exists(self, key: str) -> bool:
return key in self.storage

async def release(self, key: str) -> None:
del self.storage[key]

async def ttl(self, key: str, ttl: int) -> bool:
if ttl == 0:
await self.release(key)
return True
return await self.exists(key)


class RuntimeStorage(Storage):
@@ -73,16 +82,31 @@ class RedisLock(Lock):
def __init__(self, redis: Redis) -> None:
self.redis = redis

async def acquire(self, tube: str) -> bool:
async def acquire(self, key: str) -> bool:
return None is not await self.redis.set(
name=settings.lock_prefix + tube,
name=settings.lock_prefix + key,
ex=settings.lock_timeout,
nx=True,
value=1,
)

async def release(self, tube: str) -> None:
await self.redis.delete(settings.lock_prefix + tube)
async def exists(self, key: str) -> bool:
checker = await self.redis.exists(
settings.lock_prefix + key
)
return bool(checker)

async def release(self, key: str) -> None:
await self.redis.delete(settings.lock_prefix + key)

async def ttl(self, key: str, ttl: int) -> bool:
setter = await self.redis.set(
settings.lock_prefix + key,
value=key,
ex=ttl,
xx=True
)
return bool(setter)


class RedisStorage(Storage):
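Both drivers now share one `ttl` contract: a zero ttl releases the lock and returns True, while a positive ttl refreshes an existing lock and reports whether it was held (`RedisLock` does this via `xx=True`; `RuntimeLock` only emulates expiry by checking existence). A sketch of the shared behavior with the runtime driver, matching the assertions in the updated lock test:
```py
from sharded_queue.drivers import RuntimeLock

lock = RuntimeLock()
assert await lock.acquire('pipe')      # taken
assert not await lock.acquire('pipe')  # second acquire is refused
assert await lock.ttl('pipe', 30)      # held -> True (no real expiry here)
assert await lock.ttl('pipe', 0)       # ttl == 0 releases and returns True
assert not await lock.exists('pipe')
```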
6 changes: 4 additions & 2 deletions sharded_queue/protocols.py
@@ -4,8 +4,10 @@


class Lock(Protocol):
async def acquire(self, tube: str) -> bool: ...
async def release(self, tube: str) -> None: ...
async def acquire(self, key: str) -> bool: ...
async def exists(self, key: str) -> bool: ...
async def release(self, key: str) -> None: ...
async def ttl(self, key: str, ttl: int) -> bool: ...


class Serializer(Protocol[T]):
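Because `Lock` is a `Protocol`, any class providing these four coroutines satisfies it structurally, with no inheritance needed. A hypothetical driver (not part of this commit) that tracks real expiry in memory:
```py
from time import monotonic

from sharded_queue.settings import settings


class MonotonicLock:
    """Hypothetical in-memory Lock with real expiry tracking."""

    def __init__(self) -> None:
        self.deadlines: dict[str, float] = {}

    async def acquire(self, key: str) -> bool:
        if await self.exists(key):
            return False
        self.deadlines[key] = monotonic() + settings.lock_timeout
        return True

    async def exists(self, key: str) -> bool:
        deadline = self.deadlines.get(key)
        return deadline is not None and deadline > monotonic()

    async def release(self, key: str) -> None:
        self.deadlines.pop(key, None)

    async def ttl(self, key: str, ttl: int) -> bool:
        if ttl == 0:  # mirror the built-in drivers: zero ttl means release
            await self.release(key)
            return True
        if not await self.exists(key):
            return False
        self.deadlines[key] = monotonic() + ttl
        return True
```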
12 changes: 11 additions & 1 deletion sharded_queue/settings.py
@@ -13,7 +13,7 @@ class ShardedQueueSettings(BaseSettings):
title='Default queue thread'
)

deferred_retry_delay: float = Field(
deferred_retry_delay: int = Field(
default=1,
title='Deferred tasks retry delay'
)
@@ -30,6 +30,16 @@

model_config = SettingsConfigDict(env_prefix='queue_')

recurrent_check_interval: int = Field(
default=30,
title='Recurrent check interval in seconds'
)

recurrent_tasks_limit: int = Field(
default=1024,
title='Recurrent tasks limit per check'
)

tube_prefix: str = Field(
default="tube_",
title="Queue prefix"
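These fields can also be adjusted on the shared settings instance at runtime, which is exactly what the updated deferred test below does with `deferred_retry_delay`:
```py
from sharded_queue.settings import settings

settings.recurrent_check_interval = 5  # re-check recurrent tubes more often
settings.recurrent_tasks_limit = 4096  # scan more pending deferred messages
```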
2 changes: 2 additions & 0 deletions tests/test_deferred.py
@@ -7,6 +7,7 @@
from sharded_queue import DeferredHandler, Handler, Queue, Route, Tube, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.protocols import Storage
from sharded_queue.settings import settings


class BucketRequest(NamedTuple):
@@ -30,6 +31,7 @@ async def test_deferred() -> None:

deferred_pipe: str = Tube(DeferredHandler, Route()).pipe
drop_pipe: str = Tube(DropBucket, Route()).pipe
settings.deferred_retry_delay = 0

assert await queue.storage.length(drop_pipe) == 0
assert await queue.storage.length(deferred_pipe) == 1
5 changes: 5 additions & 0 deletions tests/test_lock.py
@@ -20,9 +20,14 @@ async def test_redis_storage() -> None:

async def runner(lock: Lock):
assert await lock.acquire('tester')
assert await lock.exists('tester')
assert not await lock.acquire('tester')
assert await lock.exists('tester')
assert await lock.ttl('tester', 1)

await lock.release('tester')
assert not await lock.exists('tester')
assert not await lock.ttl('tester', 1)

assert await lock.acquire('tester')
await lock.release('tester')