Skip to content

Commit

Permalink
recurrent by schedule rule
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaChuvaev committed Feb 16, 2024
1 parent 89203dd commit ceede4f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pydantic-settings>=2.0.0
python-dateutil==2.8.2
23 changes: 18 additions & 5 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import (Any, AsyncGenerator, Generic, NamedTuple, Optional, Self,
TypeVar, get_type_hints)

from dateutil.rrule import rrule, rrulestr
from sharded_queue.drivers import JsonTupleSerializer
from sharded_queue.protocols import Lock, Serializer, Storage
from sharded_queue.settings import WorkerSettings
Expand Down Expand Up @@ -292,7 +293,13 @@ class DeferredRequest(NamedTuple):
msg: list

@classmethod
def calculate_timestamp(cls, delta: float | int | timedelta) -> float:
def calculate_timestamp(
cls,
delta: float | int | str | timedelta,
) -> float:
if isinstance(delta, str):
return rrulestr(delta).after(datetime.now()).timestamp()

now: datetime = datetime.now()
if isinstance(delta, timedelta):
now = now + delta
Expand Down Expand Up @@ -364,12 +371,18 @@ def transform(


class RecurrentRequest(NamedTuple):
interval: float
interval: float | str
pipe: str
msg: list

@classmethod
def get_interval(cls, interval: int | float | timedelta) -> float:
def get_interval(
cls,
interval: int | float | timedelta | rrule
) -> float | str:
if isinstance(interval, rrule):
return str(interval)

if isinstance(interval, timedelta):
return float(int(interval.total_seconds()))

Expand Down Expand Up @@ -410,10 +423,10 @@ async def handle(self, *requests: RecurrentRequest) -> None:
def transform(
cls,
pipe_messages: list[tuple[str, T]],
recurrent: float | int | timedelta,
recurrent: float | int | timedelta | rrule,
serializer: Serializer,
) -> list[tuple[str, RecurrentRequest]]:
interval: float = RecurrentRequest.get_interval(recurrent)
interval: float | str = RecurrentRequest.get_interval(recurrent)
return [
(
Tube(RecurrentHandler, Route()).pipe,
Expand Down

0 comments on commit ceede4f

Please sign in to comment.