refactor settings
nekufa committed Oct 9, 2023
1 parent 8b1ac29 commit 151db97
Showing 12 changed files with 127 additions and 127 deletions.
31 changes: 12 additions & 19 deletions README.md
@@ -127,10 +127,7 @@ class ParseEventHandler(Handler):
'''
override default single thread tube
'''
return [
Route(settings.default_thread, settings.default_priority)
for request in requests
]
return [Route(0, 0) for request in requests]

async def start(self):
'''
@@ -175,41 +172,37 @@ Performance depends on many factors; we can only measure clean library overhead

## Advanced queue configuration
You can configure sharded queue using env variables (a sketch of how these might bind to the new settings classes follows the list)
- `QUEUE_DEFAULT_PRIORITY = 0`\
Default queue priority
- `QUEUE_DEFAULT_THREAD = 0`\
Default queue thread
- `QUEUE_DEFERRED_RETRY_DELAY = 1`\
Deferred tasks retry delay
- `QUEUE_LOCK_PREFIX = 'lock_'`\
Lock key prefix
- `QUEUE_LOCK_TIMEOUT = 24 * 60 * 60`\
Lock key ttl
- `QUEUE_RECURRENT_CHECK_INTERVAL = 30`\
Recurrent check interval in seconds
- `QUEUE_RECURRENT_TASKS_LIMIT = 1024`\
Recurrent tasks limit count
- `QUEUE_TUBE_PREFIX = 'tube_'`\
- `QUEUE_STORAGE_PREFIX = 'tube_'`\
Default queue prefix
- `QUEUE_WORKER_ACQUIRE_DELAY = 1`\
Worker acquire delay in seconds on empty queues
- `QUEUE_WORKER_BATCH_SIZE = 128`\
Worker batch processing size
- `QUEUE_WORKER_DEFERRED_RETRY_DELAY = 1`\
Deferred tasks retry delay
- `QUEUE_WORKER_EMPTY_LIMIT = 16`\
Worker empty queue attempt limit before queue rebind
- `QUEUE_WORKER_EMPTY_PAUSE = 0.1`\
Worker pause in seconds on empty queue
- `QUEUE_WORKER_RECURRENT_CHECK_INTERVAL = 30`\
Recurrent check interval in seconds
- `QUEUE_WORKER_RECURRENT_TASKS_LIMIT = 1024`\
Recurrent tasks limit count
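
The `sharded_queue.settings` module itself is not part of this diff, so the binding between these variables and the new per-component classes is not shown. A minimal sketch of how it could look, assuming plain dataclasses and a hypothetical `env` helper (`QUEUE_STORAGE_PREFIX` would bind to a `StorageSettings` class the same way):

```py
import os
from dataclasses import dataclass, field


def env(name: str, default):
    # hypothetical helper: read an env variable, cast to the default's type
    raw = os.environ.get(name)
    return type(default)(raw) if raw is not None else default


@dataclass
class WorkerSettings:
    acquire_delay: float = field(
        default_factory=lambda: env('QUEUE_WORKER_ACQUIRE_DELAY', 1.0))
    batch_size: int = field(
        default_factory=lambda: env('QUEUE_WORKER_BATCH_SIZE', 128))
    empty_limit: int = field(
        default_factory=lambda: env('QUEUE_WORKER_EMPTY_LIMIT', 16))
    empty_pause: float = field(
        default_factory=lambda: env('QUEUE_WORKER_EMPTY_PAUSE', 0.1))
    # deferred_retry_delay and the recurrent_* fields follow the same pattern


@dataclass
class LockSettings:
    prefix: str = field(
        default_factory=lambda: env('QUEUE_LOCK_PREFIX', 'lock_'))
    timeout: int = field(
        default_factory=lambda: env('QUEUE_LOCK_TIMEOUT', 24 * 60 * 60))
```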

You can import and change settings manually
You can change runtime settings
```py
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.settings import settings

settings.worker_acquire_delay = 5
settings.worker_batch_size = 64

worker = Worker(RuntimeLock(), Queue(RuntimeStorage()))
worker.lock.settings.timeout = 5 * 60
worker.settings.acquire_delay = 5
worker.settings.batch_size = 64
await worker.loop()

```
66 changes: 34 additions & 32 deletions sharded_queue/__init__.py
@@ -1,23 +1,23 @@
from asyncio import all_tasks, current_task, gather, get_event_loop, sleep
from asyncio import get_event_loop, sleep
from contextlib import asynccontextmanager
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from functools import cache, partial
from functools import cache
from importlib import import_module
from signal import SIGTERM
from typing import (Any, AsyncGenerator, Generic, NamedTuple, Optional, Self,
TypeVar, get_type_hints)

from sharded_queue.drivers import JsonTupleSerializer
from sharded_queue.protocols import Lock, Serializer, Storage
from sharded_queue.settings import settings
from sharded_queue.settings import WorkerSettings

T = TypeVar('T')


class Route(NamedTuple):
thread: int = settings.default_thread
priority: int = settings.default_priority
thread: int = 0
priority: int = 0


class Handler(Generic[T]):
@@ -36,10 +36,7 @@ def request_cls(cls) -> type[T]:

@classmethod
async def route(cls, *requests: T) -> list[Route]:
return [
Route(settings.default_thread, settings.default_priority)
for _ in requests
]
return [Route() for _ in requests]

async def start(self) -> None:
pass
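
With the defaults baked into `Route` itself, a routing override no longer touches global settings. A hypothetical handler for illustration (`NotifyRequest`, `NotifyHandler`, and the `user_id` sharding are invented here, not part of the library):

```py
from typing import NamedTuple

from sharded_queue import Handler, Route


class NotifyRequest(NamedTuple):
    user_id: int
    message: str


class NotifyHandler(Handler[NotifyRequest]):
    @classmethod
    async def route(cls, *requests: NotifyRequest) -> list[Route]:
        # shard across 4 threads; priority keeps its default of 0
        return [Route(thread=request.user_id % 4) for request in requests]

    async def handle(self, *requests: NotifyRequest) -> None:
        for request in requests:
            print('notify', request.user_id, request.message)
```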
@@ -84,7 +81,9 @@ async def context(self) -> AsyncGenerator:
@dataclass
class Queue(Generic[T]):
def __init__(
self, storage: Storage, serializer: Optional[Serializer] = None
self,
storage: Storage,
serializer: Optional[Serializer] = None,
):
self.storage = storage
self.serializer = serializer or JsonTupleSerializer()
@@ -133,6 +132,7 @@ class Worker:
lock: Lock
queue: Queue
pipe: Optional[str] = None
settings: WorkerSettings = field(default_factory=WorkerSettings)

async def acquire_tube(
self, handler: Optional[type[Handler]] = None
@@ -161,17 +161,17 @@ async def acquire_tube(
return tube

if all_pipes:
await sleep(settings.worker_acquire_delay)
await sleep(self.settings.acquire_delay)
else:
all_pipes = True

return None

def page_size(self, limit: Optional[int] = None) -> int:
if limit is None:
return settings.worker_batch_size
return self.settings.batch_size

return min(limit, settings.worker_batch_size)
return min(limit, self.settings.batch_size)

async def loop(
self,
@@ -213,13 +213,13 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:

async with tube.context() as instance:
if isinstance(instance, DeferredHandler | RecurrentHandler):
instance.queue = self.queue
instance.worker = self

while get_event_loop().is_running() and (
limit is None or limit > processed_counter
):
if tube.handler is RecurrentHandler:
page_size = settings.recurrent_tasks_limit
page_size = self.settings.recurrent_tasks_limit
else:
page_size = self.page_size(limit)
processed = False
@@ -234,7 +234,7 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:

if tube.handler is RecurrentHandler:
await self.prolongate_lock(
settings.recurrent_check_interval
self.settings.recurrent_check_interval
)
return len(msgs)

@@ -247,12 +247,14 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:

if not processed:
empty_counter = empty_counter + 1
if empty_counter >= settings.worker_empty_limit:
if empty_counter >= self.settings.empty_limit:
break
await sleep(settings.worker_empty_pause)
await sleep(self.settings.empty_pause)

if tube.handler is DeferredHandler:
await self.prolongate_lock(settings.deferred_retry_delay)
await self.prolongate_lock(
self.settings.deferred_retry_delay
)
return processed_counter
else:
await self.prolongate_lock()
@@ -266,7 +268,7 @@ async def prolongate_lock(self, ttl: Optional[int] = None):
if not self.pipe:
raise RuntimeError('No active pipe')
if ttl is None:
ttl = settings.lock_timeout
ttl = self.lock.settings.timeout
await self.lock.ttl(self.pipe, ttl)


@@ -289,7 +291,7 @@ def calculate_timestamp(cls, delta: float | int | timedelta) -> float:


class DeferredHandler(Handler):
queue: Queue
worker: Worker

async def handle(self, *requests: DeferredRequest) -> None:
now: float = datetime.now().timestamp()
@@ -299,7 +301,7 @@ async def handle(self, *requests: DeferredRequest) -> None:
]

if len(pending):
await self.queue.register(DeferredHandler, *pending)
await self.worker.queue.register(DeferredHandler, *pending)

todo: list[tuple[str, list]] = [
(request.pipe, request.msg)
@@ -311,18 +313,18 @@ async def handle(self, *requests: DeferredRequest) -> None:
ready = [
msg for msg in
[
self.queue.serializer.serialize(request)
self.worker.queue.serializer.serialize(request)
for (candidate_pipe, request) in todo
if candidate_pipe == pipe
]
if not await self.queue.storage.contains(pipe, msg)
if not await self.worker.queue.storage.contains(pipe, msg)
]

if len(ready):
await self.queue.storage.append(pipe, *ready)
await self.worker.queue.storage.append(pipe, *ready)

if len(pending) and not len(todo):
await sleep(settings.deferred_retry_delay)
await sleep(self.worker.settings.deferred_retry_delay)

@classmethod
def transform(
@@ -359,16 +361,16 @@ def get_interval(cls, interval: int | float | timedelta) -> float:


class RecurrentHandler(Handler):
queue: Queue
worker: Worker

async def handle(self, *requests: RecurrentRequest) -> None:
deferred_pipe: str = Tube(DeferredHandler, Route()).pipe
deferred_requests: list[tuple[str, str]] = [
(request.pipe, request.msg)
for request in [
self.queue.serializer.deserialize(DeferredRequest, msg)
for msg in await self.queue.storage.range(
deferred_pipe, settings.recurrent_tasks_limit
self.worker.queue.serializer.deserialize(DeferredRequest, msg)
for msg in await self.worker.queue.storage.range(
deferred_pipe, self.worker.settings.recurrent_tasks_limit
)
]
]
@@ -384,7 +386,7 @@ async def handle(self, *requests: RecurrentRequest) -> None:
]

if len(todo):
await self.queue.register(
await self.worker.queue.register(
DeferredHandler, *todo, if_not_exists=True
)

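The net effect of the `Worker` changes above is that tuning becomes per instance instead of process-global, so two workers in one process can run with different settings. A sketch, assuming `WorkerSettings` accepts its fields as keyword arguments (consistent with the dataclass-style `field(default_factory=WorkerSettings)` default in this diff):

```py
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.settings import WorkerSettings

# two workers with independent tuning in one process
bulk = Worker(
    RuntimeLock(), Queue(RuntimeStorage()),
    settings=WorkerSettings(batch_size=256, empty_pause=0.05),
)
patient = Worker(
    RuntimeLock(), Queue(RuntimeStorage()),
    settings=WorkerSettings(batch_size=16, acquire_delay=5),
)
```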
28 changes: 17 additions & 11 deletions sharded_queue/drivers.py
@@ -1,10 +1,10 @@
from json import dumps, loads
from typing import Any, List, Sequence
from typing import Any, List, Optional, Sequence

from redis.asyncio import Redis

from sharded_queue.protocols import Lock, Serializer, Storage
from sharded_queue.settings import settings
from sharded_queue.settings import LockSettings, StorageSettings


class JsonTupleSerializer(Serializer):
@@ -81,29 +81,32 @@ async def range(self, tube: str, max: int) -> list[str]:


class RedisLock(Lock):
def __init__(self, redis: Redis) -> None:
def __init__(
self, redis: Redis, settings: Optional[LockSettings] = None
) -> None:
self.redis = redis
self.settings = settings or LockSettings()

async def acquire(self, key: str) -> bool:
return None is not await self.redis.set(
name=settings.lock_prefix + key,
ex=settings.lock_timeout,
name=self.settings.prefix + key,
ex=self.settings.timeout,
nx=True,
value=1,
)

async def exists(self, key: str) -> bool:
checker = await self.redis.exists(
settings.lock_prefix + key
self.settings.prefix + key
)
return bool(checker)

async def release(self, key: str) -> None:
await self.redis.delete(settings.lock_prefix + key)
await self.redis.delete(self.settings.prefix + key)

async def ttl(self, key: str, ttl: int) -> bool:
setter = await self.redis.set(
settings.lock_prefix + key,
self.settings.prefix + key,
value=key,
ex=ttl,
xx=True
@@ -112,8 +115,11 @@ async def ttl(self, key: str, ttl: int) -> bool:


class RedisStorage(Storage):
def __init__(self, redis: Redis) -> None:
def __init__(
self, redis: Redis, settings: Optional[StorageSettings] = None
) -> None:
self.redis = redis
self.settings = settings or StorageSettings()

async def append(self, tube: str, *msgs: str) -> int:
return await self.redis.rpush(self.key(tube), *msgs)
@@ -122,14 +128,14 @@ async def contains(self, tube: str, msg: str) -> bool:
return await self.redis.lpos(self.key(tube), msg) is not None

def key(self, tube):
return settings.tube_prefix + tube
return self.settings.prefix + tube

async def length(self, tube: str) -> int:
return await self.redis.llen(self.key(tube))

async def pipes(self) -> list[str]:
return [
key[len(settings.tube_prefix):]
key[len(self.settings.prefix):]
for key in await self.redis.keys(self.key('*'))
]

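Both Redis drivers now receive their settings through the constructor instead of reading the global module, with defaults created when the argument is omitted. A usage sketch, assuming `LockSettings` and `StorageSettings` take the `prefix`/`timeout` fields referenced above as keyword arguments:

```py
from redis.asyncio import Redis

from sharded_queue.drivers import RedisLock, RedisStorage
from sharded_queue.settings import LockSettings, StorageSettings

redis = Redis()

# per-instance prefixes let two queues share one Redis database
lock = RedisLock(redis, LockSettings(prefix='billing_lock_', timeout=60))
storage = RedisStorage(redis, StorageSettings(prefix='billing_tube_'))
```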
6 changes: 6 additions & 0 deletions sharded_queue/protocols.py
@@ -1,7 +1,11 @@
from typing import Any, Protocol

from sharded_queue.settings import LockSettings, StorageSettings


class Lock(Protocol):
settings: LockSettings = LockSettings()

async def acquire(self, key: str) -> bool: ...
async def exists(self, key: str) -> bool: ...
async def release(self, key: str) -> None: ...
@@ -15,6 +19,8 @@ def deserialize(self, cls: type[Any], source: str) -> Any: ...


class Storage(Protocol):
settings: StorageSettings = StorageSettings()

async def append(self, tube: str, *msgs: str) -> int: ...
async def contains(self, tube: str, msg: str) -> bool: ...
async def length(self, tube: str) -> int: ...
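Because `Lock` and `Storage` are structural protocols, the new `settings` attribute becomes part of the contract for any custom driver. A toy in-memory lock showing the expected surface, assuming the protocol also declares the `ttl` method used by `Worker.prolongate_lock` (the library's own `RuntimeLock` presumably plays this role for real use; the `ttl` here deliberately ignores expiry):

```py
from sharded_queue.protocols import Lock
from sharded_queue.settings import LockSettings


class ToyLock:
    settings: LockSettings = LockSettings()

    def __init__(self) -> None:
        self.held: set[str] = set()

    async def acquire(self, key: str) -> bool:
        if key in self.held:
            return False
        self.held.add(key)
        return True

    async def exists(self, key: str) -> bool:
        return key in self.held

    async def release(self, key: str) -> None:
        self.held.discard(key)

    async def ttl(self, key: str, ttl: int) -> bool:
        # a real driver would re-arm expiry here
        return key in self.held


lock: Lock = ToyLock()  # satisfies the protocol structurally
```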