From 0933513881c3c2391be1028c396a4c1d028f3653 Mon Sep 17 00:00:00 2001
From: Konrad Zawora
Date: Fri, 6 Dec 2024 12:30:45 +0100
Subject: [PATCH] Add multiprocessing HPU executor (#559)

Signed-off-by: Konrad Zawora
---
 vllm/config.py                          |  2 +-
 vllm/engine/async_llm_engine.py         |  6 +++-
 vllm/engine/llm_engine.py               |  6 +++-
 vllm/executor/multiproc_hpu_executor.py | 48 +++++++++++++++++++++++++
 4 files changed, 59 insertions(+), 3 deletions(-)
 create mode 100644 vllm/executor/multiproc_hpu_executor.py

diff --git a/vllm/config.py b/vllm/config.py
index 7fbe04eaaf4f8..b4a2650cca7c2 100644
--- a/vllm/config.py
+++ b/vllm/config.py
@@ -1002,7 +1002,7 @@ def __post_init__(self) -> None:
                 raise ValueError(f"worker-use-ray can't be used with "
                                  f"distributed executor backend "
                                  f"'{self.distributed_executor_backend}'.")
-        ray_only_devices = ["tpu", "hpu"]
+        ray_only_devices = ["tpu"]
         if (current_platform.device_type in ray_only_devices
                 and self.world_size > 1):
             if self.distributed_executor_backend is None:
diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index 60dccd7a0812c..48e65cd9acfbf 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -645,6 +645,10 @@ def _get_executor_cls(
             from vllm.executor.cpu_executor import CPUExecutorAsync
             executor_class = CPUExecutorAsync
         elif engine_config.device_config.device_type == "hpu":
-            if distributed_executor_backend == "ray":
+            if distributed_executor_backend == "mp":
+                from vllm.executor.multiproc_hpu_executor import (
+                    MultiprocessingHPUExecutorAsync)
+                executor_class = MultiprocessingHPUExecutorAsync
+            elif distributed_executor_backend == "ray":
                 initialize_ray_cluster(engine_config.parallel_config)
                 from vllm.executor.ray_hpu_executor import RayHPUExecutorAsync
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 560f84a008291..f28c5902bd105 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -475,7 +475,11 @@ def _get_executor_cls(cls,
             from vllm.executor.cpu_executor import CPUExecutor
             executor_class = CPUExecutor
         elif engine_config.device_config.device_type == "hpu":
-            if distributed_executor_backend == "ray":
+            if distributed_executor_backend == "mp":
+                from vllm.executor.multiproc_hpu_executor import (
+                    MultiprocessingHPUExecutor)
+                executor_class = MultiprocessingHPUExecutor
+            elif distributed_executor_backend == "ray":
                 initialize_ray_cluster(engine_config.parallel_config)
                 from vllm.executor.ray_hpu_executor import RayHPUExecutor
                 executor_class = RayHPUExecutor
diff --git a/vllm/executor/multiproc_hpu_executor.py b/vllm/executor/multiproc_hpu_executor.py
new file mode 100644
index 0000000000000..0027876cc4fb4
--- /dev/null
+++ b/vllm/executor/multiproc_hpu_executor.py
@@ -0,0 +1,48 @@
+from typing import Callable, Optional, Tuple, Type
+
+import habana_frameworks.torch  # noqa: F401
+import torch
+
+from vllm.executor.multiproc_gpu_executor import (
+    MultiprocessingGPUExecutor, MultiprocessingGPUExecutorAsync)
+from vllm.logger import init_logger
+from vllm.utils import make_async
+from vllm.worker.worker_base import WorkerBase
+
+logger = init_logger(__name__)
+
+
+class MultiprocessingHPUExecutor(MultiprocessingGPUExecutor):
+    """Python multiprocessing-based multi-HPU executor"""
+
+    def _get_worker_module_and_class(
+            self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]:
+        worker_class_fn = None
+        if self.speculative_config is not None:
+            module_name = "vllm.spec_decode.spec_decode_worker"
+            class_name = "create_spec_worker"
+        else:
+            module_name = "vllm.worker.hpu_worker"
+            class_name = "HPUWorker"
"HPUWorker" + return (module_name, class_name, worker_class_fn) + + def _check_executor_parameters(self): + world_size = self.parallel_config.world_size + tensor_parallel_size = self.parallel_config.tensor_parallel_size + + hpu_device_count = torch.hpu.device_count() + assert tensor_parallel_size <= hpu_device_count, ( + f"please set tensor_parallel_size ({tensor_parallel_size}) " + f"to less than max local hpu count ({hpu_device_count})") + + assert world_size <= hpu_device_count, ( + f"please ensure that world_size ({world_size}) " + f"is less than than max local hpu count ({hpu_device_count})") + + +class MultiprocessingHPUExecutorAsync(MultiprocessingHPUExecutor, + MultiprocessingGPUExecutorAsync): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.driver_exec_model = make_async(self.driver_worker.execute_model)