Skip to content

Commit

Permalink
deffered handler unique requests fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Nov 21, 2023
1 parent 151db97 commit 4694ee0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
3 changes: 2 additions & 1 deletion sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def handle(self, *requests: DeferredRequest) -> None:
]

for pipe in set([pipe for (pipe, _) in todo]):
ready = [
ready: list[str] = [
msg for msg in
[
self.worker.queue.serializer.serialize(request)
Expand All @@ -320,6 +320,7 @@ async def handle(self, *requests: DeferredRequest) -> None:
if not await self.worker.queue.storage.contains(pipe, msg)
]

ready = list(set(ready))
if len(ready):
await self.worker.queue.storage.append(pipe, *ready)

Expand Down
15 changes: 12 additions & 3 deletions tests/test_deferred.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ async def test_deferred() -> None:
assert await queue.storage.length(drop_pipe) == 0
assert await queue.storage.length(deferred_pipe) == 1

await worker.loop(1)
await queue.register(
DropBucket,
BucketRequest(1),
defer=timedelta(milliseconds=5),
)

assert await queue.storage.length(drop_pipe) == 0
assert await queue.storage.length(deferred_pipe) == 1
assert await queue.storage.length(deferred_pipe) == 2

await worker.loop(2)
assert await queue.storage.length(drop_pipe) == 0
assert await queue.storage.length(deferred_pipe) == 2

await sleep(0.01)

await worker.loop(1)
await worker.loop(2)
assert await queue.storage.length(drop_pipe) == 1
assert await queue.storage.length(deferred_pipe) == 0

Expand Down

0 comments on commit 4694ee0

Please sign in to comment.