From 104d97907e7a68fceebcf2c7446431f792026e5e Mon Sep 17 00:00:00 2001
From: AlpinDale
Date: Mon, 16 Dec 2024 03:15:09 +0000
Subject: [PATCH] api: error suppression cleanup + timeout suppression on aborts

---
 aphrodite/endpoints/openai/api_server.py      |  5 ++---
 aphrodite/endpoints/openai/rpc/client.py      | 12 +++++++++++-
 tests/endpoints/openai/rpc/test_zmq_client.py | 11 ++++++-----
 3 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/aphrodite/endpoints/openai/api_server.py b/aphrodite/endpoints/openai/api_server.py
index 1460b181d..486cb7214 100644
--- a/aphrodite/endpoints/openai/api_server.py
+++ b/aphrodite/endpoints/openai/api_server.py
@@ -7,7 +7,7 @@
 import re
 import tempfile
 from argparse import Namespace
-from contextlib import asynccontextmanager, suppress
+from contextlib import asynccontextmanager
 from distutils.util import strtobool
 from http import HTTPStatus
 from typing import AsyncGenerator, AsyncIterator, List, Optional, Set, Tuple
@@ -98,8 +98,7 @@ async def lifespan(app: FastAPI):
     async def _force_log():
         while True:
             await asyncio.sleep(10)
-            with suppress(Exception):
-                await async_engine_client.do_log_stats()
+            await async_engine_client.do_log_stats()
 
     if not engine_args.disable_log_stats:
         task = asyncio.create_task(_force_log())
diff --git a/aphrodite/endpoints/openai/rpc/client.py b/aphrodite/endpoints/openai/rpc/client.py
index edce34e26..caeebb34b 100644
--- a/aphrodite/endpoints/openai/rpc/client.py
+++ b/aphrodite/endpoints/openai/rpc/client.py
@@ -309,7 +309,17 @@ async def _get_lora_config_rpc(self) -> LoRAConfig:
 
     async def abort(self, request_id: str):
         """Send an ABORT_REQUEST signal to the RPC Server"""
-        with suppress(RPCClientClosedError):
+        # Suppress timeouts as well.
+        # In cases where the server is busy processing requests and a very
+        # large volume of abort requests arrive, it is likely that the server
+        # will not be able to ack all of them in time. We have seen this when
+        # we abort 20k requests at once while another 2k are processing; many
+        # of them time out, but we see the server successfully abort all of the
+        # requests.
+        # In this case we assume that the server has received or will receive
+        # these abort requests, and ignore the timeout. This prevents a massive
+        # wall of `TimeoutError` stack traces.
+        with suppress(RPCClientClosedError, TimeoutError):
             await self._send_one_way_rpc_request(
                 request=RPCAbortRequest(request_id),
                 error_message=f"RPCAbortRequest {request_id} failed")
diff --git a/tests/endpoints/openai/rpc/test_zmq_client.py b/tests/endpoints/openai/rpc/test_zmq_client.py
index 8006780f0..c097282fd 100644
--- a/tests/endpoints/openai/rpc/test_zmq_client.py
+++ b/tests/endpoints/openai/rpc/test_zmq_client.py
@@ -72,12 +72,13 @@ async def test_client_aborts_use_timeouts(
         # Hang all abort requests
         m.setattr(dummy_server, "abort", lambda x: None)
         m.setattr(client, "_data_timeout", 10)
-        # Ensure the client doesn't hang
+
+        # The client should suppress timeouts on `abort`s
+        # and return normally, assuming the server will eventually
+        # abort the request.
         client_task = asyncio.get_running_loop().create_task(
-            client.abort("test request id")
-        )
-        with pytest.raises(TimeoutError, match="Server didn't reply within"):
-            await asyncio.wait_for(client_task, timeout=0.05)
+            client.abort("test request id"))
+        await asyncio.wait_for(client_task, timeout=0.05)
 
 
 @pytest.mark.asyncio
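
For context on the client-side change: it boils down to treating `abort()` as best-effort and adding `TimeoutError` to the existing `contextlib.suppress`. The following is a minimal, self-contained sketch of that pattern, not the Aphrodite RPC client itself; the `RPCClientClosedError` class, the `send_abort_rpc` coroutine, and the 1-second ack timeout are illustrative stand-ins for `_send_one_way_rpc_request` and `_data_timeout`.

```python
import asyncio
from contextlib import suppress


class RPCClientClosedError(Exception):
    """Stand-in for the client's 'socket already closed' error."""


async def send_abort_rpc(request_id: str, timeout: float = 1.0) -> None:
    # Stand-in for a one-way ZMQ send that waits for the server's ack.
    # Here the ack never arrives, so the asyncio timeout is converted into
    # the built-in TimeoutError that the real client raises.
    try:
        await asyncio.wait_for(asyncio.Event().wait(), timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"No ack for abort of {request_id}") from None


async def abort(request_id: str) -> None:
    # Best-effort abort: if the socket is already closed or the ack times
    # out (e.g. during a mass abort), swallow the error instead of letting
    # thousands of TimeoutError tracebacks reach the logs.
    with suppress(RPCClientClosedError, TimeoutError):
        await send_abort_rpc(request_id)


async def main() -> None:
    # Abort many requests at once; none of these raise, even though every
    # ack times out in this toy setup.
    await asyncio.gather(*(abort(f"req-{i}") for i in range(1000)))


asyncio.run(main())
```

Because the send is one-way and the server is trusted to process the abort eventually, swallowing the timeout only changes what gets logged, not what the server does; the updated test encodes the same expectation by awaiting `abort()` directly instead of asserting that a `TimeoutError` is raised.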
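
The server-side half of the cleanup removes the blanket `suppress(Exception)` around the periodic `do_log_stats()` call, so a genuine failure in stats logging now surfaces instead of being silently swallowed every ten seconds. Below is a sketch of one way to keep such a background loop observable; `periodic_stats`, `start_stats_task`, and the done-callback are hypothetical helpers, not code from api_server.py.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def periodic_stats(log_stats, interval: float = 10.0) -> None:
    # With no blanket suppress(Exception), an error inside log_stats()
    # now propagates and ends the loop instead of being hidden forever.
    while True:
        await asyncio.sleep(interval)
        await log_stats()


def start_stats_task(log_stats) -> "asyncio.Task[None]":
    task = asyncio.create_task(periodic_stats(log_stats))

    def _report(t: "asyncio.Task[None]") -> None:
        # Report the loop's death once, loudly, at a known place.
        if not t.cancelled() and t.exception() is not None:
            logger.error("stats loop crashed", exc_info=t.exception())

    task.add_done_callback(_report)
    return task
```

In the patched lifespan the task is simply created with `asyncio.create_task`, so an unhandled exception ends the loop and is reported by asyncio's default unhandled-task-exception logging; the explicit callback above is just one way to make that failure mode visible immediately.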