Skip to content

Commit

Permalink
worker prolongate
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 25, 2023
1 parent e85cb75 commit 8b1ac29
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
7 changes: 5 additions & 2 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,14 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
else:
await self.prolongate_lock()

await self.lock.release(self.pipe)
if self.pipe:
await self.lock.release(self.pipe)

return processed_counter

async def prolongate_lock(self, ttl: Optional[int | timedelta] = None):
async def prolongate_lock(self, ttl: Optional[int] = None):
if not self.pipe:
raise RuntimeError('No active pipe')
if ttl is None:
ttl = settings.lock_timeout
await self.lock.ttl(self.pipe, ttl)
Expand Down
10 changes: 9 additions & 1 deletion tests/test_worker_pause.py → tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from asyncio import get_event_loop, sleep
from typing import NamedTuple

from pytest import mark
from pytest import mark, raises

from sharded_queue import Handler, Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
Expand Down Expand Up @@ -33,3 +33,11 @@ async def test_worker_pause() -> None:
await queue.register(SignContract, SignContractRequest(2))
await worker_task
assert len(signed) == 2


@mark.asyncio
async def test_worker_prolongate_invalid_pipe() -> None:
queue: Queue = Queue(RuntimeStorage())
worker = Worker(RuntimeLock(), queue)
with raises(RuntimeError):
await worker.prolongate_lock()

0 comments on commit 8b1ac29

Please sign in to comment.