Skip to content

Commit

Permalink
worker porlongate lock api
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 25, 2023
1 parent 856112b commit e85cb75
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,8 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
])

if tube.handler is RecurrentHandler:
await self.lock.ttl(
key=tube.pipe,
ttl=settings.recurrent_check_interval
await self.prolongate_lock(
settings.recurrent_check_interval
)
return len(msgs)

Expand All @@ -253,18 +252,20 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
await sleep(settings.worker_empty_pause)

if tube.handler is DeferredHandler:
await self.lock.ttl(
key=tube.pipe,
ttl=settings.deferred_retry_delay,
)
await self.prolongate_lock(settings.deferred_retry_delay)
return processed_counter
else:
await self.lock.ttl(tube.pipe, settings.lock_timeout)
await self.prolongate_lock()

await self.lock.release(tube.pipe)
await self.lock.release(self.pipe)

return processed_counter

async def prolongate_lock(self, ttl: Optional[int | timedelta] = None):
if ttl is None:
ttl = settings.lock_timeout
await self.lock.ttl(self.pipe, ttl)


class DeferredRequest(NamedTuple):
timestamp: float
Expand Down

0 comments on commit e85cb75

Please sign in to comment.