diff --git a/integration_tests/base_routes.py b/integration_tests/base_routes.py index 44e512a0..04cfc8c8 100644 --- a/integration_tests/base_routes.py +++ b/integration_tests/base_routes.py @@ -1090,30 +1090,25 @@ def create_item(request, body: CreateItemBody, query: CreateItemQueryParamsParam # --- Streaming responses --- + @app.get("/stream/sync") async def sync_stream(): def generator(): for i in range(5): yield f"Chunk {i}\n".encode() - + headers = Headers({"Content-Type": "text/plain"}) - return StreamingResponse( - status_code=200, - description=generator(), - headers=headers - ) + return StreamingResponse(status_code=200, description=generator(), headers=headers) + @app.get("/stream/async") async def async_stream(): async def generator(): for i in range(5): yield f"Async Chunk {i}\n".encode() - - return StreamingResponse( - status_code=200, - headers={"Content-Type": "text/plain"}, - description=generator() - ) + + return StreamingResponse(status_code=200, headers={"Content-Type": "text/plain"}, description=generator()) + @app.get("/stream/mixed") async def mixed_stream(): @@ -1122,12 +1117,9 @@ async def generator(): yield "String chunk\n".encode() yield str(42).encode() + b"\n" yield json.dumps({"message": "JSON chunk", "number": 123}).encode() + b"\n" - - return StreamingResponse( - status_code=200, - headers={"Content-Type": "text/plain"}, - description=generator() - ) + + return StreamingResponse(status_code=200, headers={"Content-Type": "text/plain"}, description=generator()) + @app.get("/stream/events") async def server_sent_events(): @@ -1135,74 +1127,63 @@ async def event_generator(): import asyncio import json import time - + # Regular event yield f"event: message\ndata: {json.dumps({'time': time.time(), 'type': 'start'})}\n\n".encode() await asyncio.sleep(1) - + # Event with ID yield f"id: 1\nevent: update\ndata: {json.dumps({'progress': 50})}\n\n".encode() await asyncio.sleep(1) - + # Multiple data lines - data = json.dumps({'status': 'complete', 'results': [1, 2, 3]}, indent=2) + data = json.dumps({"status": "complete", "results": [1, 2, 3]}, indent=2) yield f"event: complete\ndata: {data}\n\n".encode() - + return StreamingResponse( - status_code=200, - headers={ - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - "Connection": "keep-alive" - }, - description=event_generator() + status_code=200, headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive"}, description=event_generator() ) + @app.get("/stream/large-file") async def stream_large_file(): async def file_generator(): # Simulate streaming a large file in chunks chunk_size = 1024 # 1KB chunks total_size = 10 * chunk_size # 10KB total - + for offset in range(0, total_size, chunk_size): # Simulate reading file chunk chunk = b"X" * min(chunk_size, total_size - offset) yield chunk - + return StreamingResponse( status_code=200, - headers={ - "Content-Type": "application/octet-stream", - "Content-Disposition": "attachment; filename=large-file.bin" - }, - description=file_generator() + headers={"Content-Type": "application/octet-stream", "Content-Disposition": "attachment; filename=large-file.bin"}, + description=file_generator(), ) + @app.get("/stream/csv") async def stream_csv(): async def csv_generator(): # CSV header yield "id,name,value\n".encode() - + import asyncio import random - + # Generate rows for i in range(5): await asyncio.sleep(0.5) # Simulate data processing row = f"{i},item-{i},{random.randint(1, 100)}\n" yield row.encode() - + return StreamingResponse( - status_code=200, - headers={ - "Content-Type": "text/csv", - "Content-Disposition": "attachment; filename=data.csv" - }, - description=csv_generator() + status_code=200, headers={"Content-Type": "text/csv", "Content-Disposition": "attachment; filename=data.csv"}, description=csv_generator() ) + def main(): app.set_response_header("server", "robyn") app.serve_directory( diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py index c450919d..77c661e0 100644 --- a/integration_tests/conftest.py +++ b/integration_tests/conftest.py @@ -8,12 +8,10 @@ from typing import List import pytest -import pytest_asyncio -from robyn import Robyn -from integration_tests.base_routes import app from integration_tests.helpers.network_helpers import get_network_host + def spawn_process(command: List[str]) -> subprocess.Popen: if platform.system() == "Windows": command[0] = "python" @@ -129,4 +127,3 @@ def env_file(): env_path.unlink() del os.environ["ROBYN_PORT"] del os.environ["ROBYN_HOST"] - diff --git a/integration_tests/test_streaming_responses.py b/integration_tests/test_streaming_responses.py index 4e16f950..be5d8d0a 100644 --- a/integration_tests/test_streaming_responses.py +++ b/integration_tests/test_streaming_responses.py @@ -19,6 +19,7 @@ # Mark all tests in this module as async pytestmark = pytest.mark.asyncio + async def test_sync_stream(): """Test basic synchronous streaming response.""" async with aiohttp.ClientSession() as client: @@ -34,6 +35,7 @@ async def test_sync_stream(): for i, chunk in enumerate(chunks): assert chunk == f"Chunk {i}\n" + async def test_async_stream(): """Test asynchronous streaming response.""" async with aiohttp.ClientSession() as client: @@ -49,6 +51,7 @@ async def test_async_stream(): for i, chunk in enumerate(chunks): assert chunk == f"Async Chunk {i}\n" + async def test_mixed_stream(): """Test streaming of mixed content types.""" async with aiohttp.ClientSession() as client: @@ -56,12 +59,7 @@ async def test_mixed_stream(): assert response.status == 200 assert response.headers["Content-Type"] == "text/plain" - expected = [ - b"Binary chunk\n", - b"String chunk\n", - b"42\n", - json.dumps({"message": "JSON chunk", "number": 123}).encode() + b"\n" - ] + expected = [b"Binary chunk\n", b"String chunk\n", b"42\n", json.dumps({"message": "JSON chunk", "number": 123}).encode() + b"\n"] chunks = [] async for chunk in response.content: @@ -71,6 +69,7 @@ async def test_mixed_stream(): for chunk, expected_chunk in zip(chunks, expected): assert chunk == expected_chunk + async def test_server_sent_events(): """Test Server-Sent Events (SSE) streaming.""" async with aiohttp.ClientSession() as client: @@ -103,6 +102,7 @@ async def test_server_sent_events(): assert event_data["status"] == "complete" assert event_data["results"] == [1, 2, 3] + async def test_large_file_stream(): """Test streaming of large files in chunks.""" async with aiohttp.ClientSession() as client: @@ -118,6 +118,7 @@ async def test_large_file_stream(): assert total_size == 10 * 1024 # 10KB total + async def test_csv_stream(): """Test streaming of CSV data.""" async with aiohttp.ClientSession() as client: @@ -132,11 +133,11 @@ async def test_csv_stream(): # Verify header assert lines[0] == "id,name,value" - + # Verify data rows assert len(lines) == 6 # Header + 5 data rows for i, line in enumerate(lines[1:], 0): - id_, name, value = line.split(',') + id_, name, value = line.split(",") assert int(id_) == i assert name == f"item-{i}" - assert 1 <= int(value) <= 100 \ No newline at end of file + assert 1 <= int(value) <= 100 diff --git a/robyn/router.py b/robyn/router.py index ea1ba074..be931015 100644 --- a/robyn/router.py +++ b/robyn/router.py @@ -53,13 +53,13 @@ def _format_tuple_response(self, res: tuple) -> Union[Response, StreamingRespons description, headers, status_code = res formatted_response = self._format_response(description) - + # Handle StreamingResponse case if isinstance(formatted_response, StreamingResponse): formatted_response.headers.update(headers) formatted_response.status_code = status_code return formatted_response - + # Regular Response case new_headers: Headers = Headers(headers) if new_headers.contains("Content-Type"):