Skip to content

Commit

Permalink
worker loop optional tube acquire
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 25, 2023
1 parent 752642c commit 856112b
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Worker:

async def acquire_tube(
self, handler: Optional[type[Handler]] = None
) -> Tube:
) -> Optional[Tube]:
all_pipes = False
while get_event_loop().is_running():
for pipe in await self.queue.storage.pipes():
Expand Down Expand Up @@ -165,6 +165,8 @@ async def acquire_tube(
else:
all_pipes = True

return None

def page_size(self, limit: Optional[int] = None) -> int:
if limit is None:
return settings.worker_batch_size
Expand All @@ -182,6 +184,8 @@ async def loop(
limit is None or limit > processed
):
tube = await self.acquire_tube(handler)
if not tube:
break
self.pipe = tube.pipe
processed = processed + await self.process(tube, limit)
self.pipe = None
Expand Down

0 comments on commit 856112b

Please sign in to comment.