diff --git a/sharded_queue/__init__.py b/sharded_queue/__init__.py index 39c20a4..a87227e 100644 --- a/sharded_queue/__init__.py +++ b/sharded_queue/__init__.py @@ -233,9 +233,8 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int: ]) if tube.handler is RecurrentHandler: - await self.lock.ttl( - key=tube.pipe, - ttl=settings.recurrent_check_interval + await self.prolongate_lock( + settings.recurrent_check_interval ) return len(msgs) @@ -253,18 +252,20 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int: await sleep(settings.worker_empty_pause) if tube.handler is DeferredHandler: - await self.lock.ttl( - key=tube.pipe, - ttl=settings.deferred_retry_delay, - ) + await self.prolongate_lock(settings.deferred_retry_delay) return processed_counter else: - await self.lock.ttl(tube.pipe, settings.lock_timeout) + await self.prolongate_lock() - await self.lock.release(tube.pipe) + await self.lock.release(self.pipe) return processed_counter + async def prolongate_lock(self, ttl: Optional[int | timedelta] = None): + if ttl is None: + ttl = settings.lock_timeout + await self.lock.ttl(self.pipe, ttl) + class DeferredRequest(NamedTuple): timestamp: float