diff --git a/sharded_queue/__init__.py b/sharded_queue/__init__.py index 63c6b08..39c20a4 100644 --- a/sharded_queue/__init__.py +++ b/sharded_queue/__init__.py @@ -136,7 +136,7 @@ class Worker: async def acquire_tube( self, handler: Optional[type[Handler]] = None - ) -> Tube: + ) -> Optional[Tube]: all_pipes = False while get_event_loop().is_running(): for pipe in await self.queue.storage.pipes(): @@ -165,6 +165,8 @@ async def acquire_tube( else: all_pipes = True + return None + def page_size(self, limit: Optional[int] = None) -> int: if limit is None: return settings.worker_batch_size @@ -182,6 +184,8 @@ async def loop( limit is None or limit > processed ): tube = await self.acquire_tube(handler) + if not tube: + break self.pipe = tube.pipe processed = processed + await self.process(tube, limit) self.pipe = None