diff --git a/docs/examples/testing/subprocess_sse_app.py b/docs/examples/testing/subprocess_sse_app.py new file mode 100644 index 0000000000..0e922a1f93 --- /dev/null +++ b/docs/examples/testing/subprocess_sse_app.py @@ -0,0 +1,38 @@ +""" +Assemble components into an app that shall be tested +""" + +from typing import AsyncIterator + +from redis.asyncio import Redis + +from litestar import Litestar, get +from litestar.channels import ChannelsPlugin +from litestar.channels.backends.redis import RedisChannelsPubSubBackend +from litestar.response import ServerSentEvent + + +@get("/notify/{topic:str}") +async def get_notified(topic: str, channels: ChannelsPlugin) -> ServerSentEvent: + async def generator() -> AsyncIterator[bytes]: + async with channels.start_subscription([topic]) as subscriber: + async for event in subscriber.iter_events(): + yield event + + return ServerSentEvent(generator(), event_type="Notifier") + + +def create_test_app() -> Litestar: + redis_instance = Redis() + channels_backend = RedisChannelsPubSubBackend(redis=redis_instance) + channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True) + + return Litestar( + route_handlers=[ + get_notified, + ], + plugins=[channels_instance], + ) + + +app = create_test_app() diff --git a/docs/examples/testing/test_subprocess_sse.py b/docs/examples/testing/test_subprocess_sse.py new file mode 100644 index 0000000000..900b6b9896 --- /dev/null +++ b/docs/examples/testing/test_subprocess_sse.py @@ -0,0 +1,72 @@ +""" +Test the app running in a subprocess +""" + +import asyncio +import pathlib +import sys +from typing import AsyncIterator + +import httpx +import httpx_sse +import pytest +from redis.asyncio import Redis + +from litestar.channels import ChannelsPlugin +from litestar.channels.backends.redis import RedisChannelsPubSubBackend +from litestar.testing import subprocess_async_client + +if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + +pytestmark = pytest.mark.anyio + + +@pytest.fixture(scope="session") +def anyio_backend() -> str: + return "asyncio" + + +ROOT = pathlib.Path(__file__).parent + + +@pytest.fixture(name="async_client", scope="session") +async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]: + async with subprocess_async_client(workdir=ROOT, app="subprocess_sse_app:app") as client: + yield client + + +@pytest.fixture(name="redis_channels") +async def fx_redis_channels() -> AsyncIterator[ChannelsPlugin]: + # Expects separate redis set-up + redis_instance = Redis() + channels_backend = RedisChannelsPubSubBackend(redis=redis_instance) + channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True) + await channels_instance._on_startup() + yield channels_instance + await channels_instance._on_shutdown() + + +async def test_subprocess_async_client(async_client: httpx.AsyncClient, redis_channels: ChannelsPlugin) -> None: + """Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the + regular async test client. + """ + topic = "demo" + message = "hello" + + running = asyncio.Event() + running.set() + + async def send_notifications() -> None: + while running.is_set(): + await redis_channels.wait_published(message, channels=[topic]) + await asyncio.sleep(0.1) + + task = asyncio.create_task(send_notifications()) + + async with httpx_sse.aconnect_sse(async_client, "GET", f"/notify/{topic}") as event_source: + async for event in event_source.aiter_sse(): + assert event.data == message + running.clear() + break + await task diff --git a/docs/reference/testing.rst b/docs/reference/testing.rst index 85767df5bf..ebc9bc1044 100644 --- a/docs/reference/testing.rst +++ b/docs/reference/testing.rst @@ -3,7 +3,7 @@ testing .. automodule:: litestar.testing - :members: RequestFactory, BaseTestClient, TestClient, AsyncTestClient, create_async_test_client, create_test_client + :members: RequestFactory, BaseTestClient, TestClient, AsyncTestClient, create_async_test_client, create_test_client, subprocess_sync_client, subprocess_async_client :undoc-members: WebSocketTestSession diff --git a/docs/usage/testing.rst b/docs/usage/testing.rst index e2a78651f5..d50a4c473e 100644 --- a/docs/usage/testing.rst +++ b/docs/usage/testing.rst @@ -287,6 +287,31 @@ But also this: assert response.text == "healthy" +Running a live server +--------------------- + +The test clients make use of HTTPX's ability to directly call into an ASGI app, without +having to run an actual server. In most cases this is sufficient but there are some +exceptions where this won't work, due to the limitations of the emulated client-server +communication. + +For example, when using server-sent events with an infinite generator, it will lock up +the test client, since HTTPX tries to consume the full response before returning a +request. + +Litestar offers two helper functions, +:func:`~litestar.testing.client.subprocess_client.subprocess_sync_client` and +:func:`~litestar.testing.client.subprocess_client.subprocess_async_client` that will +launch a Litestar instance with in a subprocess and set up an httpx client for running +tests. You can either load your actual app file or create subsets from it as you would +with the regular test client setup: + +.. literalinclude:: /examples/testing/subprocess_sse_app.py + :language: python + +.. literalinclude:: /examples/testing/test_subprocess_sse.py + :language: python + RequestFactory -------------- diff --git a/litestar/testing/__init__.py b/litestar/testing/__init__.py index d09fb90aa7..b3f586b755 100644 --- a/litestar/testing/__init__.py +++ b/litestar/testing/__init__.py @@ -1,5 +1,6 @@ from litestar.testing.client.async_client import AsyncTestClient from litestar.testing.client.base import BaseTestClient +from litestar.testing.client.subprocess_client import subprocess_async_client, subprocess_sync_client from litestar.testing.client.sync_client import TestClient from litestar.testing.helpers import create_async_test_client, create_test_client from litestar.testing.request_factory import RequestFactory @@ -13,4 +14,6 @@ "WebSocketTestSession", "create_async_test_client", "create_test_client", + "subprocess_sync_client", + "subprocess_async_client", ) diff --git a/litestar/testing/client/subprocess_client.py b/litestar/testing/client/subprocess_client.py new file mode 100644 index 0000000000..7f91c8f338 --- /dev/null +++ b/litestar/testing/client/subprocess_client.py @@ -0,0 +1,70 @@ +import pathlib +import socket +import subprocess +import time +from contextlib import asynccontextmanager, contextmanager +from typing import AsyncIterator, Iterator + +import httpx + + +class StartupError(RuntimeError): + pass + + +def _get_available_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + # Bind to a free port provided by the host + try: + sock.bind(("localhost", 0)) + except OSError as e: + raise StartupError("Could not find an open port") from e + else: + port: int = sock.getsockname()[1] + return port + + +@contextmanager +def run_app(workdir: pathlib.Path, app: str) -> Iterator[str]: + """Launch a litestar application in a subprocess with a random available port.""" + port = _get_available_port() + with subprocess.Popen( + args=["litestar", "--app", app, "run", "--port", str(port)], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + cwd=workdir, + ) as proc: + url = f"http://127.0.0.1:{port}" + for _ in range(100): + try: + httpx.get(url, timeout=0.1) + break + except httpx.TransportError: + time.sleep(1) + yield url + proc.kill() + + +@asynccontextmanager +async def subprocess_async_client(workdir: pathlib.Path, app: str) -> AsyncIterator[httpx.AsyncClient]: + """Provides an async httpx client for a litestar app launched in a subprocess. + + Args: + workdir: Path to the directory in which the app module resides. + app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app" + """ + with run_app(workdir=workdir, app=app) as url: + async with httpx.AsyncClient(base_url=url) as client: + yield client + + +@contextmanager +def subprocess_sync_client(workdir: pathlib.Path, app: str) -> Iterator[httpx.Client]: + """Provides a sync httpx client for a litestar app launched in a subprocess. + + Args: + workdir: Path to the directory in which the app module resides. + app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app" + """ + with run_app(workdir=workdir, app=app) as url, httpx.Client(base_url=url) as client: + yield client diff --git a/tests/unit/test_testing/test_sub_client/__init__.py b/tests/unit/test_testing/test_sub_client/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/test_testing/test_sub_client/demo.py b/tests/unit/test_testing/test_sub_client/demo.py new file mode 100644 index 0000000000..dce3de56cb --- /dev/null +++ b/tests/unit/test_testing/test_sub_client/demo.py @@ -0,0 +1,28 @@ +""" +Assemble components into an app that shall be tested +""" + +import asyncio +from typing import AsyncIterator + +from litestar import Litestar, get +from litestar.response import ServerSentEvent + + +@get("/notify/{topic:str}") +async def get_notified(topic: str) -> ServerSentEvent: + async def generator() -> AsyncIterator[str]: + yield topic + while True: + await asyncio.sleep(0.1) + + return ServerSentEvent(generator(), event_type="Notifier") + + +def create_test_app() -> Litestar: + return Litestar( + route_handlers=[get_notified], + ) + + +app = create_test_app() diff --git a/tests/unit/test_testing/test_sub_client/test_subprocess_client.py b/tests/unit/test_testing/test_sub_client/test_subprocess_client.py new file mode 100644 index 0000000000..4096ee1a61 --- /dev/null +++ b/tests/unit/test_testing/test_sub_client/test_subprocess_client.py @@ -0,0 +1,54 @@ +""" +Test the app running in a subprocess +""" + +import asyncio +import pathlib +import sys +from typing import AsyncIterator, Iterator + +import httpx +import httpx_sse +import pytest + +from litestar.testing import subprocess_async_client, subprocess_sync_client + +if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + +ROOT = pathlib.Path(__file__).parent + + +@pytest.fixture(name="async_client", scope="session") +async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]: + async with subprocess_async_client(workdir=ROOT, app="demo:app") as client: + yield client + + +@pytest.fixture(name="sync_client", scope="session") +def fx_sync_client() -> Iterator[httpx.Client]: + with subprocess_sync_client(workdir=ROOT, app="demo:app") as client: + yield client + + +async def test_subprocess_async_client(async_client: httpx.AsyncClient) -> None: + """Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the + regular async test client. + """ + + async with httpx_sse.aconnect_sse(async_client, "GET", "/notify/hello") as event_source: + async for event in event_source.aiter_sse(): + assert event.data == "hello" + break + + +def test_subprocess_sync_client(sync_client: httpx.Client) -> None: + """Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the + regular async test client. + """ + + with httpx_sse.connect_sse(sync_client, "GET", "/notify/hello") as event_source: + for event in event_source.iter_sse(): + assert event.data == "hello" + break