diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 88f0bd4ee4dbe..48aa904d4721d 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -43,7 +43,7 @@
     OpenAIServingTokenization)
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
-from vllm.utils import FlexibleArgumentParser, get_open_port
+from vllm.utils import FlexibleArgumentParser, get_open_zmq_ipc_path
 from vllm.version import __version__ as VLLM_VERSION
 
 TIMEOUT_KEEP_ALIVE = 5  # seconds
@@ -106,16 +106,20 @@ async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]:
 
     # Otherwise, use the multiprocessing AsyncLLMEngine.
     else:
+        # Select random path for IPC.
+        rpc_path = get_open_zmq_ipc_path()
+        logger.info("Multiprocessing frontend to use %s for RPC Path.",
+                    rpc_path)
+
         # Start RPCServer in separate process (holds the AsyncLLMEngine).
-        port = get_open_port(envs.VLLM_RPC_PORT)
         rpc_server_process = Process(target=run_rpc_server,
                                      args=(engine_args,
                                            UsageContext.OPENAI_API_SERVER,
-                                           port))
+                                           rpc_path))
         rpc_server_process.start()
 
         # Build RPCClient, which conforms to AsyncEngineClient Protocol.
-        async_engine_client = AsyncEngineRPCClient(port)
+        async_engine_client = AsyncEngineRPCClient(rpc_path)
         await async_engine_client.setup()
 
         try:
diff --git a/vllm/entrypoints/openai/rpc/client.py b/vllm/entrypoints/openai/rpc/client.py
index 043649131560c..8552c286eeeea 100644
--- a/vllm/entrypoints/openai/rpc/client.py
+++ b/vllm/entrypoints/openai/rpc/client.py
@@ -21,9 +21,9 @@
 
 class AsyncEngineRPCClient:
 
-    def __init__(self, port: int):
+    def __init__(self, rpc_path: str):
         self.context = zmq.asyncio.Context()
-        self.path = f"tcp://localhost:{port}"
+        self.rpc_path = rpc_path
 
     async def setup(self):
         """Setup the client before it starts sending server requests."""
@@ -58,7 +58,7 @@ def socket(self):
         # to enable streaming.
         socket = self.context.socket(zmq.constants.DEALER)
         try:
-            socket.connect(self.path)
+            socket.connect(self.rpc_path)
             yield socket
         finally:
             socket.close()
diff --git a/vllm/entrypoints/openai/rpc/server.py b/vllm/entrypoints/openai/rpc/server.py
index 60bb23b9bde05..617c9b7070e2c 100644
--- a/vllm/entrypoints/openai/rpc/server.py
+++ b/vllm/entrypoints/openai/rpc/server.py
@@ -20,7 +20,7 @@
 class AsyncEngineRPCServer:
 
     def __init__(self, async_engine_args: AsyncEngineArgs,
-                 usage_context: UsageContext, port: int):
+                 usage_context: UsageContext, rpc_path: str):
         # Initialize engine first.
         self.engine = AsyncLLMEngine.from_engine_args(async_engine_args,
                                                       usage_context)
@@ -30,9 +30,7 @@ def __init__(self, async_engine_args: AsyncEngineArgs,
 
         # Init socket for readiness state.
         self.socket = self.context.socket(zmq.constants.ROUTER)
-        # Note numeric form of localhost should be used for zmq bind(),
-        # see https://stackoverflow.com/a/8958414
-        self.socket.bind(f"tcp://127.0.0.1:{port}")
+        self.socket.bind(rpc_path)
 
     def cleanup(self):
         """Cleanup all resources."""
@@ -213,6 +211,6 @@ def signal_handler() -> None:
 
 
 def run_rpc_server(async_engine_args: AsyncEngineArgs,
-                   usage_context: UsageContext, port: int):
-    server = AsyncEngineRPCServer(async_engine_args, usage_context, port)
+                   usage_context: UsageContext, rpc_path: str):
+    server = AsyncEngineRPCServer(async_engine_args, usage_context, rpc_path)
     asyncio.run(run_server(server))
diff --git a/vllm/envs.py b/vllm/envs.py
index 81d2d80e65e46..df4c994359dbd 100644
--- a/vllm/envs.py
+++ b/vllm/envs.py
@@ -1,10 +1,11 @@
 import os
+import tempfile
 from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
 
 if TYPE_CHECKING:
     VLLM_HOST_IP: str = ""
     VLLM_PORT: Optional[int] = None
-    VLLM_RPC_PORT: int = 5570
+    VLLM_RPC_BASE_PATH: str = tempfile.gettempdir()
     VLLM_USE_MODELSCOPE: bool = False
     VLLM_RINGBUFFER_WARNING_INTERVAL: int = 60
     VLLM_INSTANCE_ID: Optional[str] = None
@@ -142,10 +143,10 @@ def get_default_config_root():
     lambda: int(os.getenv('VLLM_PORT', '0'))
     if 'VLLM_PORT' in os.environ else None,
 
-    # used when the frontend api server is running in multi-processing mode,
-    # to communicate with the backend engine process over ZMQ.
-    'VLLM_RPC_PORT':
-    lambda: int(os.getenv('VLLM_RPC_PORT', '5570')),
+    # path used for ipc when the frontend api server is running in
+    # multi-processing mode to communicate with the backend engine process.
+    'VLLM_RPC_BASE_PATH':
+    lambda: os.getenv('VLLM_RPC_BASE_PATH', tempfile.gettempdir()),
 
     # If true, will load models from ModelScope instead of Hugging Face Hub.
     # note that the value is true or false, not numbers
diff --git a/vllm/utils.py b/vllm/utils.py
index 08aa889b5e447..1fd395c04ca24 100644
--- a/vllm/utils.py
+++ b/vllm/utils.py
@@ -19,6 +19,7 @@
 from typing import (Any, AsyncGenerator, Awaitable, Callable, Dict, Generic,
                     Hashable, List, Optional, OrderedDict, Set, Tuple, TypeVar,
                     Union, overload)
+from uuid import uuid4
 
 import numpy as np
 import numpy.typing as npt
@@ -484,10 +485,13 @@ def get_distributed_init_method(ip: str, port: int) -> str:
     return f"tcp://[{ip}]:{port}" if ":" in ip else f"tcp://{ip}:{port}"
 
 
-def get_open_port(port: Optional[int] = None) -> int:
-    if port is None:
-        # Default behavior here is to return a port for multi-gpu communication
-        port = envs.VLLM_PORT
+def get_open_zmq_ipc_path() -> str:
+    base_rpc_path = envs.VLLM_RPC_BASE_PATH
+    return f"ipc://{base_rpc_path}/{uuid4()}"
+
+
+def get_open_port() -> int:
+    port = envs.VLLM_PORT
     if port is not None:
         while True:
             try:
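
For context, here is a minimal standalone sketch (not part of the patch) of the ROUTER/DEALER exchange over a uuid-based ipc:// endpoint that this change switches to. It assumes pyzmq is installed and a POSIX system (the ipc transport is not available on Windows); the names get_demo_ipc_path, server, and client are hypothetical and only illustrate the pattern used by AsyncEngineRPCServer/AsyncEngineRPCClient.

# Illustrative sketch only: ROUTER binds to a unique ipc:// path, DEALER connects.
import tempfile
from multiprocessing import Process
from uuid import uuid4

import zmq


def get_demo_ipc_path() -> str:
    # Mirrors the idea behind get_open_zmq_ipc_path(): a unique socket file
    # under the temp dir, so no TCP port needs to be reserved.
    return f"ipc://{tempfile.gettempdir()}/{uuid4()}"


def server(rpc_path: str) -> None:
    # ROUTER side: bind to the ipc path, echo one request back to its sender.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.ROUTER)
    sock.bind(rpc_path)
    identity, payload = sock.recv_multipart()
    sock.send_multipart([identity, b"ack: " + payload])
    sock.close()
    ctx.term()


def client(rpc_path: str) -> None:
    # DEALER side: connect (connect retries until the socket file exists)
    # and exchange one message.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.connect(rpc_path)
    sock.send(b"ping")
    print(sock.recv())  # b"ack: ping"
    sock.close()
    ctx.term()


if __name__ == "__main__":
    demo_rpc_path = get_demo_ipc_path()
    p = Process(target=server, args=(demo_rpc_path,))
    p.start()
    client(demo_rpc_path)
    p.join()

Because the endpoint is a filesystem path derived from VLLM_RPC_BASE_PATH plus a uuid, each frontend/engine pair gets its own socket and the old risk of colliding on a shared VLLM_RPC_PORT goes away.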