Skip to content

Commit

Permalink
feat: add Valkey as a native store (#3892)
Browse files Browse the repository at this point in the history
* feat: valkey first-class store support

Adds support for Valkey as a store option, alternative to Redis.

* test: test valkey store support

Adds (very) basic testing for the Valkey store type.

* docs: expand docs to include Valkey store

Adds an autogenerated documentation page for the ValkeyStore itself.
Additionally includes some notes on the existing `stores.rst` usage page indicating the equivalence of Valkey/Redis.

---------

Co-authored-by: Jordan Russell <[email protected]>
  • Loading branch information
ftsartek and Jordan Russell authored Dec 15, 2024
1 parent 87f8aa0 commit 6c2bb39
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 8 deletions.
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"trio": ("https://trio.readthedocs.io/en/stable/", None),
"pydantic": ("https://docs.pydantic.dev/latest/", None),
"typing_extensions": ("https://typing-extensions.readthedocs.io/en/stable/", None),
"valkey": ("https://valkey-py.readthedocs.io/en/latest/", None),
}

napoleon_google_docstring = True
Expand Down Expand Up @@ -102,6 +103,7 @@
(PY_CLASS, "sqlalchemy.dialects.postgresql.named_types.ENUM"),
(PY_CLASS, "sqlalchemy.orm.decl_api.DeclarativeMeta"),
(PY_CLASS, "sqlalchemy.sql.sqltypes.TupleType"),
(PY_CLASS, "valkey.asyncio.Valkey"),
(PY_METH, "_types.TypeDecorator.process_bind_param"),
(PY_METH, "_types.TypeDecorator.process_result_value"),
(PY_METH, "litestar.typing.ParsedType.is_subclass_of"),
Expand Down
1 change: 1 addition & 0 deletions docs/reference/stores/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ stores
memory
redis
registry
valkey
5 changes: 5 additions & 0 deletions docs/reference/stores/valkey.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
valkey
======

.. automodule:: litestar.stores.valkey
:members:
6 changes: 6 additions & 0 deletions docs/usage/stores.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ Built-in stores
A store backend by `redis <https://redis.io/>`_. It offers all the guarantees and features of Redis, making it
suitable for almost all applications. Offers `namespacing`_.

:class:`ValKeyStore <litestar.stores.valkey.ValkeyStore>`
A store backed by `valkey <https://valkey.io>`_, a fork of Redis created as the result of Redis' license changes.
Similarly to the RedisStore, it is suitable for almost all applications and supports `namespacing`_.
At the time of writing, :class:`Valkey <valkey.asyncio.Valkey>` is equivalent to :class:`redis.asyncio.Redis`,
and all notes pertaining to Redis also apply to Valkey.

.. admonition:: Why not memcached?
:class: info

Expand Down
210 changes: 210 additions & 0 deletions litestar/stores/valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, cast

from valkey.asyncio import Valkey
from valkey.asyncio.connection import ConnectionPool

from litestar.exceptions import ImproperlyConfiguredException
from litestar.types import Empty, EmptyType
from litestar.utils.empty import value_or_default

from .base import NamespacedStore

if TYPE_CHECKING:
from types import TracebackType


__all__ = ("ValkeyStore",)


class ValkeyStore(NamespacedStore):
"""Valkey based, thread and process safe asynchronous key/value store."""

__slots__ = (
"_delete_all_script",
"_get_and_renew_script",
"_valkey",
"handle_client_shutdown",
)

def __init__(
self, valkey: Valkey, namespace: str | None | EmptyType = Empty, handle_client_shutdown: bool = False
) -> None:
"""Initialize :class:`ValkeyStore`
Args:
valkey: An :class:`valkey.asyncio.Valkey` instance
namespace: A key prefix to simulate a namespace in valkey. If not given,
defaults to ``LITESTAR``. Namespacing can be explicitly disabled by passing
``None``. This will make :meth:`.delete_all` unavailable.
handle_client_shutdown: If ``True``, handle the shutdown of the `valkey` instance automatically during the store's lifespan. Should be set to `True` unless the shutdown is handled externally
"""
self._valkey = valkey
self.namespace: str | None = value_or_default(namespace, "LITESTAR")
self.handle_client_shutdown = handle_client_shutdown

# script to get and renew a key in one atomic step
self._get_and_renew_script = self._valkey.register_script(
b"""
local key = KEYS[1]
local renew = tonumber(ARGV[1])
local data = server.call('GET', key)
local ttl = server.call('TTL', key)
if ttl > 0 then
server.call('EXPIRE', key, renew)
end
return data
"""
)

# script to delete all keys in the namespace
self._delete_all_script = self._valkey.register_script(
b"""
local cursor = 0
repeat
local result = server.call('SCAN', cursor, 'MATCH', ARGV[1])
for _,key in ipairs(result[2]) do
server.call('UNLINK', key)
end
cursor = tonumber(result[1])
until cursor == 0
"""
)

async def _shutdown(self) -> None:
if self.handle_client_shutdown:
await self._valkey.aclose(close_connection_pool=True)

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self._shutdown()

@classmethod
def with_client(
cls,
url: str = "valkey://localhost:6379",
*,
db: int | None = None,
port: int | None = None,
username: str | None = None,
password: str | None = None,
namespace: str | None | EmptyType = Empty,
) -> ValkeyStore:
"""Initialize a :class:`ValkeyStore` instance with a new class:`valkey.asyncio.Valkey` instance.
Args:
url: Valkey URL to connect to
db: Valkey database to use
port: Valkey port to use
username: Valkey username to use
password: Valkey password to use
namespace: Virtual key namespace to use
"""
pool: ConnectionPool = ConnectionPool.from_url(
url=url,
db=db,
decode_responses=False,
port=port,
username=username,
password=password,
)
return cls(
valkey=Valkey(connection_pool=pool),
namespace=namespace,
handle_client_shutdown=True,
)

def with_namespace(self, namespace: str) -> ValkeyStore:
"""Return a new :class:`ValkeyStore` with a nested virtual key namespace.
The current instances namespace will serve as a prefix for the namespace, so it
can be considered the parent namespace.
"""
return type(self)(
valkey=self._valkey,
namespace=f"{self.namespace}_{namespace}" if self.namespace else namespace,
handle_client_shutdown=self.handle_client_shutdown,
)

def _make_key(self, key: str) -> str:
prefix = f"{self.namespace}:" if self.namespace else ""
return prefix + key

async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
"""Set a value.
Args:
key: Key to associate the value with
value: Value to store
expires_in: Time in seconds before the key is considered expired
Returns:
``None``
"""
if isinstance(value, str):
value = value.encode("utf-8")
await self._valkey.set(self._make_key(key), value, ex=expires_in)

async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
"""Get a value.
Args:
key: Key associated with the value
renew_for: If given and the value had an initial expiry time set, renew the
expiry time for ``renew_for`` seconds. If the value has not been set
with an expiry time this is a no-op. Atomicity of this step is guaranteed
by using a lua script to execute fetch and renewal. If ``renew_for`` is
not given, the script will be bypassed so no overhead will occur
Returns:
The value associated with ``key`` if it exists and is not expired, else
``None``
"""
key = self._make_key(key)
if renew_for:
if isinstance(renew_for, timedelta):
renew_for = renew_for.seconds
data = await self._get_and_renew_script(keys=[key], args=[renew_for])
return cast("bytes | None", data)
return await self._valkey.get(key) # type: ignore[no-any-return]

async def delete(self, key: str) -> None:
"""Delete a value.
If no such key exists, this is a no-op.
Args:
key: Key of the value to delete
"""
await self._valkey.delete(self._make_key(key))

async def delete_all(self) -> None:
"""Delete all stored values in the virtual key namespace.
Raises:
ImproperlyConfiguredException: If no namespace was configured
"""
if not self.namespace:
raise ImproperlyConfiguredException("Cannot perform delete operation: No namespace configured")

await self._delete_all_script(keys=[], args=[f"{self.namespace}*:*"])

async def exists(self, key: str) -> bool:
"""Check if a given ``key`` exists."""
return await self._valkey.exists(self._make_key(key)) == 1 # type: ignore[no-any-return]

async def expires_in(self, key: str) -> int | None:
"""Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
expiry time was set, return ``None``.
"""
ttl = await self._valkey.ttl(self._make_key(key))
return None if ttl == -2 else ttl
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ brotli = ["brotli"]
cli = ["jsbeautifier", "uvicorn[standard]", "uvloop>=0.18.0; sys_platform != 'win32'"]
cryptography = ["cryptography"]
full = [
"litestar[annotated-types,attrs,brotli,cli,cryptography,jinja,jwt,mako,minijinja,opentelemetry,piccolo,picologging,prometheus,pydantic,redis,sqlalchemy,standard,structlog]",
"litestar[annotated-types,attrs,brotli,cli,cryptography,jinja,jwt,mako,minijinja,opentelemetry,piccolo,picologging,prometheus,pydantic,redis,sqlalchemy,standard,structlog,valkey]",
]
jinja = ["jinja2>=3.1.2"]
jwt = [
Expand All @@ -98,6 +98,7 @@ picologging = ["picologging"]
prometheus = ["prometheus-client"]
pydantic = ["pydantic", "email-validator", "pydantic-extra-types"]
redis = ["redis[hiredis]>=4.4.4"]
valkey = ["valkey[libvalkey]>=6.0.2"]
sqlalchemy = ["advanced-alchemy>=0.2.2"]
standard = ["jinja2", "jsbeautifier", "uvicorn[standard]", "uvloop>=0.18.0; sys_platform != 'win32'", "fast-query-parsers>=1.0.2"]
structlog = ["structlog"]
Expand Down
29 changes: 28 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from redis.asyncio import Redis as AsyncRedis
from redis.client import Redis
from time_machine import travel
from valkey.asyncio import Valkey as AsyncValkey
from valkey.client import Valkey

from litestar.logging import LoggingConfig
from litestar.middleware.session import SessionMiddleware
Expand All @@ -29,6 +31,7 @@
from litestar.stores.file import FileStore
from litestar.stores.memory import MemoryStore
from litestar.stores.redis import RedisStore
from litestar.stores.valkey import ValkeyStore
from litestar.testing import RequestFactory
from tests.helpers import not_none

Expand Down Expand Up @@ -82,6 +85,11 @@ def redis_store(redis_client: AsyncRedis) -> RedisStore:
return RedisStore(redis=redis_client)


@pytest.fixture()
def valkey_store(valkey_client: AsyncValkey) -> ValkeyStore:
return ValkeyStore(valkey=valkey_client)


@pytest.fixture()
def memory_store() -> MemoryStore:
return MemoryStore()
Expand All @@ -105,7 +113,12 @@ def file_store_create_directories_flag_false(tmp_path: Path) -> FileStore:


@pytest.fixture(
params=[pytest.param("redis_store", marks=pytest.mark.xdist_group("redis")), "memory_store", "file_store"]
params=[
pytest.param("redis_store", marks=pytest.mark.xdist_group("redis")),
pytest.param("valkey_store", marks=pytest.mark.xdist_group("valkey")),
"memory_store",
"file_store",
]
)
def store(request: FixtureRequest) -> Store:
return cast("Store", request.getfixturevalue(request.param))
Expand Down Expand Up @@ -327,6 +340,20 @@ async def redis_client(docker_ip: str, redis_service: None) -> AsyncGenerator[As
pass


@pytest.fixture()
async def valkey_client(docker_ip: str, valkey_service: None) -> AsyncGenerator[AsyncValkey, None]:
# this is to get around some weirdness with pytest-asyncio and valkey interaction
# on 3.8 and 3.9

Valkey(host=docker_ip, port=6381).flushall()
client: AsyncValkey = AsyncValkey(host=docker_ip, port=6381)
yield client
try:
await client.aclose()
except RuntimeError:
pass


@pytest.fixture(autouse=True)
def _patch_openapi_config(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("litestar.app.DEFAULT_OPENAPI_CONFIG", OpenAPIConfig(title="Litestar API", version="1.0.0"))
5 changes: 5 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ services:
restart: always
ports:
- "6397:6379" # use a non-standard port here
valkey:
image: valkey/valkey:latest
restart: always
ports:
- "6381:6379" # also a non-standard port
17 changes: 17 additions & 0 deletions tests/docker_service_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import pytest
from redis.asyncio import Redis as AsyncRedis
from redis.exceptions import ConnectionError as RedisConnectionError
from valkey.asyncio import Valkey as AsyncValkey
from valkey.exceptions import ConnectionError as ValkeyConnectionError

from litestar.utils import ensure_async_callable

Expand Down Expand Up @@ -127,6 +129,21 @@ async def redis_service(docker_services: DockerServiceRegistry) -> None:
await docker_services.start("redis", check=redis_responsive)


async def valkey_responsive(host: str) -> bool:
client: AsyncValkey = AsyncValkey(host=host, port=6381)
try:
return await client.ping()
except (ConnectionError, ValkeyConnectionError):
return False
finally:
await client.aclose()


@pytest.fixture()
async def valkey_service(docker_services: DockerServiceRegistry) -> None:
await docker_services.start("valkey", check=valkey_responsive)


async def postgres_responsive(host: str) -> bool:
try:
conn = await asyncpg.connect(
Expand Down
Loading

0 comments on commit 6c2bb39

Please sign in to comment.