deferred api fix
nekufa committed Sep 18, 2023
1 parent cc55810 commit 96ddb0d
Showing 4 changed files with 33 additions and 33 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -158,11 +158,11 @@ There is an optional if_not_exists flag. If it is set, request will be registere
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
```
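The duplicate call above is the point of the flag: the second `register` should be a no-op. A minimal sketch of how that can be checked against the storage, assuming `SycBucket` routes to the default `Route()` (neither `SycBucket` nor `Bucket` is part of this diff):

```py
from sharded_queue import Queue, Route, Tube


async def demo_if_not_exists(queue: Queue, SycBucket, Bucket) -> None:
    """Register the same request twice; only one copy should end up stored."""
    await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
    await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)

    # assumes SycBucket routes to the default Route()
    pipe = Tube(SycBucket, Route()).pipe
    assert await queue.storage.length(pipe) == 1
```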
-## Deffered tasks
-You can use built-in deffered task handler to delay call
+## Deferred tasks
+You can use built-in deferred task handler to defer call
```py
-await queue.register(Housekeep, Room(402), delay=5) # numeric means seconds
-await queue.register(Housekeep, Room(324), delay=timedelta(minutes=15))
+await queue.register(Housekeep, Room(402), defer=5) # numeric means seconds
+await queue.register(Housekeep, Room(324), defer=timedelta(minutes=15))
```
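A deferred request only fires once a worker picks it up after the delay has elapsed. Below is a minimal end-to-end sketch using the in-memory drivers from this commit's test; the `Room` and `Housekeep` definitions are assumptions modeled on the register/handle calls in this diff, since the full `Handler` contract is not shown here:

```py
from asyncio import run, sleep
from datetime import timedelta
from typing import NamedTuple

from sharded_queue import Handler, Queue, Route, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage


class Room(NamedTuple):
    number: int


class Housekeep(Handler):  # hypothetical handler, not part of this diff
    @classmethod
    async def route(cls, *requests: Room) -> list[Route]:
        # one default route per request; sharding is not the point of this sketch
        return [Route() for _ in requests]

    async def handle(self, *requests: Room) -> None:
        for room in requests:
            print(f"housekeeping room {room.number}")


async def main() -> None:
    queue = Queue(RuntimeStorage())
    await queue.register(Housekeep, Room(402), defer=timedelta(milliseconds=10))

    # same three-step pattern as tests/test_deffered.py in this commit
    await Worker(RuntimeLock(), queue).loop(1)  # deferred tube checked; request not due yet
    await sleep(0.02)
    await Worker(RuntimeLock(), queue).loop(1)  # due request moved onto the Housekeep tube
    await Worker(RuntimeLock(), queue).loop(1)  # Housekeep.handle finally runs


run(main())
```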
## Performance
Performance depends on many factors; we can only measure the clean library overhead with in-memory storages. You can run the performance test on your hardware with `pytest -s`; with this option the test prints results for different cases. A performance test on an Intel i5-4670K with Ubuntu 23.04 and Python 3.11.4 gives about `200_000` rps for batch request registration with sharding and about `600_000` rps for request handling in concurrent mode.
@@ -173,8 +173,8 @@ You can configure sharded queue using env
Default queue priority
- `QUEUE_DEFAULT_THREAD = 0`\
Default queue thread
-- `QUEUE_DEFFERED_RETRY_DELAY = 1`\
-Deffered tasks retry delay
+- `QUEUE_DEFERRED_RETRY_DELAY = 1`\
+Deferred tasks retry delay
- `QUEUE_LOCK_PREFIX = 'lock_'`\
Lock key prefix
- `QUEUE_LOCK_TIMEOUT = 24 * 60 * 60`\
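These variables map onto the pydantic settings model in `sharded_queue/settings.py`, changed later in this commit. A sketch of overriding a couple of them through the environment; the `QUEUE_` prefix mapping and the printed values are assumptions based on the names documented above:

```py
import os

# set the environment before the library is imported, in case the settings
# object is instantiated at import time
os.environ["QUEUE_DEFERRED_RETRY_DELAY"] = "0.5"
os.environ["QUEUE_LOCK_PREFIX"] = "myapp_lock_"

from sharded_queue.settings import ShardedQueueSettings

settings = ShardedQueueSettings()
print(settings.deferred_retry_delay)  # expected: 0.5
print(settings.lock_prefix)           # expected: myapp_lock_
```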
34 changes: 17 additions & 17 deletions sharded_queue/__init__.py
@@ -87,7 +87,7 @@ def __init__(

async def register(
self, handler: type[Handler], *requests: T,
-delay: Optional[float | int | timedelta] = None,
+defer: Optional[float | int | timedelta] = None,
if_not_exists: bool = False
) -> None:
routes = await handler.route(*requests)
@@ -96,12 +96,12 @@ async def register(
for n in range(len(routes))
]

-if delay:
-timestamp = DefferedRequest.calculate_timestamp(delay)
+if defer:
+timestamp = DeferredRequest.calculate_timestamp(defer)
pipe_messages = [
(
-Tube(DefferedHandler, Route()).pipe,
-DefferedRequest(timestamp, pipe, values),
+Tube(DeferredHandler, Route()).pipe,
+DeferredRequest(timestamp, pipe, values),
)
for (pipe, values)
in [
@@ -183,7 +183,7 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
]

async with tube.context() as instance:
-if isinstance(instance, DefferedHandler):
+if isinstance(instance, DeferredHandler):
instance.queue = self.queue

while limit is None or limit > processed_counter:
@@ -216,36 +216,36 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int:
return processed_counter


-class DefferedRequest(NamedTuple):
+class DeferredRequest(NamedTuple):
timestamp: float
pipe: str
msg: list

@classmethod
-def calculate_timestamp(cls, delay: float | int | timedelta) -> float:
+def calculate_timestamp(cls, delta: float | int | timedelta) -> float:
now: datetime = datetime.now()
-if isinstance(delay, timedelta):
-now = now + delay
+if isinstance(delta, timedelta):
+now = now + delta

timestamp: float = now.timestamp()
-if not isinstance(delay, timedelta):
-timestamp = delay = delay
+if not isinstance(delta, timedelta):
+timestamp = delta = delta

return timestamp


-class DefferedHandler(Handler):
+class DeferredHandler(Handler):
queue: Queue

-async def handle(self, *requests: DefferedRequest) -> None:
+async def handle(self, *requests: DeferredRequest) -> None:
now: float = datetime.now().timestamp()
-pending: list[DefferedRequest] = [
+pending: list[DeferredRequest] = [
request for request in requests
if request.timestamp > now
]

if len(pending):
-await self.queue.register(DefferedHandler, *pending)
+await self.queue.register(DeferredHandler, *pending)

todo: list[tuple[str, list]] = [
(request.pipe, request.msg)
@@ -265,4 +265,4 @@ async def handle(self, *requests: DefferedRequest) -> None:
])

if len(pending) and not len(todo):
-await sleep(settings.deffered_retry_delay)
+await sleep(settings.deferred_retry_delay)
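To make the rename concrete: with `defer=...`, `register` parks the work on a single deferred tube instead of the target tube, and `DeferredHandler` later releases it. A sketch using only the types visible above; `target_pipe` and `values` are placeholders for what `register` would actually compute:

```py
from datetime import timedelta

from sharded_queue import DeferredHandler, DeferredRequest, Route, Tube

# absolute timestamp at which the request becomes due (the timedelta is added to now)
due = DeferredRequest.calculate_timestamp(timedelta(minutes=15))

# the single tube consumed by DeferredHandler
deferred_pipe = Tube(DeferredHandler, Route()).pipe

# register(..., defer=...) appends one DeferredRequest per target pipe to that tube
target_pipe = "some-target-pipe"  # placeholder: real pipe names come from Tube.pipe
values = [["..."]]                # placeholder: serialized request values
parked = DeferredRequest(due, target_pipe, values)

print(deferred_pipe, parked)
# Once `due` has passed, DeferredHandler.handle pushes the values back onto the
# original pipe; requests that are not yet due are re-registered, and the handler
# sleeps settings.deferred_retry_delay when nothing was ready to release.
```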
4 changes: 2 additions & 2 deletions sharded_queue/settings.py
@@ -13,9 +13,9 @@ class ShardedQueueSettings(BaseSettings):
title='Default queue thread'
)

-deffered_retry_delay: float = Field(
+deferred_retry_delay: float = Field(
default=1,
-title='Deffered tasks retry delay'
+title='Deferred tasks retry delay'
)

lock_prefix: str = Field(
16 changes: 8 additions & 8 deletions tests/test_deffered.py
@@ -4,7 +4,7 @@

from pytest import mark

-from sharded_queue import DefferedHandler, Handler, Queue, Route, Tube, Worker
+from sharded_queue import DeferredHandler, Handler, Queue, Route, Tube, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.protocols import Storage

@@ -19,31 +19,31 @@ async def handle(self, *requests: BucketRequest) -> None:


@mark.asyncio
-async def test_deffered() -> None:
+async def test_deferred() -> None:
storage: Storage = RuntimeStorage()
queue: Queue = Queue(storage)
await queue.register(
DropBucket,
BucketRequest(1),
-delay=timedelta(milliseconds=10),
+defer=timedelta(milliseconds=10),
)

-deffered_pipe: str = Tube(DefferedHandler, Route()).pipe
+deferred_pipe: str = Tube(DeferredHandler, Route()).pipe
drop_pipe: str = Tube(DropBucket, Route()).pipe

assert await queue.storage.length(drop_pipe) == 0
-assert await queue.storage.length(deffered_pipe) == 1
+assert await queue.storage.length(deferred_pipe) == 1

await Worker(RuntimeLock(), queue).loop(1)
assert await queue.storage.length(drop_pipe) == 0
-assert await queue.storage.length(deffered_pipe) == 1
+assert await queue.storage.length(deferred_pipe) == 1

await sleep(0.01)

await Worker(RuntimeLock(), queue).loop(1)
assert await queue.storage.length(drop_pipe) == 1
-assert await queue.storage.length(deffered_pipe) == 0
+assert await queue.storage.length(deferred_pipe) == 0

await Worker(RuntimeLock(), queue).loop(1)
assert await queue.storage.length(drop_pipe) == 0
-assert await queue.storage.length(deffered_pipe) == 0
+assert await queue.storage.length(deferred_pipe) == 0
