Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added subprocess test client #3655

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/examples/testing/subprocess_sse_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
Assemble components into an app that shall be tested
"""

from typing import AsyncIterator

from redis.asyncio import Redis

from litestar import Litestar, get
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.response import ServerSentEvent


@get("/notify/{topic:str}")
async def get_notified(topic: str, channels: ChannelsPlugin) -> ServerSentEvent:
async def generator() -> AsyncIterator[bytes]:
async with channels.start_subscription([topic]) as subscriber:
async for event in subscriber.iter_events():
yield event

return ServerSentEvent(generator(), event_type="Notifier")


def create_test_app() -> Litestar:
redis_instance = Redis()
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)

return Litestar(
route_handlers=[
get_notified,
],
plugins=[channels_instance],
)


app = create_test_app()
72 changes: 72 additions & 0 deletions docs/examples/testing/test_subprocess_sse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Test the app running in a subprocess
"""

import asyncio
import pathlib
import sys
from typing import AsyncIterator

import httpx
import httpx_sse
import pytest
from redis.asyncio import Redis

from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.testing import subprocess_async_client

if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

pytestmark = pytest.mark.anyio


@pytest.fixture(scope="session")
def anyio_backend() -> str:
return "asyncio"


ROOT = pathlib.Path(__file__).parent


@pytest.fixture(name="async_client", scope="session")
async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]:
async with subprocess_async_client(workdir=ROOT, app="subprocess_sse_app:app") as client:
yield client


@pytest.fixture(name="redis_channels")
async def fx_redis_channels() -> AsyncIterator[ChannelsPlugin]:
# Expects separate redis set-up
redis_instance = Redis()
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)
await channels_instance._on_startup()
yield channels_instance
await channels_instance._on_shutdown()


async def test_subprocess_async_client(async_client: httpx.AsyncClient, redis_channels: ChannelsPlugin) -> None:
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the
regular async test client.
"""
topic = "demo"
message = "hello"

running = asyncio.Event()
running.set()

async def send_notifications() -> None:
while running.is_set():
await redis_channels.wait_published(message, channels=[topic])
await asyncio.sleep(0.1)

task = asyncio.create_task(send_notifications())

async with httpx_sse.aconnect_sse(async_client, "GET", f"/notify/{topic}") as event_source:
async for event in event_source.aiter_sse():
assert event.data == message
running.clear()
break
await task
2 changes: 1 addition & 1 deletion docs/reference/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ testing


.. automodule:: litestar.testing
:members: RequestFactory, BaseTestClient, TestClient, AsyncTestClient, create_async_test_client, create_test_client
:members: RequestFactory, BaseTestClient, TestClient, AsyncTestClient, create_async_test_client, create_test_client, subprocess_sync_client, subprocess_async_client
:undoc-members: WebSocketTestSession


Expand Down
25 changes: 25 additions & 0 deletions docs/usage/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,31 @@ But also this:
assert response.text == "healthy"


Running a live server
---------------------

The test clients make use of HTTPX's ability to directly call into an ASGI app, without
having to run an actual server. In most cases this is sufficient but there are some
exceptions where this won't work, due to the limitations of the emulated client-server
communication.

For example, when using server-sent events with an infinite generator, it will lock up
the test client, since HTTPX tries to consume the full response before returning a
request.

Litestar offers two helper functions,
:func:`~litestar.testing.client.subprocess_client.subprocess_sync_client` and
:func:`~litestar.testing.client.subprocess_client.subprocess_async_client` that will
launch a Litestar instance with in a subprocess and set up an httpx client for running
tests. You can either load your actual app file or create subsets from it as you would
with the regular test client setup:

.. literalinclude:: /examples/testing/subprocess_sse_app.py
:language: python

.. literalinclude:: /examples/testing/test_subprocess_sse.py
:language: python

RequestFactory
--------------

Expand Down
3 changes: 3 additions & 0 deletions litestar/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from litestar.testing.client.async_client import AsyncTestClient
from litestar.testing.client.base import BaseTestClient
from litestar.testing.client.subprocess_client import subprocess_async_client, subprocess_sync_client
from litestar.testing.client.sync_client import TestClient
from litestar.testing.helpers import create_async_test_client, create_test_client
from litestar.testing.request_factory import RequestFactory
Expand All @@ -13,4 +14,6 @@
"WebSocketTestSession",
"create_async_test_client",
"create_test_client",
"subprocess_sync_client",
"subprocess_async_client",
)
70 changes: 70 additions & 0 deletions litestar/testing/client/subprocess_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pathlib
import socket
import subprocess
import time
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncIterator, Iterator

import httpx


class StartupError(RuntimeError):
pass


def _get_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Bind to a free port provided by the host
try:
sock.bind(("localhost", 0))
except OSError as e:
raise StartupError("Could not find an open port") from e
else:
port: int = sock.getsockname()[1]
return port


@contextmanager
def run_app(workdir: pathlib.Path, app: str) -> Iterator[str]:
"""Launch a litestar application in a subprocess with a random available port."""
port = _get_available_port()
with subprocess.Popen(
args=["litestar", "--app", app, "run", "--port", str(port)],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
cwd=workdir,
) as proc:
url = f"http://127.0.0.1:{port}"
for _ in range(100):
try:
httpx.get(url, timeout=0.1)
break
except httpx.TransportError:
time.sleep(1)
yield url
proc.kill()


@asynccontextmanager
async def subprocess_async_client(workdir: pathlib.Path, app: str) -> AsyncIterator[httpx.AsyncClient]:
"""Provides an async httpx client for a litestar app launched in a subprocess.

Args:
workdir: Path to the directory in which the app module resides.
app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app"
"""
with run_app(workdir=workdir, app=app) as url:
async with httpx.AsyncClient(base_url=url) as client:
yield client


@contextmanager
def subprocess_sync_client(workdir: pathlib.Path, app: str) -> Iterator[httpx.Client]:
"""Provides a sync httpx client for a litestar app launched in a subprocess.

Args:
workdir: Path to the directory in which the app module resides.
app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app"
"""
with run_app(workdir=workdir, app=app) as url, httpx.Client(base_url=url) as client:
yield client
Empty file.
28 changes: 28 additions & 0 deletions tests/unit/test_testing/test_sub_client/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Assemble components into an app that shall be tested
"""

import asyncio
from typing import AsyncIterator

from litestar import Litestar, get
from litestar.response import ServerSentEvent


@get("/notify/{topic:str}")
async def get_notified(topic: str) -> ServerSentEvent:
async def generator() -> AsyncIterator[str]:
yield topic
while True:
await asyncio.sleep(0.1)

return ServerSentEvent(generator(), event_type="Notifier")


def create_test_app() -> Litestar:
return Litestar(
route_handlers=[get_notified],
)


app = create_test_app()
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Test the app running in a subprocess
"""

import asyncio
import pathlib
import sys
from typing import AsyncIterator, Iterator

import httpx
import httpx_sse
import pytest

from litestar.testing import subprocess_async_client, subprocess_sync_client

if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


ROOT = pathlib.Path(__file__).parent


@pytest.fixture(name="async_client", scope="session")
async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]:
async with subprocess_async_client(workdir=ROOT, app="demo:app") as client:
yield client


@pytest.fixture(name="sync_client", scope="session")
def fx_sync_client() -> Iterator[httpx.Client]:
with subprocess_sync_client(workdir=ROOT, app="demo:app") as client:
yield client


async def test_subprocess_async_client(async_client: httpx.AsyncClient) -> None:
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the
regular async test client.
"""

async with httpx_sse.aconnect_sse(async_client, "GET", "/notify/hello") as event_source:
async for event in event_source.aiter_sse():
assert event.data == "hello"
break


def test_subprocess_sync_client(sync_client: httpx.Client) -> None:
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the
regular async test client.
"""

with httpx_sse.connect_sse(sync_client, "GET", "/notify/hello") as event_source:
for event in event_source.iter_sse():
assert event.data == "hello"
break
Loading