Skip to content

Commit

Permalink
storage test fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 12, 2023
1 parent a7d78b6 commit 453ceee
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ async def handle(self, *requests: Request) -> None:

@mark.asyncio
async def test_queue() -> None:
queue: Queue = Queue(JsonTupleSerializer(), RuntimeStorage())
storage = RuntimeStorage()
queue: Queue = Queue(JsonTupleSerializer(), storage)

def get_pipe(thread: int) -> str:
return Tube(SyncHandler, Route(thread=str(thread))).pipe
Expand All @@ -54,13 +55,13 @@ def get_pipe(thread: int) -> str:
Request(5),
)

assert get_pipe(0) in queue.storage.data
assert get_pipe(1) in queue.storage.data
assert get_pipe(2) not in queue.storage.data
assert get_pipe(0) in storage.data
assert get_pipe(1) in storage.data
assert get_pipe(2) not in storage.data

assert await queue.storage.length(get_pipe(0)) == 2
assert await queue.storage.length(get_pipe(1)) == 3
assert await queue.storage.length(get_pipe(2)) == 0
assert await storage.length(get_pipe(0)) == 2
assert await storage.length(get_pipe(1)) == 3
assert await storage.length(get_pipe(2)) == 0

assert context['started'] == 0
assert context['stopped'] == 0
Expand All @@ -76,19 +77,19 @@ def get_pipe(thread: int) -> str:
thread = str(synced[0].bucket % 2)

if thread == '0':
assert await queue.storage.length(get_pipe(0)) == 0
assert await queue.storage.length(get_pipe(1)) == 3
assert await storage.length(get_pipe(0)) == 0
assert await storage.length(get_pipe(1)) == 3
else:
assert await queue.storage.length(get_pipe(0)) == 2
assert await queue.storage.length(get_pipe(1)) == 1
assert await storage.length(get_pipe(0)) == 2
assert await storage.length(get_pipe(1)) == 1

# test worker switch
settings.worker_empty_pause = 0
(last_started, last_stopped) = tuple(context.values())
await worker.loop(3)
assert len(synced) == 5
assert await queue.storage.length(get_pipe(0)) == 0
assert await queue.storage.length(get_pipe(1)) == 0
assert await storage.length(get_pipe(0)) == 0
assert await storage.length(get_pipe(1)) == 0
assert last_started != context['started']
assert last_stopped != context['stopped']

Expand Down

0 comments on commit 453ceee

Please sign in to comment.