Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V1] [6/N] API Server: Better Shutdown #11586

Merged
merged 13 commits into from
Dec 30, 2024
44 changes: 12 additions & 32 deletions vllm/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from vllm.logger import init_logger
from vllm.usage.usage_lib import UsageContext
from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
is_valid_ipv6_address, kill_process_tree, set_ulimit)
is_valid_ipv6_address, set_ulimit)
from vllm.version import __version__ as VLLM_VERSION

TIMEOUT_KEEP_ALIVE = 5 # seconds
Expand Down Expand Up @@ -133,32 +133,21 @@ async def build_async_engine_client_from_engine_args(
Returns the Client or None if the creation failed.
"""

# Fall back
# TODO: fill out feature matrix.
# AsyncLLMEngine.
if (MQLLMEngineClient.is_unsupported_config(engine_args)
or envs.VLLM_USE_V1 or disable_frontend_multiprocessing):
engine_config = engine_args.create_engine_config(
UsageContext.OPENAI_API_SERVER)
uses_ray = getattr(AsyncLLMEngine._get_executor_cls(engine_config),
"uses_ray", False)

build_engine = partial(AsyncLLMEngine.from_engine_args,
engine_args=engine_args,
engine_config=engine_config,
usage_context=UsageContext.OPENAI_API_SERVER)
if uses_ray:
# Must run in main thread with ray for its signal handlers to work
engine_client = build_engine()
else:
engine_client = await asyncio.get_running_loop().run_in_executor(
None, build_engine)

yield engine_client
if hasattr(engine_client, "shutdown"):
engine_client.shutdown()
return
engine_client: Optional[EngineClient] = None
try:
engine_client = AsyncLLMEngine.from_engine_args(
engine_args=engine_args,
usage_context=UsageContext.OPENAI_API_SERVER)
yield engine_client
finally:
if engine_client and hasattr(engine_client, "shutdown"):
engine_client.shutdown()

# Otherwise, use the multiprocessing AsyncLLMEngine.
# MQLLMEngine.
else:
if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
# Make TemporaryDirectory for prometheus multiprocessing
Expand Down Expand Up @@ -737,15 +726,6 @@ def signal_handler(*_) -> None:

signal.signal(signal.SIGTERM, signal_handler)

# The child processes will send SIGQUIT to this process when
# any error happens. This process then clean up the whole tree.
# TODO(rob): move this into AsyncLLM.__init__ once we remove
# the context manager below.
def sigquit_handler(signum, frame):
kill_process_tree(os.getpid())

signal.signal(signal.SIGQUIT, sigquit_handler)

async with build_async_engine_client(args) as engine_client:
app = build_app(args)

Expand Down
25 changes: 22 additions & 3 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import os
import signal
from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union

from vllm.config import ModelConfig, VllmConfig
Expand All @@ -16,6 +18,7 @@
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
from vllm.usage.usage_lib import UsageContext
from vllm.utils import kill_process_tree
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.processor import Processor
Expand All @@ -38,6 +41,22 @@ def __init__(
log_requests: bool = True,
start_engine_loop: bool = True,
) -> None:

# The child processes will send SIGQUIT when unrecoverable
# errors happen. We kill the process tree here so that the
# stack trace is very evident.
# TODO: rather than killing the main process, we should
# figure out how to raise an AsyncEngineDeadError and
# handle at the API server level so we can return a better
# error code to the clients calling VLLM.
def sigquit_handler(signum, frame):
logger.fatal(
"AsyncLLM got SIGQUIT from worker processes, shutting "
"down. See stack trace above for root cause issue.")
kill_process_tree(os.getpid())

signal.signal(signal.SIGQUIT, sigquit_handler)

assert start_engine_loop

self.log_requests = log_requests
Expand Down Expand Up @@ -274,9 +293,9 @@ async def _run_output_handler(self):
# 4) Abort any requests that finished due to stop strings.
await self.engine_core.abort_requests_async(reqs_to_abort)

except BaseException as e:
logger.error(e)
raise e
except Exception as e:
logger.exception("EngineCore output handler hit an error: %s", e)
kill_process_tree(os.getpid())

async def abort(self, request_id: str) -> None:
"""Abort RequestId in self, detokenizer, and engine core."""
Expand Down
16 changes: 6 additions & 10 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.utils import get_open_zmq_ipc_path
from vllm.utils import get_open_zmq_ipc_path, make_zmq_socket
from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
EngineCoreProfile, EngineCoreRequest,
EngineCoreRequestType, EngineCoreRequestUnion)
Expand Down Expand Up @@ -144,17 +144,13 @@ def __init__(
else:
self.ctx = zmq.Context() # type: ignore[attr-defined]

# Path for IPC.
# Paths and sockets for IPC.
output_path = get_open_zmq_ipc_path()
input_path = get_open_zmq_ipc_path()

# Get output (EngineCoreOutput) from EngineCore.
self.output_socket = self.ctx.socket(zmq.constants.PULL)
self.output_socket.connect(output_path)

# Send input (EngineCoreRequest) to EngineCore.
self.input_socket = self.ctx.socket(zmq.constants.PUSH)
self.input_socket.bind(input_path)
self.output_socket = make_zmq_socket(self.ctx, output_path,
zmq.constants.PULL)
self.input_socket = make_zmq_socket(self.ctx, input_path,
zmq.constants.PUSH)

# Start EngineCore in background process.
self.proc_handle: Optional[BackgroundProcHandle]
Expand Down
Loading