-
-
Notifications
You must be signed in to change notification settings - Fork 389
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
342 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
""" | ||
Assemble components into an app that shall be tested | ||
""" | ||
|
||
from typing import AsyncIterator | ||
|
||
from redis.asyncio import Redis | ||
|
||
from litestar import Litestar, get | ||
from litestar.channels import ChannelsPlugin | ||
from litestar.channels.backends.redis import RedisChannelsPubSubBackend | ||
from litestar.response import ServerSentEvent | ||
|
||
|
||
@get("/notify/{topic:str}") | ||
async def get_notified(topic: str, channels: ChannelsPlugin) -> ServerSentEvent: | ||
async def generator() -> AsyncIterator[bytes]: | ||
async with channels.start_subscription([topic]) as subscriber: | ||
async for event in subscriber.iter_events(): | ||
yield event | ||
|
||
return ServerSentEvent(generator(), event_type="Notifier") | ||
|
||
|
||
def create_test_app() -> Litestar: | ||
redis_instance = Redis() | ||
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance) | ||
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True) | ||
|
||
return Litestar( | ||
route_handlers=[ | ||
get_notified, | ||
], | ||
plugins=[channels_instance], | ||
) | ||
|
||
|
||
app = create_test_app() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
""" | ||
Test the app running in a subprocess | ||
""" | ||
|
||
import asyncio | ||
import pathlib | ||
import sys | ||
from typing import AsyncIterator | ||
|
||
import httpx | ||
import httpx_sse | ||
import pytest | ||
from redis.asyncio import Redis | ||
|
||
from litestar.channels import ChannelsPlugin | ||
from litestar.channels.backends.redis import RedisChannelsPubSubBackend | ||
from litestar.testing import subprocess_async_client | ||
|
||
if sys.platform == "win32": | ||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) | ||
|
||
pytestmark = pytest.mark.anyio | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def anyio_backend() -> str: | ||
return "asyncio" | ||
|
||
|
||
ROOT = pathlib.Path(__file__).parent | ||
|
||
|
||
@pytest.fixture(name="async_client", scope="session") | ||
async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]: | ||
async with subprocess_async_client(workdir=ROOT, app="subprocess_sse_app:app") as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture(name="redis_channels") | ||
async def fx_redis_channels() -> AsyncIterator[ChannelsPlugin]: | ||
# Expects separate redis set-up | ||
redis_instance = Redis() | ||
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance) | ||
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True) | ||
await channels_instance._on_startup() | ||
yield channels_instance | ||
await channels_instance._on_shutdown() | ||
|
||
|
||
async def test_subprocess_async_client(async_client: httpx.AsyncClient, redis_channels: ChannelsPlugin) -> None: | ||
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the | ||
regular async test client. | ||
""" | ||
topic = "demo" | ||
message = "hello" | ||
|
||
running = asyncio.Event() | ||
running.set() | ||
|
||
async def send_notifications() -> None: | ||
while running.is_set(): | ||
await redis_channels.wait_published(message, channels=[topic]) | ||
await asyncio.sleep(0.1) | ||
|
||
task = asyncio.create_task(send_notifications()) | ||
|
||
async with httpx_sse.aconnect_sse(async_client, "GET", f"/notify/{topic}") as event_source: | ||
async for event in event_source.aiter_sse(): | ||
assert event.data == message | ||
running.clear() | ||
break | ||
await task |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import pathlib | ||
import socket | ||
import subprocess | ||
import time | ||
from contextlib import asynccontextmanager, contextmanager | ||
from typing import AsyncIterator, Iterator | ||
|
||
import httpx | ||
|
||
|
||
class StartupError(RuntimeError): | ||
pass | ||
|
||
|
||
def _get_available_port() -> int: | ||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | ||
# Bind to a free port provided by the host | ||
try: | ||
sock.bind(("localhost", 0)) | ||
except OSError as e: | ||
raise StartupError("Could not find an open port") from e | ||
else: | ||
port: int = sock.getsockname()[1] | ||
return port | ||
|
||
|
||
@contextmanager | ||
def run_app(workdir: pathlib.Path, app: str) -> Iterator[str]: | ||
"""Launch a litestar application in a subprocess with a random available port.""" | ||
port = _get_available_port() | ||
proc = subprocess.Popen( | ||
args=["litestar", "--app", app, "run", "--port", str(port)], | ||
stderr=subprocess.PIPE, | ||
stdout=subprocess.PIPE, | ||
cwd=workdir, | ||
) | ||
url = f"http://127.0.0.1:{port}" | ||
for _ in range(100): | ||
try: | ||
httpx.get(url, timeout=0.1) | ||
break | ||
except httpx.TransportError: | ||
time.sleep(1) | ||
yield url | ||
proc.kill() | ||
|
||
|
||
@asynccontextmanager | ||
async def subprocess_async_client(workdir: pathlib.Path, app: str) -> AsyncIterator[httpx.AsyncClient]: | ||
"""Provides an async httpx client for a litestar app launched in a subprocess. | ||
Args: | ||
workdir: Path to the directory in which the app module resides. | ||
app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app" | ||
""" | ||
with run_app(workdir=workdir, app=app) as url: | ||
async with httpx.AsyncClient(base_url=url) as client: | ||
yield client | ||
|
||
|
||
@contextmanager | ||
def subprocess_sync_client(workdir: pathlib.Path, app: str) -> Iterator[httpx.Client]: | ||
"""Provides a sync httpx client for a litestar app launched in a subprocess. | ||
Args: | ||
workdir: Path to the directory in which the app module resides. | ||
app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app" | ||
""" | ||
with run_app(workdir=workdir, app=app) as url, httpx.Client(base_url=url) as client: | ||
yield client |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
""" | ||
Assemble components into an app that shall be tested | ||
""" | ||
|
||
from typing import AsyncIterator | ||
|
||
from redis.asyncio import Redis | ||
|
||
from litestar import Litestar, get | ||
from litestar.channels import ChannelsPlugin | ||
from litestar.channels.backends.redis import RedisChannelsPubSubBackend | ||
from litestar.response import ServerSentEvent | ||
|
||
|
||
@get("/notify/{topic:str}") | ||
async def get_notified(topic: str, channels: ChannelsPlugin) -> ServerSentEvent: | ||
async def generator() -> AsyncIterator[bytes]: | ||
async with channels.start_subscription([topic]) as subscriber: | ||
async for event in subscriber.iter_events(): | ||
yield event | ||
|
||
return ServerSentEvent(generator(), event_type="Notifier") | ||
|
||
|
||
def create_test_app() -> Litestar: | ||
redis_instance = Redis() | ||
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance) | ||
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True) | ||
|
||
return Litestar( | ||
route_handlers=[ | ||
get_notified, | ||
], | ||
plugins=[channels_instance], | ||
) | ||
|
||
|
||
app = create_test_app() |
108 changes: 108 additions & 0 deletions
108
tests/unit/test_testing/test_sub_client/test_subprocess_client.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
""" | ||
Test the app running in a subprocess | ||
""" | ||
|
||
import asyncio | ||
import pathlib | ||
import sys | ||
import threading | ||
from typing import Any, AsyncIterator, Iterator | ||
|
||
import httpx | ||
import httpx_sse | ||
import pytest | ||
from redis.asyncio import Redis | ||
|
||
from litestar.channels import ChannelsPlugin | ||
from litestar.channels.backends.redis import RedisChannelsPubSubBackend | ||
from litestar.testing import subprocess_async_client, subprocess_sync_client | ||
|
||
if sys.platform == "win32": | ||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) | ||
|
||
pytestmark = pytest.mark.anyio | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def anyio_backend() -> str: | ||
return "asyncio" | ||
|
||
|
||
ROOT = pathlib.Path(__file__).parent | ||
|
||
|
||
@pytest.fixture(name="async_client", scope="session") | ||
async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]: | ||
async with subprocess_async_client(workdir=ROOT, app="demo:app") as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture(name="sync_client", scope="session") | ||
def fx_sync_client() -> Iterator[httpx.Client]: | ||
with subprocess_sync_client(workdir=ROOT, app="demo:app") as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture(name="redis_channels") | ||
async def fx_redis_channels(redis_service: Any) -> AsyncIterator[ChannelsPlugin]: | ||
redis_instance = Redis() | ||
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance) | ||
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True) | ||
await channels_instance._on_startup() | ||
yield channels_instance | ||
await channels_instance._on_shutdown() | ||
|
||
|
||
async def test_subprocess_async_client(async_client: httpx.AsyncClient, redis_channels: ChannelsPlugin) -> None: | ||
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the | ||
regular async test client. | ||
""" | ||
topic = "demo" | ||
message = "hello" | ||
|
||
running = asyncio.Event() | ||
running.set() | ||
|
||
async def send_notifications() -> None: | ||
while running.is_set(): | ||
await redis_channels.wait_published(message, channels=[topic]) | ||
await asyncio.sleep(0.1) | ||
|
||
task = asyncio.create_task(send_notifications()) | ||
|
||
async with httpx_sse.aconnect_sse(async_client, "GET", f"/notify/{topic}") as event_source: | ||
async for event in event_source.aiter_sse(): | ||
assert event.data == message | ||
running.clear() | ||
break | ||
await task | ||
|
||
|
||
async def test_subprocess_sync_client(sync_client: httpx.Client, redis_channels: ChannelsPlugin) -> None: | ||
"""Demonstrates functionality of the sync client with an infinite SSE source that cannot be tested with the | ||
regular sync test client. | ||
""" | ||
topic = "demo" | ||
message = "hello" | ||
|
||
running = threading.Event() | ||
running.set() | ||
|
||
async def send_notifications() -> None: | ||
while running.is_set(): | ||
await redis_channels.wait_published(message, channels=[topic]) | ||
await asyncio.sleep(0.1) | ||
|
||
task = asyncio.create_task(send_notifications()) | ||
|
||
def consume_notifications() -> None: | ||
with httpx_sse.connect_sse(sync_client, "GET", f"/notify/{topic}") as event_source: | ||
for event in event_source.iter_sse(): | ||
assert event.data == message | ||
running.clear() | ||
break | ||
|
||
thread_consume = threading.Thread(target=consume_notifications, daemon=True) | ||
thread_consume.start() | ||
await task | ||
thread_consume.join() |