Skip to content

Commit

Permalink
defer and recurrent params set int type
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaChuvaev committed Mar 18, 2024
1 parent b5db9d1 commit a55d4c1
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ def __init__(

async def register(
self, handler: type[Handler], *requests: T,
defer: Optional[float | int | timedelta] = None,
defer: Optional[int | timedelta] = None,
if_not_exists: bool = False,
recurrent: Optional[float | int | timedelta] = None,
recurrent: Optional[int | timedelta] = None,
) -> None:
routes = await handler.route(*requests)
pipe_messages: list[tuple[str, Any]] = [
Expand Down Expand Up @@ -288,15 +288,15 @@ async def prolongate_lock(self, ttl: Optional[int] = None):


class DeferredRequest(NamedTuple):
timestamp: float
timestamp: int
pipe: str
msg: list

@classmethod
def calculate_timestamp(
cls,
delta: float | int | str | timedelta,
) -> float:
delta: int | str | timedelta,
) -> int:
if isinstance(delta, str):
return rrulestr(delta).after(datetime.now()).timestamp()

Expand All @@ -308,7 +308,7 @@ def calculate_timestamp(
if not isinstance(delta, timedelta):
timestamp = timestamp + delta

return timestamp
return int(timestamp)


class DeferredHandler(Handler):
Expand Down Expand Up @@ -353,7 +353,7 @@ async def handle(self, *requests: DeferredRequest) -> None:
def transform(
cls,
pipe_messages: list[tuple[str, T]],
defer: float | int | timedelta,
defer: int | timedelta,
serializer: Serializer,
) -> list[tuple[str, DeferredRequest]]:
timestamp = DeferredRequest.calculate_timestamp(defer)
Expand All @@ -371,22 +371,22 @@ def transform(


class RecurrentRequest(NamedTuple):
interval: float | str
interval: int | str
pipe: str
msg: list

@classmethod
def get_interval(
cls,
interval: int | float | timedelta | rrule
) -> float | str:
interval: int | timedelta | rrule
) -> int | str:
if isinstance(interval, rrule):
return str(interval)

if isinstance(interval, timedelta):
return float(int(interval.total_seconds()))
return int(interval.total_seconds())

return float(interval)
return interval


class RecurrentHandler(Handler):
Expand Down Expand Up @@ -423,10 +423,10 @@ async def handle(self, *requests: RecurrentRequest) -> None:
def transform(
cls,
pipe_messages: list[tuple[str, T]],
recurrent: float | int | timedelta | rrule,
recurrent: int | timedelta | rrule,
serializer: Serializer,
) -> list[tuple[str, RecurrentRequest]]:
interval: float | str = RecurrentRequest.get_interval(recurrent)
interval: int | str = RecurrentRequest.get_interval(recurrent)
return [
(
Tube(RecurrentHandler, Route()).pipe,
Expand Down

0 comments on commit a55d4c1

Please sign in to comment.