Skip to content

Commit

Permalink
worker event loop running state check
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 25, 2023
1 parent bf327de commit 752642c
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def acquire_tube(
self, handler: Optional[type[Handler]] = None
) -> Tube:
all_pipes = False
while True:
while get_event_loop().is_running():
for pipe in await self.queue.storage.pipes():
if not await self.queue.storage.length(pipe):
continue
Expand Down Expand Up @@ -178,7 +178,9 @@ async def loop(
) -> None:
get_event_loop().add_signal_handler(SIGTERM, self.housekeep)
processed = 0
while True and limit is None or limit > processed:
while get_event_loop().is_running() and (
limit is None or limit > processed
):
tube = await self.acquire_tube(handler)
self.pipe = tube.pipe
processed = processed + await self.process(tube, limit)
Expand Down Expand Up @@ -209,7 +211,9 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
if isinstance(instance, DeferredHandler | RecurrentHandler):
instance.queue = self.queue

while limit is None or limit > processed_counter:
while get_event_loop().is_running() and (
limit is None or limit > processed_counter
):
if tube.handler is RecurrentHandler:
page_size = settings.recurrent_tasks_limit
else:
Expand Down

0 comments on commit 752642c

Please sign in to comment.