diff --git a/sharded_queue/__init__.py b/sharded_queue/__init__.py index e90ccf2..804d85c 100644 --- a/sharded_queue/__init__.py +++ b/sharded_queue/__init__.py @@ -310,7 +310,7 @@ async def handle(self, *requests: DeferredRequest) -> None: ] for pipe in set([pipe for (pipe, _) in todo]): - ready = [ + ready: list[str] = [ msg for msg in [ self.worker.queue.serializer.serialize(request) @@ -320,6 +320,7 @@ async def handle(self, *requests: DeferredRequest) -> None: if not await self.worker.queue.storage.contains(pipe, msg) ] + ready = list(set(ready)) if len(ready): await self.worker.queue.storage.append(pipe, *ready) diff --git a/tests/test_deferred.py b/tests/test_deferred.py index 5fd4ec3..1a9d778 100644 --- a/tests/test_deferred.py +++ b/tests/test_deferred.py @@ -36,13 +36,22 @@ async def test_deferred() -> None: assert await queue.storage.length(drop_pipe) == 0 assert await queue.storage.length(deferred_pipe) == 1 - await worker.loop(1) + await queue.register( + DropBucket, + BucketRequest(1), + defer=timedelta(milliseconds=5), + ) + assert await queue.storage.length(drop_pipe) == 0 - assert await queue.storage.length(deferred_pipe) == 1 + assert await queue.storage.length(deferred_pipe) == 2 + + await worker.loop(2) + assert await queue.storage.length(drop_pipe) == 0 + assert await queue.storage.length(deferred_pipe) == 2 await sleep(0.01) - await worker.loop(1) + await worker.loop(2) assert await queue.storage.length(drop_pipe) == 1 assert await queue.storage.length(deferred_pipe) == 0