Skip to content

Commit

Permalink
api: error suppression cleanup + timeout suppression on aborts
Browse files Browse the repository at this point in the history
  • Loading branch information
AlpinDale committed Dec 16, 2024
1 parent ab533e0 commit 104d979
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
5 changes: 2 additions & 3 deletions aphrodite/endpoints/openai/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import tempfile
from argparse import Namespace
from contextlib import asynccontextmanager, suppress
from contextlib import asynccontextmanager
from distutils.util import strtobool
from http import HTTPStatus
from typing import AsyncGenerator, AsyncIterator, List, Optional, Set, Tuple
Expand Down Expand Up @@ -98,8 +98,7 @@ async def lifespan(app: FastAPI):
async def _force_log():
while True:
await asyncio.sleep(10)
with suppress(Exception):
await async_engine_client.do_log_stats()
await async_engine_client.do_log_stats()

if not engine_args.disable_log_stats:
task = asyncio.create_task(_force_log())
Expand Down
12 changes: 11 additions & 1 deletion aphrodite/endpoints/openai/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,17 @@ async def _get_lora_config_rpc(self) -> LoRAConfig:

async def abort(self, request_id: str):
"""Send an ABORT_REQUEST signal to the RPC Server"""
with suppress(RPCClientClosedError):
# Suppress timeouts as well.
# In cases where the server is busy processing requests and a very
# large volume of abort requests arrive, it is likely that the server
# will not be able to ack all of them in time. We have seen this when
# we abort 20k requests at once while another 2k are processing; many
# of them time out, but we see the server successfully abort all of the
# requests.
# In this case we assume that the server has received or will receive
# these abort requests, and ignore the timeout. This prevents a massive
# wall of `TimeoutError` stack traces.
with suppress(RPCClientClosedError, TimeoutError):
await self._send_one_way_rpc_request(
request=RPCAbortRequest(request_id),
error_message=f"RPCAbortRequest {request_id} failed")
Expand Down
11 changes: 6 additions & 5 deletions tests/endpoints/openai/rpc/test_zmq_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ async def test_client_aborts_use_timeouts(
# Hang all abort requests
m.setattr(dummy_server, "abort", lambda x: None)
m.setattr(client, "_data_timeout", 10)
# Ensure the client doesn't hang

# The client should suppress timeouts on `abort`s
# and return normally, assuming the server will eventually
# abort the request.
client_task = asyncio.get_running_loop().create_task(
client.abort("test request id")
)
with pytest.raises(TimeoutError, match="Server didn't reply within"):
await asyncio.wait_for(client_task, timeout=0.05)
client.abort("test request id"))
await asyncio.wait_for(client_task, timeout=0.05)


@pytest.mark.asyncio
Expand Down

0 comments on commit 104d979

Please sign in to comment.