Skip to content

Commit

Permalink
redis coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 12, 2023
1 parent f8a3f21 commit e2270ab
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 23 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ async def example():
'''
```

## Drivers
There are several implementations of components:
- `RedisCoordinator` persist queue binds in redis using setnx api
- `RedisStorage` persist msgs using lists and lrange/lpop/rpush api
- `RuntimeCoordinator` persist queue binds in memory and can be used in a simple runtime distribution
- `RuntimeStorage` persist msgs in dict and can be used in a simple runtime distribution
## Handler lifecycle

As you can notice, routing is made using static method, but perform is an instance method. When a worker start processing requests it can bootstrap and tear down the handler using `start` and `stop` methods
Expand Down Expand Up @@ -143,6 +149,10 @@ class ParseEventHandler(Handler):
You can configure sharded queue using env
- `QUEUE_COORDINATOR_DELAY = 1`\
Coordinator delay in seconds on empty queues
- `QUEUE_COORDINATOR_PREFIX = 'lock_'`\
Coordinator lock prefix
- `QUEUE_COORDINATOR_TIMEOUT = 24 * 60 * 60`\
Coordinator lock ttl
- `QUEUE_DEFAULT_PRIORITY = 0`\
Default queue priority
- `QUEUE_DEFAULT_THREAD = 0`\
Expand Down
72 changes: 49 additions & 23 deletions sharded_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ class ShardedQueueSettings(BaseSettings):
title="Coordinator delay in seconds on empty queues"
)

coordinator_prefix: str = Field(
default="lock_",
title="Coordinator lock prefix"
)

coordinator_timeout: int = Field(
default=24*60*60,
title="Coordinator lock ttl"
)

default_priority: int = Field(
default='0',
title='Default queue priority'
Expand Down Expand Up @@ -294,29 +304,6 @@ async def unbind(self, pipe: str) -> None:
del self.binds[pipe]


class RedisStorage(Storage):
def __init__(self, redis: Redis) -> None:
self.redis = redis

async def append(self, tube: str, *msgs: str) -> int:
return await self.redis.rpush(settings.tube_prefix + tube, *msgs)

async def length(self, tube: str) -> int:
return await self.redis.llen(settings.tube_prefix + tube)

async def pop(self, tube: str, max: int) -> list[str]:
return await self.redis.lpop(settings.tube_prefix + tube, max) or []

async def pipes(self) -> list[str]:
return [
key[len(settings.tube_prefix):]
for key in await self.redis.keys(settings.tube_prefix + '*')
]

async def range(self, tube: str, max: int) -> list[str]:
return await self.redis.lrange(settings.tube_prefix + tube, 0, max-1) or []


class RuntimeStorage(Storage):
data: dict[str, List[str]]

Expand Down Expand Up @@ -347,3 +334,42 @@ async def range(self, tube: str, max: int) -> list[str]:
return self.data[tube][0:max] if tube in self.data else []


class RedisCoordinator(Coordinator):
def __init__(self, redis: Redis) -> None:
self.redis = redis

async def bind(self, tube: str) -> bool:
return None is not await self.redis.set(
name=settings.coordinator_prefix + tube,
ex=settings.coordinator_timeout,
nx=True,
value=1,
)

async def unbind(self, tube: str) -> None:
await self.redis.delete(settings.coordinator_prefix + tube)


class RedisStorage(Storage):
def __init__(self, redis: Redis) -> None:
self.redis = redis

async def append(self, tube: str, *msgs: str) -> int:
return await self.redis.rpush(settings.tube_prefix + tube, *msgs)

async def length(self, tube: str) -> int:
return await self.redis.llen(settings.tube_prefix + tube)

async def pop(self, tube: str, max: int) -> list[str]:
return await self.redis.lpop(settings.tube_prefix + tube, max) or []

async def pipes(self) -> list[str]:
return [
key[len(settings.tube_prefix):]
for key in await self.redis.keys(settings.tube_prefix + '*')
]

async def range(self, tube: str, max: int) -> list[str]:
return await self.redis.lrange(settings.tube_prefix + tube, 0, max-1) or []


0 comments on commit e2270ab

Please sign in to comment.