Skip to content

Commit

Permalink
fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 12, 2023
1 parent 416893a commit 546b8b6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
22 changes: 11 additions & 11 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from functools import cache
from importlib import import_module
from json import dumps, loads
from typing import (Any, Generic, List, NamedTuple, Optional, Protocol,
Sequence, TypeVar, get_type_hints)
from typing import (Any, AsyncGenerator, Generic, List, NamedTuple, Optional,
Protocol, Self, Sequence, TypeVar, get_type_hints)

from pydantic import BaseSettings, Field

Expand Down Expand Up @@ -64,7 +64,7 @@ class Handler(Generic[T]):
orders: Optional[list[str]] = None

@classmethod
async def create(cls):
async def create(cls) -> Self:
return cls()

@classmethod
Expand All @@ -81,13 +81,13 @@ async def route(cls, *requests: T) -> list[RequestRoute]:
for request in requests
]

async def start(self):
async def start(self) -> None:
pass

async def handle(self, *requests: T):
async def handle(self, *requests: T) -> None:
raise NotImplementedError()

async def stop(self):
async def stop(self) -> None:
pass


Expand All @@ -105,7 +105,7 @@ def pipe(self) -> str:
])

@asynccontextmanager
async def context(self):
async def context(self) -> AsyncGenerator:
instance = await self.handler.create()
await instance.start()
try:
Expand Down Expand Up @@ -156,7 +156,7 @@ class Queue(Generic[T]):
serializer: Serializer
storage: Storage

async def register(self, handler: type[Handler], *requests: T):
async def register(self, handler: type[Handler], *requests: T) -> None:
tubes: list[RequestTube] = [
RequestTube(request, Tube(handler, route))
for (request, route)
Expand Down Expand Up @@ -202,7 +202,7 @@ async def acquire_tube(self, queue: Queue) -> Tube:
async def bind(self, tube: str) -> bool:
raise NotImplementedError

async def unbind(self, tube: str):
async def unbind(self, tube: str) -> None:
raise NotImplementedError


Expand All @@ -217,7 +217,7 @@ def page_size(self, limit: Optional[int] = None) -> int:

return min(limit, settings.worker_batch_size)

async def loop(self, limit: Optional[int] = None):
async def loop(self, limit: Optional[int] = None) -> None:
processed = 0
while True and limit is None or limit > processed:
tube = await self.coordinator.acquire_tube(self.queue)
Expand Down Expand Up @@ -279,7 +279,7 @@ async def bind(self, pipe: str) -> bool:
self.binds[pipe] = True
return True

async def unbind(self, pipe: str):
async def unbind(self, pipe: str) -> None:
del self.binds[pipe]


Expand Down
12 changes: 6 additions & 6 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ async def route(cls, *requests: Request) -> list[RequestRoute]:
for request in requests
]

async def start(self):
async def start(self) -> None:
context['started'] = datetime.now().timestamp()

async def stop(self):
async def stop(self) -> None:
context['stopped'] = datetime.now().timestamp()

async def handle(self, *requests: Request):
async def handle(self, *requests: Request) -> None:
synced.extend(requests)


@mark.asyncio
async def test_queue():
async def test_queue() -> None:
queue = Queue(JsonTupleSerializer(), RuntimeStorage())

def get_pipe(thread: str):
def get_pipe(thread: str) -> str:
return Tube(SyncHandler, Route(thread=thread)).pipe

await queue.register(
Expand Down Expand Up @@ -94,7 +94,7 @@ def get_pipe(thread: str):


@mark.asyncio
async def test_storage():
async def test_storage() -> None:
storage = RuntimeStorage()
await storage.append('tester', 'q')
await storage.append('tester', 'w')
Expand Down

0 comments on commit 546b8b6

Please sign in to comment.