Skip to content

Commit

Permalink
feat: WebSocket send stream (#3894)
Browse files Browse the repository at this point in the history
  • Loading branch information
provinzkraut authored Dec 12, 2024
1 parent b79519d commit f31ef97
Show file tree
Hide file tree
Showing 14 changed files with 765 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@
"litestar.concurrency.set_asyncio_executor": {"ThreadPoolExecutor"},
"litestar.concurrency.get_asyncio_executor": {"ThreadPoolExecutor"},
re.compile(r"litestar\.channels\.backends\.asyncpg.*"): {"asyncpg.connection.Connection", "asyncpg.Connection"},
re.compile(r"litestar\.handlers\.websocket_handlers\.stream.*"): {"WebSocketMode"},
}

# Do not warn about broken links to the following:
Expand Down
26 changes: 26 additions & 0 deletions docs/examples/websockets/stream_and_receive_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio
import time
from typing import Any, AsyncGenerator

from litestar import Litestar, WebSocket, websocket_listener
from litestar.handlers import send_websocket_stream


async def listener_lifespan(socket: WebSocket) -> None:
async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
while True:
yield {"time": time.time()}
await asyncio.sleep(0.5)

task = asyncio.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
yield
task.cancel()
await task


@websocket_listener("/", connection_lifespan=listener_lifespan)
def handler(socket: WebSocket, data: Any) -> None:
print(f"{socket.client}: {data}")


app = Litestar([handler])
27 changes: 27 additions & 0 deletions docs/examples/websockets/stream_and_receive_raw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, WebSocket, websocket
from litestar.handlers import send_websocket_stream


@websocket("/")
async def handler(socket: WebSocket) -> None:
await socket.accept()

async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
while True:
yield {"time": time.time()}
await asyncio.sleep(0.5)

async def handle_receive() -> None:
async for event in socket.iter_json():
print(f"{socket.client}: {event}")

async with asyncio.TaskGroup() as tg:
tg.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
tg.create_task(handle_receive())


app = Litestar([handler])
15 changes: 15 additions & 0 deletions docs/examples/websockets/stream_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, websocket_stream


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
while True:
yield time.time()
await asyncio.sleep(0.5)


app = Litestar([ping])
23 changes: 23 additions & 0 deletions docs/examples/websockets/stream_di_hog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import asyncio
from typing import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


async def acquire_lock() -> AsyncGenerator[None, None]:
async with RESOURCE_LOCK:
yield


@websocket_stream("/")
async def ping(lock: asyncio.Lock) -> AsyncGenerator[float, None]:
while True:
alive = await ping_external_resource()
yield alive
await asyncio.sleep(1)


app = Litestar([ping], dependencies={"lock": acquire_lock})
19 changes: 19 additions & 0 deletions docs/examples/websockets/stream_di_hog_fix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio
from typing import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
while True:
async with RESOURCE_LOCK:
alive = await ping_external_resource()
yield alive
await asyncio.sleep(1)


app = Litestar([ping])
15 changes: 15 additions & 0 deletions docs/examples/websockets/stream_socket_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
import time
from typing import Any, AsyncGenerator

from litestar import Litestar, WebSocket, websocket_stream


@websocket_stream("/")
async def ping(socket: WebSocket) -> AsyncGenerator[dict[str, Any], None]:
while True:
yield {"time": time.time(), "client": socket.client}
await asyncio.sleep(0.5)


app = Litestar([ping])
Loading

0 comments on commit f31ef97

Please sign in to comment.