Skip to content

Commit

Permalink
deferred task duplicate skip fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 23, 2023
1 parent 74c6384 commit 25ca5bb
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,18 @@ async def handle(self, *requests: DeferredRequest) -> None:
]

for pipe in set([pipe for (pipe, _) in todo]):
await self.queue.storage.append(pipe, *[
ready = [
msg for msg in
[
self.queue.serializer.serialize(request)
for (candidate_pipe, request) in todo
if candidate_pipe == pipe
]
if not await self.queue.storage.contains(pipe, msg)
])
]

if len(ready):
await self.queue.storage.append(pipe, *ready)

if len(pending) and not len(todo):
await sleep(settings.deferred_retry_delay)
Expand Down

0 comments on commit 25ca5bb

Please sign in to comment.