Skip to content

Commit

Permalink
[Frontend] Added MQEngineBatchError to improve stacktrace readability
Browse files Browse the repository at this point in the history
Signed-off-by: Wallas Santos <[email protected]>
  • Loading branch information
wallashss committed Oct 3, 2024
1 parent 7f60520 commit 0b18f78
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
6 changes: 3 additions & 3 deletions tests/mq_llm_engine/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.llm_engine import LLMEngine
from vllm.engine.multiprocessing import MQEngineDeadError
from vllm.engine.multiprocessing import MQEngineBatchError, MQEngineDeadError
from vllm.engine.multiprocessing.engine import MQLLMEngine
from vllm.entrypoints.openai.api_server import build_async_engine_client
from vllm.entrypoints.openai.cli_args import make_arg_parser
Expand All @@ -22,7 +22,7 @@

MODEL = "google/gemma-1.1-2b-it"
ENGINE_ARGS = AsyncEngineArgs(model=MODEL)
RAISED_ERROR = KeyError
RAISED_ERROR = MQEngineBatchError
RAISED_VALUE = "foo"


Expand Down Expand Up @@ -164,7 +164,7 @@ async def test_failed_abort(tmp_socket):
sampling_params=SamplingParams(max_tokens=10),
request_id=uuid.uuid4()):
pass
assert "KeyError" in repr(execinfo.value)
assert "MQEngineDeadError" in repr(execinfo.value)
assert client.errored

# This should raise the original error.
Expand Down
6 changes: 6 additions & 0 deletions vllm/engine/multiprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
IPC_DATA_EXT = "_data_socket"


# Generic exception when the engine
# fails to process a batch
class MQEngineBatchError(Exception):
pass


class MQEngineDeadError(RuntimeError):
pass

Expand Down
25 changes: 21 additions & 4 deletions vllm/engine/multiprocessing/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT,
IPC_HEALTH_EXT, IPC_INPUT_EXT,
IPC_OUTPUT_EXT, RPC_REQUEST_T,
VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
RPCError, RPCProcessRequest,
RPCStartupRequest, RPCStartupResponse,
VLLM_RPC_SUCCESS_STR,
MQEngineBatchError, MQEngineDeadError,
RPCAbortRequest, RPCError,
RPCProcessRequest, RPCStartupRequest,
RPCStartupResponse,
RPCUProfileRequest)
# yapf: enable
from vllm.envs import VLLM_RPC_TIMEOUT
Expand Down Expand Up @@ -203,8 +205,23 @@ async def run_output_handler_loop(self):
self._errored_with = exception

if request_id is None:

for queue_i in tuple(self.output_queues.values()):
queue_i.put_nowait(exception)

msg = str("A batch generation failed. Inspect the "
"stacktrace to find the original error: "
f"{repr(exception)}")
# If it is a runtime exception, we assume that
# the engine is already dead, let's pass this
# information ahead. Otherwise we just set as
# batch error, and maybe the engine is still
# up running.
# For runtime exceptions vLLM process will
# shutdown immediately.
batch_error = MQEngineDeadError(msg) if isinstance(
exception,
RuntimeError) else MQEngineBatchError(msg)
queue_i.put_nowait(batch_error)
else:
queue = self.output_queues.get(request_id)
if queue is not None:
Expand Down
18 changes: 14 additions & 4 deletions vllm/entrypoints/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from vllm import envs
from vllm.engine.async_llm_engine import AsyncEngineDeadError
from vllm.engine.multiprocessing import MQEngineDeadError
from vllm.engine.multiprocessing import MQEngineBatchError, MQEngineDeadError
from vllm.logger import init_logger
from vllm.utils import find_process_using_port

Expand All @@ -28,7 +28,7 @@ async def serve_http(app: FastAPI, **uvicorn_kwargs: Any):

config = uvicorn.Config(app, **uvicorn_kwargs)
server = uvicorn.Server(config)
_add_shutdown_handlers(app, server)
_add_exception_handles(app, server)

loop = asyncio.get_running_loop()

Expand Down Expand Up @@ -58,8 +58,9 @@ async def dummy_shutdown() -> None:
return server.shutdown()


def _add_shutdown_handlers(app: FastAPI, server: uvicorn.Server) -> None:
"""Adds handlers for fatal errors that should crash the server"""
def _add_exception_handles(app: FastAPI, server: uvicorn.Server) -> None:
"""Adds handlers for custom errors that may crash the server or
improve the readability of the stacktrace"""

@app.exception_handler(RuntimeError)
async def runtime_error_handler(request: Request, __):
Expand Down Expand Up @@ -101,3 +102,12 @@ async def mq_engine_dead_handler(_, __):
server.should_exit = True

return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

@app.exception_handler(MQEngineBatchError)
async def mq_engine_batch_error_handler(_, err):
"""Log the error and pass an internal server error.
This error might be propagated to all requests of
a batch that failed to generate"""
logger.error("%s", repr(err))

return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

0 comments on commit 0b18f78

Please sign in to comment.