From a3f68b50147711972af33ea520d9292cf7a51a0d Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Mon, 4 Nov 2024 15:00:17 -0800 Subject: [PATCH 01/19] wip Signed-off-by: Rui Qiao --- .../test_basic_correctness.py | 16 +- vllm/distributed/parallel_state.py | 6 +- .../model_executor/layers/logits_processor.py | 1 + vllm/v1/engine/llm_engine.py | 10 +- vllm/v1/executor/distributed_gpu_executor.py | 96 +++++ vllm/v1/executor/ray_executor.py | 308 ++++++++++++++ vllm/v1/executor/ray_gpu_executor.py | 327 +++++++++++++++ vllm/v1/executor/ray_utils.py | 375 ++++++++++++++++++ vllm/v1/executor/uniproc_executor.py | 25 +- vllm/v1/worker/gpu_worker.py | 1 + 10 files changed, 1154 insertions(+), 11 deletions(-) create mode 100644 vllm/v1/executor/distributed_gpu_executor.py create mode 100644 vllm/v1/executor/ray_executor.py create mode 100644 vllm/v1/executor/ray_gpu_executor.py create mode 100644 vllm/v1/executor/ray_utils.py diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index 11d05cefb731..4e6524cfa32d 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -20,7 +20,8 @@ MODELS = [ "google/gemma-2-2b-it", - "meta-llama/Llama-3.2-1B", + # "facebook/opt-125m", + # "meta-llama/Llama-3.2-1B", ] TARGET_TEST_SUITE = os.environ.get("TARGET_TEST_SUITE", "L4") @@ -46,10 +47,11 @@ def test_vllm_gc_ed(): @pytest.mark.skip_v1 @pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS", "FLASHINFER"]) +# @pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS", "FLASHINFER"]) +@pytest.mark.parametrize("backend", ["FLASH_ATTN_VLLM_V1"]) @pytest.mark.parametrize("dtype", ["half"]) @pytest.mark.parametrize("max_tokens", [5]) -@pytest.mark.parametrize("enforce_eager", [False, True]) +@pytest.mark.parametrize("enforce_eager", [True]) def test_models( hf_runner, model: str, @@ -82,7 +84,9 @@ def test_models( max_model_len=8192, dtype=dtype, enforce_eager=enforce_eager, - gpu_memory_utilization=0.7) as vllm_model: + distributed_executor_backend="ray", + gpu_memory_utilization=0.7, + tensor_parallel_size=4) as vllm_model: vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens) check_outputs_equal( @@ -129,8 +133,8 @@ def test_models_distributed( # Import VLLM_USE_V1 dynamically to handle patching from vllm.envs import VLLM_USE_V1 - if VLLM_USE_V1 and distributed_executor_backend != "mp": - pytest.skip(f"Skip {distributed_executor_backend} for V1") + if not VLLM_USE_V1 or distributed_executor_backend == "mp": + pytest.skip(f"Skip {distributed_executor_backend} for V0") dtype = "half" max_tokens = 5 diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py index 5b9236f8c56b..23647503033c 100644 --- a/vllm/distributed/parallel_state.py +++ b/vllm/distributed/parallel_state.py @@ -247,9 +247,9 @@ def __init__( from vllm.distributed.device_communicators.shm_broadcast import ( MessageQueue) self.mq_broadcaster: Optional[MessageQueue] = None - if use_message_queue_broadcaster and self.world_size > 1: - self.mq_broadcaster = MessageQueue.create_from_process_group( - self.cpu_group, 1 << 22, 6) + # if use_message_queue_broadcaster and self.world_size > 1: + # self.mq_broadcaster = MessageQueue.create_from_process_group( + # self.cpu_group, 1 << 22, 6) @property def first_rank(self): diff --git a/vllm/model_executor/layers/logits_processor.py b/vllm/model_executor/layers/logits_processor.py index 
2bc7e458494f..dff0ae2461f2 100644 --- a/vllm/model_executor/layers/logits_processor.py +++ b/vllm/model_executor/layers/logits_processor.py @@ -8,6 +8,7 @@ import vllm.envs as envs from vllm.distributed import (tensor_model_parallel_all_gather, tensor_model_parallel_gather) +from vllm.envs import VLLM_USE_V1 from vllm.model_executor.layers.vocab_parallel_embedding import ( VocabParallelEmbedding) from vllm.model_executor.sampling_metadata import SamplingMetadata diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index bea8c5502f61..2b9a5acf6a7e 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -6,6 +6,7 @@ from vllm.engine.arg_utils import EngineArgs from vllm.engine.metrics_types import StatLoggerBase from vllm.envs import VLLM_ENABLE_V1_MULTIPROCESSING +from vllm.executor.ray_utils import initialize_ray_cluster from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType from vllm.logger import init_logger from vllm.lora.request import LoRARequest @@ -20,6 +21,7 @@ from vllm.v1.engine.core_client import EngineCoreClient from vllm.v1.engine.detokenizer import Detokenizer from vllm.v1.engine.processor import Processor +from vllm.v1.executor.ray_executor import RayExecutor from vllm.v1.executor.abstract import Executor logger = init_logger(__name__) @@ -110,7 +112,10 @@ def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: executor_class: Type[Executor] distributed_executor_backend = ( vllm_config.parallel_config.distributed_executor_backend) - if distributed_executor_backend == "mp": + if distributed_executor_backend == "ray": + initialize_ray_cluster(vllm_config.parallel_config) + return RayExecutor + elif distributed_executor_backend == "mp": from vllm.v1.executor.multiproc_executor import MultiprocExecutor executor_class = MultiprocExecutor else: @@ -120,6 +125,9 @@ def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: return executor_class + def stop_remote_worker_execution_loop(self) -> None: + raise NotImplementedError("TP not implemented yet.") + def get_num_unfinished_requests(self) -> int: return self.detokenizer.get_num_unfinished_requests() diff --git a/vllm/v1/executor/distributed_gpu_executor.py b/vllm/v1/executor/distributed_gpu_executor.py new file mode 100644 index 000000000000..3183e21f4e05 --- /dev/null +++ b/vllm/v1/executor/distributed_gpu_executor.py @@ -0,0 +1,96 @@ +from abc import abstractmethod +from typing import Any, Optional, Tuple + +from vllm.config import VllmConfig +from vllm.logger import init_logger +from vllm.v1.core.scheduler import SchedulerOutput +from vllm.v1.outputs import ModelRunnerOutput + +logger = init_logger(__name__) + + +class DistributedGPUExecutor: + """Abstract superclass of multi-GPU executor implementations.""" + + def __init__(self, vllm_config: VllmConfig): + self.vllm_config = vllm_config + self.model_config = vllm_config.model_config + self.cache_config = vllm_config.cache_config + self.lora_config = vllm_config.lora_config + self.load_config = vllm_config.load_config + self.parallel_config = vllm_config.parallel_config + self.scheduler_config = vllm_config.scheduler_config + self.device_config = vllm_config.device_config + self.speculative_config = vllm_config.speculative_config + self.prompt_adapter_config = vllm_config.prompt_adapter_config + self.observability_config = vllm_config.observability_config + + def determine_num_available_blocks(self) -> Tuple[int, int]: + """Determine the number of available KV blocks. 
+ + This invokes `determine_num_available_blocks` on each worker and takes + the min of the results, guaranteeing that the selected cache sizes are + compatible with all workers. + + Returns: + - tuple[num_gpu_blocks, num_cpu_blocks] + """ + # Get the maximum number of blocks that can be allocated on GPU and CPU. + num_blocks = self._run_workers("determine_num_available_blocks") + + # Since we use a shared centralized controller, we take the minimum + # number of blocks across all workers to make sure all the memory + # operators can be applied to all workers. + num_gpu_blocks = min(b[0] for b in num_blocks) + return num_gpu_blocks, 0 + + def initialize_cache(self, num_gpu_blocks: int) -> None: + """Initialize the KV cache in all workers. + """ + # NOTE: This is logged in the executor because there can be >1 worker + # with other executors. We could log in the engine level, but work + # remains to abstract away the device for non-GPU configurations. + logger.info("# GPU blocks: %d", num_gpu_blocks) + self._run_workers("initialize_cache", num_gpu_blocks) + self._run_workers("compile_or_warm_up_model") + + @abstractmethod + def execute_model( + self, + scheduler_output: SchedulerOutput, + ) -> ModelRunnerOutput: + raise NotImplementedError + + def save_sharded_state( + self, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + self._run_workers("save_sharded_state", + path=path, + pattern=pattern, + max_size=max_size) + + @abstractmethod + def _run_workers( + self, + method: str, + *args, + async_run_tensor_parallel_workers_only: bool = False, + max_concurrent_workers: Optional[int] = None, + **kwargs, + ) -> Any: + """Runs the given method on all workers. + + Args: + async_run_tensor_parallel_workers_only: If True the method will be + run only in the remote TP workers, not the driver worker. + It will also be run asynchronously and return a list of futures + rather than blocking on the results. 
+ """ + raise NotImplementedError + + @abstractmethod + def check_health(self) -> None: + raise NotImplementedError diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py new file mode 100644 index 000000000000..a8590fc1f261 --- /dev/null +++ b/vllm/v1/executor/ray_executor.py @@ -0,0 +1,308 @@ +import os +from collections import defaultdict +from itertools import islice, repeat +from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, + Type) + +import vllm.envs as envs +from vllm.config import VllmConfig +from vllm.logger import init_logger +from vllm.utils import (get_distributed_init_method, get_ip, get_open_port) +from vllm.v1.core.scheduler import SchedulerOutput +from vllm.v1.executor.ray_utils import RayWorkerWrapper, ray +from vllm.v1.outputs import ModelRunnerOutput +from vllm.worker.worker_base import WorkerBase + +if ray is not None: + from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + +if TYPE_CHECKING: + from ray.util.placement_group import PlacementGroup + +logger = init_logger(__name__) + +class RayExecutor: + + def __init__(self, vllm_config: VllmConfig) -> None: + self.vllm_config = vllm_config + self.model_config = vllm_config.model_config + self.cache_config = vllm_config.cache_config + self.lora_config = vllm_config.lora_config + self.load_config = vllm_config.load_config + self.parallel_config = vllm_config.parallel_config + self.scheduler_config = vllm_config.scheduler_config + self.device_config = vllm_config.device_config + self.speculative_config = vllm_config.speculative_config + self.prompt_adapter_config = vllm_config.prompt_adapter_config + self.observability_config = vllm_config.observability_config + self._init_executor() + + def _init_executor(self) -> None: + self.forward_dag: Optional[ray.dag.CompiledDAG] = None + placement_group = self.parallel_config.placement_group + + # Disable Ray usage stats collection. + ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0") + if ray_usage != "1": + os.environ["RAY_USAGE_STATS_ENABLED"] = "0" + + # Create the parallel GPU workers. + self._init_workers_ray(placement_group) + + def _init_workers_ray(self, placement_group: "PlacementGroup", + **ray_remote_kwargs): + if (self.parallel_config.tensor_parallel_size == 1 + and self.parallel_config.pipeline_parallel_size == 1): + # For single GPU case, we use a ray worker with constrained memory. + num_gpus = self.cache_config.gpu_memory_utilization + else: + # Otherwise, the ray workers are allocated with a full GPU. + num_gpus = 1 + + # A list of workers to run a model. + self.workers: List[RayWorkerWrapper] = [] + if self.parallel_config.ray_workers_use_nsight: + ray_remote_kwargs = self._configure_ray_workers_use_nsight( + ray_remote_kwargs) + + # Create the workers. 
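The loop that follows pins one RayWorkerWrapper actor to each GPU bundle of the placement group via PlacementGroupSchedulingStrategy. A minimal standalone sketch of that scheduling pattern, using CPU bundles and a toy actor so it runs without GPUs (the actor name and bundle sizes here are illustrative, not vLLM's):

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()
# Two CPU bundles; the vLLM executor reserves {"GPU": 1} (or a fractional GPU) per worker.
pg = placement_group([{"CPU": 1}] * 2, strategy="PACK")
ray.get(pg.ready())

@ray.remote(num_cpus=1)
class EchoWorker:
    def node_id(self) -> str:
        return ray.get_runtime_context().get_node_id()

workers = [
    EchoWorker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=i,
            placement_group_capture_child_tasks=True,
        )).remote()
    for i in range(2)
]
print(ray.get([w.node_id.remote() for w in workers]))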
+ driver_ip = get_ip() + worker_wrapper_kwargs = self._get_worker_wrapper_args() + for bundle_id, bundle in enumerate(placement_group.bundle_specs): + if not bundle.get("GPU", 0): + continue + scheduling_strategy = PlacementGroupSchedulingStrategy( + placement_group=placement_group, + placement_group_capture_child_tasks=True, + placement_group_bundle_index=bundle_id, + ) + + worker = ray.remote( + num_cpus=0, + num_gpus=num_gpus, + scheduling_strategy=scheduling_strategy, + **ray_remote_kwargs, + )(RayWorkerWrapper).remote(**worker_wrapper_kwargs) + self.workers.append(worker) + + logger.debug("workers: %s", self.workers) + worker_ips = [ + ray.get(worker.get_node_ip.remote()) # type: ignore[attr-defined] + for worker in self.workers + ] + ip_counts: Dict[str, int] = {} + for ip in worker_ips: + ip_counts[ip] = ip_counts.get(ip, 0) + 1 + + def sort_by_driver_then_worker_ip(worker): + """ + Sort the workers based on 3 properties: + 1. If the worker is on the same node as the driver (vllm engine), + it should be placed first. + 2. Then, if the worker is on a node with fewer workers, it should + be placed first. + 3. Finally, if the work is on a node with smaller IP address, it + should be placed first. + """ + ip = ray.get(worker.get_node_ip.remote()) + return (ip != driver_ip, ip_counts[ip], ip) + + # After sorting, the workers on the same node will be + # close to each other, and the workers on the driver + # node will be placed first. + self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip) + + # Get the set of GPU IDs used on each node. + worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids") + + node_workers = defaultdict(list) # node id -> list of worker ranks + node_gpus = defaultdict(list) # node id -> list of gpu ids + + for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): + node_workers[node_id].append(i) + # `gpu_ids` can be a list of strings or integers. + # convert them to integers for consistency. + # NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs), + # string sorting is not sufficient. + # see https://github.com/vllm-project/vllm/issues/5590 + gpu_ids = [int(x) for x in gpu_ids] + node_gpus[node_id].extend(gpu_ids) + + for node_id, gpu_ids in node_gpus.items(): + node_gpus[node_id] = sorted(gpu_ids) + + all_ips = set(worker_ips) + n_ips = len(all_ips) + n_nodes = len(node_workers) + + if n_nodes != n_ips: + raise RuntimeError( + f"Every node should have a unique IP address. Got {n_nodes}" + f" nodes with node ids {list(node_workers.keys())} and " + f"{n_ips} unique IP addresses {all_ips}. Please check your" + " network configuration. If you set `VLLM_HOST_IP` or " + "`HOST_IP` environment variable, make sure it is unique for" + " each node.") + + VLLM_INSTANCE_ID = get_vllm_instance_id() + + # Set environment variables for the driver and workers. 
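A quick illustration of the ordering that sort_by_driver_then_worker_ip above produces (the IP addresses are made up): workers on the driver's node come first, then workers on nodes hosting fewer workers, with ties broken by comparing the IP strings.

from collections import Counter

driver_ip = "10.0.0.1"  # hypothetical addresses
worker_ips = ["10.0.0.2", "10.0.0.1", "10.0.0.2", "10.0.0.3"]
ip_counts = Counter(worker_ips)

def sort_key(ip: str):
    # Same tuple shape as the key above: (not on driver node, node load, ip)
    return (ip != driver_ip, ip_counts[ip], ip)

print(sorted(worker_ips, key=sort_key))
# ['10.0.0.1', '10.0.0.3', '10.0.0.2', '10.0.0.2']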
+ all_args_to_update_environment_variables = [({ + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[node_id])), + "VLLM_INSTANCE_ID": + VLLM_INSTANCE_ID, + "VLLM_TRACE_FUNCTION": + str(envs.VLLM_TRACE_FUNCTION), + "VLLM_USE_V1": + str(int(envs.VLLM_USE_V1)), + **({ + "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND + } if envs.VLLM_ATTENTION_BACKEND is not None else {}) + }, ) for (node_id, _) in worker_node_and_gpu_ids] + + self._env_vars_for_all_workers = ( + all_args_to_update_environment_variables) + + self._run_workers("update_environment_variables", + all_args=self._get_env_vars_to_be_updated()) + + if len(node_gpus) == 1: + # in single node case, we don't need to get the IP address. + # the loopback address is sufficient + # NOTE: a node may have several IP addresses, one for each + # network interface. `get_ip()` might return any of them, + # while they might not work for communication inside the node + # if the network setup is complicated. Using the loopback address + # solves this issue, as it always works for communication inside + # the node. + driver_ip = "127.0.0.1" + distributed_init_method = get_distributed_init_method( + driver_ip, get_open_port()) + + # Initialize the actual workers inside worker wrapper. + init_worker_all_kwargs = [ + self._get_worker_kwargs( + local_rank=node_workers[node_id].index(rank), + rank=rank, + distributed_init_method=distributed_init_method, + ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids) + ] + self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) + self._run_workers("initialize") + self._run_workers("load_model") + + def _get_worker_module_and_class( + self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: + worker_module_name = "vllm.v1.worker.gpu_worker" + worker_class_name = "Worker" + return worker_module_name, worker_class_name + + def _get_worker_wrapper_args(self) -> Dict[str, Any]: + worker_module_name, worker_class_name = ( + self._get_worker_module_and_class()) + + return dict( + worker_module_name=worker_module_name, + worker_class_name=worker_class_name, + trust_remote_code=self.model_config.trust_remote_code, + ) + + def _run_workers( + self, + method: str, + *args, + all_args: Optional[List[Tuple[Any, ...]]] = None, + all_kwargs: Optional[List[Dict[str, Any]]] = None, + **kwargs, + ) -> Any: + """Runs the given method on all workers. Can be used in the following + ways: + Args: + - async_run_tensor_parallel_workers_only: If True the method will be + run only in the remote TP workers, not the driver worker. + It will also be run asynchronously and return a list of futures + rather than blocking on the results. + - args/kwargs: All workers share the same args/kwargs + - all_args/all_kwargs: args/kwargs for each worker are specified + individually + """ + count = len(self.workers) + all_worker_args = repeat(args, count) if all_args is None \ + else islice(all_args, 0, None) + all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \ + else islice(all_kwargs, 0, None) + + # Start the ray workers first. + ray_workers = self.workers + ray_worker_outputs = [ + worker.execute_method.remote(method, *worker_args, **worker_kwargs) + for (worker, worker_args, worker_kwargs + ) in zip(ray_workers, all_worker_args, all_worker_kwargs) + ] + + # Get the results of the ray workers. 
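The repeat/islice bookkeeping in _run_workers above pairs each worker with its arguments: with all_args/all_kwargs left as None every worker receives the same shared args, otherwise worker i receives the i-th entry. A tiny self-contained sketch of that pairing (the method names used as sample args are arbitrary):

from itertools import islice, repeat

def fan_out(count, args=(), all_args=None):
    # Shared args are repeated once per worker; per-worker args are consumed in order.
    per_worker_args = (repeat(args, count) if all_args is None
                       else islice(all_args, 0, None))
    return [(rank, a) for rank, a in zip(range(count), per_worker_args)]

# Shared args: every worker gets the same tuple.
assert fan_out(3, ("load_model",)) == [
    (0, ("load_model",)), (1, ("load_model",)), (2, ("load_model",))]
# Per-worker args: worker i gets all_args[i].
assert fan_out(2, all_args=[({"rank": 0},), ({"rank": 1},)]) == [
    (0, ({"rank": 0},)), (1, ({"rank": 1},))]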
+ if self.workers: + ray_worker_outputs = ray.get(ray_worker_outputs) + + return ray_worker_outputs + + def initialize(self, num_gpu_blocks: int) -> None: + raise NotImplementedError + + def determine_num_available_blocks(self) -> Tuple[int, int]: + raise NotImplementedError + + def execute_model( + self, + scheduler_output, + ) -> ModelRunnerOutput: + if self.forward_dag is None: + self.forward_dag = self._compiled_ray_dag() + # All workers are supposed to produce the same output. Only + # get the first output. + output = ray.get(self.forward_dag.execute(scheduler_output))[0] + return output + + def profile(self, is_start=True): + raise NotImplementedError + + def shutdown(self): + pass + + def check_health(self) -> None: + raise NotImplementedError + + def _check_ray_compiled_graph_installation(self): + # TODO: We should check versions that support compiled graph. + import importlib.util + adag_spec = importlib.util.find_spec( + "ray.experimental.compiled_dag_ref") + if adag_spec is None: + raise ValueError("Ray accelerated DAG is not installed. " + "Run `pip install ray[adag]` to install it.") + + cupy_spec = importlib.util.find_spec("cupy") + if cupy_spec is None and envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: + raise ValueError( + "cupy is not installed but required since " + "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL is set." + "Run `pip install ray[adag]` and check cupy installation.") + + def _compiled_ray_dag(self): + assert self.parallel_config.use_ray + self._check_ray_compiled_graph_installation() + from ray.dag import InputNode, MultiOutputNode + + with InputNode() as input_batches: + outputs = [ + worker.execute_model.bind(input_batches) + for worker in self.workers + ] + forward_dag = MultiOutputNode(outputs) + + return forward_dag.experimental_compile() + + + def __del__(self): + self.shutdown() \ No newline at end of file diff --git a/vllm/v1/executor/ray_gpu_executor.py b/vllm/v1/executor/ray_gpu_executor.py new file mode 100644 index 000000000000..217eb0163098 --- /dev/null +++ b/vllm/v1/executor/ray_gpu_executor.py @@ -0,0 +1,327 @@ +import os +from collections import defaultdict +from itertools import islice, repeat +from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, + Type) + +import vllm.envs as envs +from vllm.logger import init_logger +from vllm.utils import (get_distributed_init_method, get_ip, get_open_port) +from vllm.v1.core.scheduler import SchedulerOutput +from vllm.v1.executor.distributed_gpu_executor import DistributedGPUExecutor +from vllm.v1.executor.ray_utils import RayWorkerWrapper, ray +from vllm.v1.outputs import ModelRunnerOutput +from vllm.worker.worker_base import WorkerBase + +if ray is not None: + from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + +if TYPE_CHECKING: + from ray.util.placement_group import PlacementGroup + +logger = init_logger(__name__) + + +class RayGPUExecutor(DistributedGPUExecutor): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._init_executor() + + def _init_executor(self) -> None: + self.forward_dag: Optional[ray.dag.CompiledDAG] = None + placement_group = self.parallel_config.placement_group + + # Disable Ray usage stats collection. + ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0") + if ray_usage != "1": + os.environ["RAY_USAGE_STATS_ENABLED"] = "0" + + # Create the parallel GPU workers. 
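execute_model above drives all workers through a single Ray compiled graph built by _compiled_ray_dag(). A minimal sketch of the same InputNode/MultiOutputNode pattern with a toy actor, assuming a Ray build that ships compiled graphs (installed via `pip install "ray[adag]"`):

import ray
from ray.dag import InputNode, MultiOutputNode

@ray.remote
class Doubler:
    def execute(self, x: int) -> int:
        return 2 * x

ray.init()
workers = [Doubler.remote() for _ in range(2)]
with InputNode() as batch:
    dag = MultiOutputNode([w.execute.bind(batch) for w in workers])

compiled = dag.experimental_compile()
# ray.get on the compiled-graph ref returns one output per bound worker;
# the executor above keeps only output [0] since all workers agree.
print(ray.get(compiled.execute(21)))  # [42, 42]
compiled.teardown()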
+ self._init_workers_ray(placement_group) + + def shutdown(self) -> None: + if hasattr(self, "forward_dag") and self.forward_dag is not None: + self.forward_dag.teardown() + import ray + for worker in self.workers: + ray.kill(worker) + self.forward_dag = None + + def _get_worker_module_and_class( + self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: + worker_module_name = "vllm.v1.worker.gpu_worker" + worker_class_name = "Worker" + return worker_module_name, worker_class_name + + def _get_worker_kwargs( + self, + local_rank: int = 0, + rank: int = 0, + distributed_init_method: Optional[str] = None) -> Dict[str, Any]: + """Return worker init args for a given rank.""" + if distributed_init_method is None: + distributed_init_method = get_distributed_init_method( + get_ip(), get_open_port()) + return dict( + vllm_config=self.vllm_config, + local_rank=local_rank, + rank=rank, + distributed_init_method=distributed_init_method, + ) + + def _configure_ray_workers_use_nsight(self, + ray_remote_kwargs) -> Dict[str, Any]: + # If nsight profiling is enabled, we need to set the profiling + # configuration for the ray workers as runtime env. + runtime_env = ray_remote_kwargs.setdefault("runtime_env", {}) + runtime_env.update({ + "nsight": { + "t": "cuda,cudnn,cublas", + "o": "'worker_process_%p'", + "cuda-graph-trace": "node", + } + }) + + return ray_remote_kwargs + + def _get_worker_wrapper_args(self) -> Dict[str, Any]: + worker_module_name, worker_class_name = ( + self._get_worker_module_and_class()) + + return dict( + worker_module_name=worker_module_name, + worker_class_name=worker_class_name, + trust_remote_code=self.model_config.trust_remote_code, + ) + + # child class could overwrite this to return actual env vars. + def _get_env_vars_to_be_updated(self): + return self._env_vars_for_all_workers + + def _init_workers_ray(self, placement_group: "PlacementGroup", + **ray_remote_kwargs): + if (self.parallel_config.tensor_parallel_size == 1 + and self.parallel_config.pipeline_parallel_size == 1): + # For single GPU case, we use a ray worker with constrained memory. + num_gpus = self.cache_config.gpu_memory_utilization + else: + # Otherwise, the ray workers are allocated with a full GPU. + num_gpus = 1 + + # A list of workers to run a model. + self.workers: List[RayWorkerWrapper] = [] + if self.parallel_config.ray_workers_use_nsight: + ray_remote_kwargs = self._configure_ray_workers_use_nsight( + ray_remote_kwargs) + + # Create the workers. + driver_ip = get_ip() + worker_wrapper_kwargs = self._get_worker_wrapper_args() + for bundle_id, bundle in enumerate(placement_group.bundle_specs): + if not bundle.get("GPU", 0): + continue + scheduling_strategy = PlacementGroupSchedulingStrategy( + placement_group=placement_group, + placement_group_capture_child_tasks=True, + placement_group_bundle_index=bundle_id, + ) + + worker = ray.remote( + num_cpus=0, + num_gpus=num_gpus, + scheduling_strategy=scheduling_strategy, + **ray_remote_kwargs, + )(RayWorkerWrapper).remote(**worker_wrapper_kwargs) + self.workers.append(worker) + + logger.debug("workers: %s", self.workers) + worker_ips = [ + ray.get(worker.get_node_ip.remote()) # type: ignore[attr-defined] + for worker in self.workers + ] + ip_counts: Dict[str, int] = {} + for ip in worker_ips: + ip_counts[ip] = ip_counts.get(ip, 0) + 1 + + def sort_by_driver_then_worker_ip(worker): + """ + Sort the workers based on 3 properties: + 1. If the worker is on the same node as the driver (vllm engine), + it should be placed first. + 2. 
Then, if the worker is on a node with fewer workers, it should + be placed first. + 3. Finally, if the work is on a node with smaller IP address, it + should be placed first. + """ + ip = ray.get(worker.get_node_ip.remote()) + return (ip != driver_ip, ip_counts[ip], ip) + + # After sorting, the workers on the same node will be + # close to each other, and the workers on the driver + # node will be placed first. + self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip) + + # Get the set of GPU IDs used on each node. + worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids") + + node_workers = defaultdict(list) # node id -> list of worker ranks + node_gpus = defaultdict(list) # node id -> list of gpu ids + + for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): + node_workers[node_id].append(i) + # `gpu_ids` can be a list of strings or integers. + # convert them to integers for consistency. + # NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs), + # string sorting is not sufficient. + # see https://github.com/vllm-project/vllm/issues/5590 + gpu_ids = [int(x) for x in gpu_ids] + node_gpus[node_id].extend(gpu_ids) + + for node_id, gpu_ids in node_gpus.items(): + node_gpus[node_id] = sorted(gpu_ids) + + all_ips = set(worker_ips) + n_ips = len(all_ips) + n_nodes = len(node_workers) + + if n_nodes != n_ips: + raise RuntimeError( + f"Every node should have a unique IP address. Got {n_nodes}" + f" nodes with node ids {list(node_workers.keys())} and " + f"{n_ips} unique IP addresses {all_ips}. Please check your" + " network configuration. If you set `VLLM_HOST_IP` or " + "`HOST_IP` environment variable, make sure it is unique for" + " each node.") + + # VLLM_INSTANCE_ID = get_vllm_instance_id() + + # Set environment variables for the driver and workers. + all_args_to_update_environment_variables = [({ + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[node_id])), + # "VLLM_INSTANCE_ID": + # VLLM_INSTANCE_ID, + "VLLM_TRACE_FUNCTION": + str(envs.VLLM_TRACE_FUNCTION), + "VLLM_USE_V1": + str(int(envs.VLLM_USE_V1)), + **({ + "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND + } if envs.VLLM_ATTENTION_BACKEND is not None else {}) + }, ) for (node_id, _) in worker_node_and_gpu_ids] + + self._env_vars_for_all_workers = ( + all_args_to_update_environment_variables) + + self._run_workers("update_environment_variables", + all_args=self._get_env_vars_to_be_updated()) + + if len(node_gpus) == 1: + # in single node case, we don't need to get the IP address. + # the loopback address is sufficient + # NOTE: a node may have several IP addresses, one for each + # network interface. `get_ip()` might return any of them, + # while they might not work for communication inside the node + # if the network setup is complicated. Using the loopback address + # solves this issue, as it always works for communication inside + # the node. + driver_ip = "127.0.0.1" + distributed_init_method = get_distributed_init_method( + driver_ip, get_open_port()) + + # Initialize the actual workers inside worker wrapper. 
+ init_worker_all_kwargs = [ + self._get_worker_kwargs( + local_rank=node_workers[node_id].index(rank), + rank=rank, + distributed_init_method=distributed_init_method, + ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids) + ] + self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) + self._run_workers("initialize") + self._run_workers("load_model") + + def execute_model( + self, + scheduler_output: SchedulerOutput, + ) -> ModelRunnerOutput: + if self.forward_dag is None: + self.forward_dag = self._compiled_ray_dag() + # All workers are supposed to produce the same output. Only + # get the first output. + output = ray.get(self.forward_dag.execute(scheduler_output))[0] + return output + + def _run_workers( + self, + method: str, + *args, + all_args: Optional[List[Tuple[Any, ...]]] = None, + all_kwargs: Optional[List[Dict[str, Any]]] = None, + **kwargs, + ) -> Any: + """Runs the given method on all workers. Can be used in the following + ways: + + Args: + - async_run_tensor_parallel_workers_only: If True the method will be + run only in the remote TP workers, not the driver worker. + It will also be run asynchronously and return a list of futures + rather than blocking on the results. + - args/kwargs: All workers share the same args/kwargs + - all_args/all_kwargs: args/kwargs for each worker are specified + individually + """ + count = len(self.workers) + all_worker_args = repeat(args, count) if all_args is None \ + else islice(all_args, 0, None) + all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \ + else islice(all_kwargs, 0, None) + + # Start the ray workers first. + ray_workers = self.workers + ray_worker_outputs = [ + worker.execute_method.remote(method, *worker_args, **worker_kwargs) + for (worker, worker_args, worker_kwargs + ) in zip(ray_workers, all_worker_args, all_worker_kwargs) + ] + + # Get the results of the ray workers. + if self.workers: + ray_worker_outputs = ray.get(ray_worker_outputs) + + return ray_worker_outputs + + def _check_ray_compiled_graph_installation(self): + # TODO(sang): We should check versions that support compiled graph. + import importlib.util + adag_spec = importlib.util.find_spec( + "ray.experimental.compiled_dag_ref") + if adag_spec is None: + raise ValueError("Ray accelerated DAG is not installed. " + "Run `pip install ray[adag]` to install it.") + + cupy_spec = importlib.util.find_spec("cupy") + if cupy_spec is None and envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: + raise ValueError( + "cupy is not installed but required since " + "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL is set." 
+ "Run `pip install ray[adag]` and check cupy installation.") + + def _compiled_ray_dag(self): + assert self.parallel_config.use_ray + self._check_ray_compiled_graph_installation() + from ray.dag import InputNode, MultiOutputNode + + with InputNode() as input_batches: + outputs = [ + worker.execute_model.bind(input_batches) + for worker in self.workers + ] + forward_dag = MultiOutputNode(outputs) + + return forward_dag.experimental_compile() + + def __del__(self): + self.shutdown() diff --git a/vllm/v1/executor/ray_utils.py b/vllm/v1/executor/ray_utils.py new file mode 100644 index 000000000000..e2ae367d0d5a --- /dev/null +++ b/vllm/v1/executor/ray_utils.py @@ -0,0 +1,375 @@ +import importlib +import os +import time +from collections import defaultdict +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple + +from vllm.config import ParallelConfig +from vllm.logger import init_logger +from vllm.platforms import current_platform +from vllm.utils import get_ip, update_environment_variables +from vllm.v1.outputs import ModelRunnerOutput + +if TYPE_CHECKING: + from vllm.v1.core.scheduler import SchedulerOutput + +logger = init_logger(__name__) +PG_WAIT_TIMEOUT = 60 + + +class WorkerWrapperBase: + """ + The whole point of this class is to lazily initialize the worker. + We first instantiate the WorkerWrapper, which remembers the worker module + and class name. Then, when we call `update_environment_variables`, and the + real initialization happens in `init_worker`. + + Otherwise, the worker class will be obtained by dynamically importing it + using worker_module_name and worker_class_name. + """ + + def __init__(self, + worker_module_name: str, + worker_class_name: str, + trust_remote_code: bool = False) -> None: + self.worker_module_name = worker_module_name + self.worker_class_name = worker_class_name + self.worker = None + + if trust_remote_code: + # note: lazy import to avoid importing torch before initializing + from vllm.utils import init_cached_hf_modules + init_cached_hf_modules() + + @staticmethod + def update_environment_variables(envs: Dict[str, str]) -> None: + key = 'CUDA_VISIBLE_DEVICES' + if key in envs and key in os.environ: + # overwriting CUDA_VISIBLE_DEVICES is desired behavior + # suppress the warning in `update_environment_variables` + del os.environ[key] + update_environment_variables(envs) + + def init_worker(self, *args, **kwargs): + """ + Here we inject some common logic before initializing the worker. + Arguments are passed to the worker class constructor. + """ + # TODO(sang): Enable it + # enable_trace_function_call_for_thread() + + # see https://github.com/NVIDIA/nccl/issues/1234 + os.environ['NCCL_CUMEM_ENABLE'] = '0' + + # TODO(sang): Enable it + # from vllm.plugins import load_general_plugins + # load_general_plugins() + + mod = importlib.import_module(self.worker_module_name) + worker_class = getattr(mod, self.worker_class_name) + + self.worker = worker_class(*args, **kwargs) + assert self.worker is not None + + def execute_method(self, method, *args, **kwargs): + try: + target = self if self.worker is None else self.worker + executor = getattr(target, method) + return executor(*args, **kwargs) + except Exception as e: + # if the driver worker also execute methods, + # exceptions in the rest worker may cause deadlock in rpc like ray + # see https://github.com/vllm-project/vllm/issues/3455 + # print the error and inform the user to solve the error + msg = (f"Error executing method {method}. 
" + "This might cause deadlock in distributed execution.") + logger.exception(msg) + raise e + + +try: + import ray + from ray.util import placement_group_table + from ray.util.placement_group import PlacementGroup + try: + from ray._private.state import available_resources_per_node + except ImportError: + # Ray 2.9.x doesn't expose `available_resources_per_node` + from ray._private.state import state as _state + available_resources_per_node = _state._available_resources_per_node + + class RayWorkerWrapper(WorkerWrapperBase): + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + # Since the compiled DAG runs a main execution + # in a different thread that calls cuda.set_device. + # The flag indicates is set_device is called on + # that thread. It will be removed soon. + self.compiled_dag_cuda_device_set = False + + def get_node_ip(self) -> str: + return get_ip() + + def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]: + node_id = ray.get_runtime_context().get_node_id() + gpu_ids = ray.get_gpu_ids() + return node_id, gpu_ids + + def setup_device_if_necessary(self): + # TODO(swang): This is needed right now because Ray CG executes + # on a background thread, so we need to reset torch's current + # device. + # We can remove this API after it is fixed in compiled graph. + import torch + if not self.compiled_dag_cuda_device_set: + torch.cuda.set_device(self.worker.device) + self.compiled_dag_cuda_device_set = True + + def execute_model( + self, + scheduler_output: "SchedulerOutput", + ) -> ModelRunnerOutput: + self.setup_device_if_necessary() + output = self.worker.model_runner.execute_model(scheduler_output) + return output + + ray_import_err = None + +except ImportError as e: + ray = None # type: ignore + ray_import_err = e + RayWorkerWrapper = None # type: ignore + + +def ray_is_available() -> bool: + """Returns True if Ray is available.""" + return ray is not None + + +def assert_ray_available(): + """Raise an exception if Ray is not available.""" + if ray is None: + raise ValueError("Failed to import Ray, please install Ray with " + "`pip install ray`.") from ray_import_err + + +def _verify_bundles(placement_group: "PlacementGroup", + parallel_config: ParallelConfig, device_str: str): + """Verify a given placement group has bundles located in the right place. + + There are 2 rules. + - Warn if all tensor parallel workers cannot fit in a single node. + - Fail if driver node is not included in a placement group. + """ + assert ray.is_initialized(), ( + "Ray is not initialized although distributed-executor-backend is ray.") + pg_data = placement_group_table(placement_group) + # bundle_idx -> node_id + bundle_to_node_ids = pg_data["bundles_to_node_id"] + # bundle_idx -> bundle (e.g., {"GPU": 1}) + bundles = pg_data["bundles"] + # node_id -> List of bundle (e.g., {"GPU": 1}) + node_id_to_bundle: Dict[str, List[Dict[str, float]]] = defaultdict(list) + + for bundle_idx, node_id in bundle_to_node_ids.items(): + node_id_to_bundle[node_id].append(bundles[bundle_idx]) + driver_node_id = ray.get_runtime_context().get_node_id() + + if driver_node_id not in node_id_to_bundle: + raise RuntimeError( + f"driver node id {driver_node_id} is not included in a placement " + f"group {placement_group.id}. Node id -> bundles " + f"{node_id_to_bundle}. " + "You don't have enough GPUs available in a current node. 
Check " + "`ray status` to see if you have available GPUs in a node " + f"{driver_node_id} before starting an vLLM engine.") + + for node_id, bundles in node_id_to_bundle.items(): + if len(bundles) < parallel_config.tensor_parallel_size: + logger.warning( + "tensor_parallel_size=%d " + "is bigger than a reserved number of %ss (%d " + "%ss) in a node %s. Tensor parallel workers can be " + "spread out to 2+ nodes which can degrade the performance " + "unless you have fast interconnect across nodes, like " + "Infiniband. To resolve this issue, make sure you have more " + "than %d GPUs available at each node.", + parallel_config.tensor_parallel_size, device_str, len(bundles), + device_str, node_id, parallel_config.tensor_parallel_size) + + +def _wait_until_pg_ready(current_placement_group: "PlacementGroup"): + """Wait until a placement group is ready. + + It prints the informative log messages if the placement group is + not created within time. + + """ + # Wait until PG is ready - this will block until all + # requested resources are available, and will timeout + # if they cannot be provisioned. + placement_group_specs = current_placement_group.bundle_specs + + s = time.time() + pg_ready_ref = current_placement_group.ready() + wait_interval = 10 + while time.time() - s < PG_WAIT_TIMEOUT: + ready, _ = ray.wait([pg_ready_ref], timeout=wait_interval) + if len(ready) > 0: + break + + # Exponential backoff for warning print. + wait_interval *= 2 + logger.info( + "Waiting for creating a placement group of specs for " + "%d seconds. specs=%s. Check " + "`ray status` to see if you have enough resources.", + int(time.time() - s), placement_group_specs) + + try: + ray.get(pg_ready_ref, timeout=0) + except ray.exceptions.GetTimeoutError: + raise ValueError( + "Cannot provide a placement group of " + f"{placement_group_specs=} within {PG_WAIT_TIMEOUT} seconds. See " + "`ray status` to make sure the cluster has enough resources." + ) from None + + +def _wait_until_pg_removed(current_placement_group: "PlacementGroup"): + ray.util.remove_placement_group(current_placement_group) + s = time.time() + wait_interval = 10 + while time.time() - s < PG_WAIT_TIMEOUT: + pg = ray.util.get_current_placement_group() + if pg is None: + break + + # Exponential backoff for warning print. + wait_interval *= 2 + logger.info( + "Waiting for removing a placement group of specs for " + "%d seconds.", int(time.time() - s)) + time.sleep(wait_interval) + + +def initialize_ray_cluster( + parallel_config: ParallelConfig, + ray_address: Optional[str] = None, +): + """Initialize the distributed cluster with Ray. + + it will connect to the Ray cluster and create a placement group + for the workers, which includes the specification of the resources + for each distributed worker. + + Args: + parallel_config: The configurations for parallel execution. + ray_address: The address of the Ray cluster. If None, uses + the default Ray cluster address. + """ + assert_ray_available() + + # Connect to a ray cluster. + if current_platform.is_rocm() or current_platform.is_xpu(): + # Try to connect existing ray instance and create a new one if not found + try: + ray.init("auto") + except ConnectionError: + logger.warning( + "No existing RAY instance detected. 
" + "A new instance will be launched with current node resources.") + ray.init(address=ray_address, + ignore_reinit_error=True, + num_gpus=parallel_config.world_size) + else: + ray.init(address=ray_address, ignore_reinit_error=True) + + if parallel_config.placement_group: + # Placement group is already set. + return + + device_str = "GPU" if not current_platform.is_tpu() else "TPU" + # Create placement group for worker processes + current_placement_group = ray.util.get_current_placement_group() + if current_placement_group: + # We are in a placement group + bundles = current_placement_group.bundle_specs + # Verify that we can use the placement group. + device_bundles = 0 + for bundle in bundles: + bundle_devices = bundle.get(device_str, 0) + if bundle_devices > 1: + raise ValueError( + "Placement group bundle cannot have more than 1 " + f"{device_str}.") + if bundle_devices: + device_bundles += 1 + if parallel_config.world_size > device_bundles: + raise ValueError( + f"The number of required {device_str}s exceeds the total " + f"number of available {device_str}s in the placement group." + f"Required number of devices: {parallel_config.world_size}. " + f"Total number of devices: {device_bundles}.") + else: + num_devices_in_cluster = ray.cluster_resources().get(device_str, 0) + if parallel_config.world_size > num_devices_in_cluster: + raise ValueError( + f"The number of required {device_str}s exceeds the total " + f"number of available {device_str}s in the placement group.") + # Create a new placement group + placement_group_specs: List[Dict[str, float]] = ([{ + device_str: 1.0 + } for _ in range(parallel_config.world_size)]) + + # vLLM engine is also a worker to execute model with an accelerator, + # so it requires to have the device in a current node. Check if + # the current node has at least one device. + current_ip = get_ip() + current_node_id = ray.get_runtime_context().get_node_id() + current_node_resource = available_resources_per_node()[current_node_id] + if current_node_resource.get(device_str, 0) < 1: + raise ValueError( + f"Current node has no {device_str} available. " + f"{current_node_resource=}. vLLM engine cannot start without " + f"{device_str}. Make sure you have at least 1 {device_str} " + f"available in a node {current_node_id=} {current_ip=}.") + # This way, at least bundle is required to be created in a current + # node. + placement_group_specs[0][f"node:{current_ip}"] = 0.001 + + # By default, Ray packs resources as much as possible. 
+ current_placement_group = ray.util.placement_group( + placement_group_specs, strategy="PACK") + _wait_until_pg_ready(current_placement_group) + + assert current_placement_group is not None + _verify_bundles(current_placement_group, parallel_config, device_str) + # Set the placement group in the parallel config + parallel_config.placement_group = current_placement_group + + +def get_num_tpu_nodes() -> int: + from ray._private.accelerators import TPUAcceleratorManager + cluster_resources = ray.cluster_resources() + total_tpus = int(cluster_resources["TPU"]) + tpus_per_node = TPUAcceleratorManager.get_current_node_num_accelerators() + assert total_tpus % tpus_per_node == 0 + return total_tpus // tpus_per_node + + +def get_num_nodes_in_placement_group() -> int: + pg_table = ray.util.placement_group_table() + current_pg = ray.util.get_current_placement_group() + num_nodes = 0 + + if current_pg: + nodes_in_pg = set() + for pg_key, pg in pg_table.items(): + if pg_key == current_pg.id.hex(): + for _, node in pg["bundles_to_node_id"].items(): + nodes_in_pg.add(node) + num_nodes = len(nodes_in_pg) + + return num_nodes diff --git a/vllm/v1/executor/uniproc_executor.py b/vllm/v1/executor/uniproc_executor.py index be058318de58..7a7a4cbd4656 100644 --- a/vllm/v1/executor/uniproc_executor.py +++ b/vllm/v1/executor/uniproc_executor.py @@ -1,5 +1,5 @@ import os -from typing import Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple, Type from vllm.config import VllmConfig from vllm.logger import init_logger @@ -7,6 +7,7 @@ from vllm.v1.executor.abstract import Executor from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.worker.gpu_worker import Worker +from vllm.worker.worker_base import WorkerBase logger = init_logger(__name__) @@ -82,3 +83,25 @@ def check_health(self) -> None: # UniprocExecutor will always be healthy as long as # it's running. 
return + + def _get_worker_module_and_class( + self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: + worker_module_name = "vllm.v1.worker.gpu_worker" + worker_class_name = "Worker" + return worker_module_name, worker_class_name + + def _get_worker_kwargs( + self, + local_rank: int = 0, + rank: int = 0, + distributed_init_method: Optional[str] = None) -> Dict[str, Any]: + """Return worker init args for a given rank.""" + if distributed_init_method is None: + distributed_init_method = get_distributed_init_method( + get_ip(), get_open_port()) + return dict( + vllm_config=self.vllm_config, + local_rank=local_rank, + rank=rank, + distributed_init_method=distributed_init_method, + ) diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index 33491f700de1..11a2805f4700 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -93,6 +93,7 @@ def initialize(self): gc.collect() torch.cuda.empty_cache() self.init_gpu_memory = torch.cuda.mem_get_info()[0] + self.model_runner = GPUModelRunner(self.vllm_config, self.device) else: raise RuntimeError( f"Not support device type: {self.device_config.device}") From 4ea0cd0f6e910269ab6cf2598e8d0ae89fa31c72 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Wed, 11 Dec 2024 19:25:54 +0000 Subject: [PATCH 02/19] fix Signed-off-by: Rui Qiao --- .../model_executor/layers/logits_processor.py | 1 - vllm/v1/engine/llm_engine.py | 2 +- vllm/v1/executor/ray_executor.py | 101 +++++++++++++----- vllm/v1/executor/ray_gpu_executor.py | 31 +++--- 4 files changed, 95 insertions(+), 40 deletions(-) diff --git a/vllm/model_executor/layers/logits_processor.py b/vllm/model_executor/layers/logits_processor.py index dff0ae2461f2..2bc7e458494f 100644 --- a/vllm/model_executor/layers/logits_processor.py +++ b/vllm/model_executor/layers/logits_processor.py @@ -8,7 +8,6 @@ import vllm.envs as envs from vllm.distributed import (tensor_model_parallel_all_gather, tensor_model_parallel_gather) -from vllm.envs import VLLM_USE_V1 from vllm.model_executor.layers.vocab_parallel_embedding import ( VocabParallelEmbedding) from vllm.model_executor.sampling_metadata import SamplingMetadata diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 2b9a5acf6a7e..fc8889ccb6de 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -21,8 +21,8 @@ from vllm.v1.engine.core_client import EngineCoreClient from vllm.v1.engine.detokenizer import Detokenizer from vllm.v1.engine.processor import Processor -from vllm.v1.executor.ray_executor import RayExecutor from vllm.v1.executor.abstract import Executor +from vllm.v1.executor.ray_executor import RayExecutor logger = init_logger(__name__) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index a8590fc1f261..c86c8a99fcbf 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -7,8 +7,7 @@ import vllm.envs as envs from vllm.config import VllmConfig from vllm.logger import init_logger -from vllm.utils import (get_distributed_init_method, get_ip, get_open_port) -from vllm.v1.core.scheduler import SchedulerOutput +from vllm.utils import get_distributed_init_method, get_ip, get_open_port from vllm.v1.executor.ray_utils import RayWorkerWrapper, ray from vllm.v1.outputs import ModelRunnerOutput from vllm.worker.worker_base import WorkerBase @@ -21,6 +20,7 @@ logger = init_logger(__name__) + class RayExecutor: def __init__(self, vllm_config: VllmConfig) -> None: @@ -144,22 +144,25 @@ def 
sort_by_driver_then_worker_ip(worker): "`HOST_IP` environment variable, make sure it is unique for" " each node.") - VLLM_INSTANCE_ID = get_vllm_instance_id() + # VLLM_INSTANCE_ID = get_vllm_instance_id() # Set environment variables for the driver and workers. - all_args_to_update_environment_variables = [({ - "CUDA_VISIBLE_DEVICES": - ",".join(map(str, node_gpus[node_id])), - "VLLM_INSTANCE_ID": - VLLM_INSTANCE_ID, - "VLLM_TRACE_FUNCTION": - str(envs.VLLM_TRACE_FUNCTION), - "VLLM_USE_V1": - str(int(envs.VLLM_USE_V1)), - **({ - "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND - } if envs.VLLM_ATTENTION_BACKEND is not None else {}) - }, ) for (node_id, _) in worker_node_and_gpu_ids] + all_args_to_update_environment_variables = [ + ( + { + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[node_id])), + # "VLLM_INSTANCE_ID": + # VLLM_INSTANCE_ID, + "VLLM_TRACE_FUNCTION": + str(envs.VLLM_TRACE_FUNCTION), + "VLLM_USE_V1": + str(int(envs.VLLM_USE_V1)), + **({ + "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND + } if envs.VLLM_ATTENTION_BACKEND is not None else {}) + }, ) for (node_id, _) in worker_node_and_gpu_ids + ] self._env_vars_for_all_workers = ( all_args_to_update_environment_variables) @@ -192,12 +195,31 @@ def sort_by_driver_then_worker_ip(worker): self._run_workers("initialize") self._run_workers("load_model") + def _get_env_vars_to_be_updated(self): + return self._env_vars_for_all_workers + def _get_worker_module_and_class( self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: worker_module_name = "vllm.v1.worker.gpu_worker" worker_class_name = "Worker" return worker_module_name, worker_class_name + def _get_worker_kwargs( + self, + local_rank: int = 0, + rank: int = 0, + distributed_init_method: Optional[str] = None) -> Dict[str, Any]: + """Return worker init args for a given rank.""" + if distributed_init_method is None: + distributed_init_method = get_distributed_init_method( + get_ip(), get_open_port()) + return dict( + vllm_config=self.vllm_config, + local_rank=local_rank, + rank=rank, + distributed_init_method=distributed_init_method, + ) + def _get_worker_wrapper_args(self) -> Dict[str, Any]: worker_module_name, worker_class_name = ( self._get_worker_module_and_class()) @@ -208,6 +230,44 @@ def _get_worker_wrapper_args(self) -> Dict[str, Any]: trust_remote_code=self.model_config.trust_remote_code, ) + def determine_num_available_blocks(self) -> Tuple[int, int]: + """Determine the number of available KV blocks. + This invokes `determine_num_available_blocks` on each worker and takes + the min of the results, guaranteeing that the selected cache sizes are + compatible with all workers. + Returns: + - tuple[num_gpu_blocks, num_cpu_blocks] + """ + # Get the maximum number of blocks that can be allocated on GPU and CPU. + num_blocks = self._run_workers("determine_num_available_blocks") + + # Since we use a shared centralized controller, we take the minimum + # number of blocks across all workers to make sure all the memory + # operators can be applied to all workers. + num_gpu_blocks = min(b[0] for b in num_blocks) + return num_gpu_blocks, 0 + + def initialize(self, num_gpu_blocks: int) -> None: + """Initialize the KV cache in all workers. + """ + # NOTE: This is logged in the executor because there can be >1 worker + # with other executors. We could log in the engine level, but work + # remains to abstract away the device for non-GPU configurations. 
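determine_num_available_blocks above takes the smallest GPU block count reported by any worker, so the KV cache that gets allocated is sized for the most memory-constrained GPU and fits everywhere. A toy illustration with made-up per-worker results:

# Hypothetical (num_gpu_blocks, num_cpu_blocks) pairs reported by 3 workers.
per_worker_blocks = [(8192, 512), (7936, 512), (8064, 512)]

num_gpu_blocks = min(b[0] for b in per_worker_blocks)
assert num_gpu_blocks == 7936  # bounded by the most constrained worker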
+ logger.info("# GPU blocks: %d", num_gpu_blocks) + self._run_workers("initialize_cache", num_gpu_blocks) + self._run_workers("compile_or_warm_up_model") + + def save_sharded_state( + self, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + self._run_workers("save_sharded_state", + path=path, + pattern=pattern, + max_size=max_size) + def _run_workers( self, method: str, @@ -247,12 +307,6 @@ def _run_workers( return ray_worker_outputs - def initialize(self, num_gpu_blocks: int) -> None: - raise NotImplementedError - - def determine_num_available_blocks(self) -> Tuple[int, int]: - raise NotImplementedError - def execute_model( self, scheduler_output, @@ -303,6 +357,5 @@ def _compiled_ray_dag(self): return forward_dag.experimental_compile() - def __del__(self): - self.shutdown() \ No newline at end of file + self.shutdown() diff --git a/vllm/v1/executor/ray_gpu_executor.py b/vllm/v1/executor/ray_gpu_executor.py index 217eb0163098..37717ddabd77 100644 --- a/vllm/v1/executor/ray_gpu_executor.py +++ b/vllm/v1/executor/ray_gpu_executor.py @@ -6,7 +6,7 @@ import vllm.envs as envs from vllm.logger import init_logger -from vllm.utils import (get_distributed_init_method, get_ip, get_open_port) +from vllm.utils import get_distributed_init_method, get_ip, get_open_port from vllm.v1.core.scheduler import SchedulerOutput from vllm.v1.executor.distributed_gpu_executor import DistributedGPUExecutor from vllm.v1.executor.ray_utils import RayWorkerWrapper, ray @@ -197,19 +197,22 @@ def sort_by_driver_then_worker_ip(worker): # VLLM_INSTANCE_ID = get_vllm_instance_id() # Set environment variables for the driver and workers. - all_args_to_update_environment_variables = [({ - "CUDA_VISIBLE_DEVICES": - ",".join(map(str, node_gpus[node_id])), - # "VLLM_INSTANCE_ID": - # VLLM_INSTANCE_ID, - "VLLM_TRACE_FUNCTION": - str(envs.VLLM_TRACE_FUNCTION), - "VLLM_USE_V1": - str(int(envs.VLLM_USE_V1)), - **({ - "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND - } if envs.VLLM_ATTENTION_BACKEND is not None else {}) - }, ) for (node_id, _) in worker_node_and_gpu_ids] + all_args_to_update_environment_variables = [ + ( + { + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[node_id])), + # "VLLM_INSTANCE_ID": + # VLLM_INSTANCE_ID, + "VLLM_TRACE_FUNCTION": + str(envs.VLLM_TRACE_FUNCTION), + "VLLM_USE_V1": + str(int(envs.VLLM_USE_V1)), + **({ + "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND + } if envs.VLLM_ATTENTION_BACKEND is not None else {}) + }, ) for (node_id, _) in worker_node_and_gpu_ids + ] self._env_vars_for_all_workers = ( all_args_to_update_environment_variables) From 1efa5bcfba4d3c03f41d98fc87c20e35dc6eb135 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Wed, 11 Dec 2024 21:17:25 +0000 Subject: [PATCH 03/19] fix shutdown Signed-off-by: Rui Qiao --- .../basic_correctness/test_basic_correctness.py | 17 +++++++++-------- vllm/v1/executor/ray_executor.py | 7 ++++++- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index 4e6524cfa32d..b27ca8d1c570 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -100,15 +100,16 @@ def test_models( @multi_gpu_test(num_gpus=2) @pytest.mark.parametrize( "model, distributed_executor_backend, attention_backend, " - "test_suite", [ - ("facebook/opt-125m", "ray", "", "L4"), - ("facebook/opt-125m", "mp", "", "L4"), + "test_suite", + [ + # 
("facebook/opt-125m", "ray", "", "L4"), + # ("facebook/opt-125m", "mp", "", "L4"), ("meta-llama/Llama-2-7b-hf", "ray", "", "L4"), - ("meta-llama/Llama-2-7b-hf", "mp", "", "L4"), - ("facebook/opt-125m", "ray", "", "A100"), - ("facebook/opt-125m", "mp", "", "A100"), - ("facebook/opt-125m", "mp", "FLASHINFER", "A100"), - ("meta-llama/Meta-Llama-3-8B", "ray", "FLASHINFER", "A100"), + # ("meta-llama/Llama-2-7b-hf", "mp", "", "L4"), + # ("facebook/opt-125m", "ray", "", "A100"), + # ("facebook/opt-125m", "mp", "", "A100"), + # ("facebook/opt-125m", "mp", "FLASHINFER", "A100"), + # ("meta-llama/Meta-Llama-3-8B", "ray", "FLASHINFER", "A100"), ]) def test_models_distributed( hf_runner, diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index c86c8a99fcbf..ca99049f023a 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -322,7 +322,12 @@ def profile(self, is_start=True): raise NotImplementedError def shutdown(self): - pass + if hasattr(self, "forward_dag") and self.forward_dag is not None: + self.forward_dag.teardown() + import ray + for worker in self.workers: + ray.kill(worker) + self.forward_dag = None def check_health(self) -> None: raise NotImplementedError From b1df508c94119c76699960aef5b5c81646bb653b Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Wed, 11 Dec 2024 21:22:48 +0000 Subject: [PATCH 04/19] cleanup Signed-off-by: Rui Qiao --- .../test_basic_correctness.py | 9 +- vllm/v1/executor/distributed_gpu_executor.py | 96 ----- vllm/v1/executor/ray_gpu_executor.py | 330 ------------------ 3 files changed, 3 insertions(+), 432 deletions(-) delete mode 100644 vllm/v1/executor/distributed_gpu_executor.py delete mode 100644 vllm/v1/executor/ray_gpu_executor.py diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index b27ca8d1c570..505b8bf2d5df 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -47,11 +47,10 @@ def test_vllm_gc_ed(): @pytest.mark.skip_v1 @pytest.mark.parametrize("model", MODELS) -# @pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS", "FLASHINFER"]) -@pytest.mark.parametrize("backend", ["FLASH_ATTN_VLLM_V1"]) +@pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS", "FLASHINFER"]) @pytest.mark.parametrize("dtype", ["half"]) @pytest.mark.parametrize("max_tokens", [5]) -@pytest.mark.parametrize("enforce_eager", [True]) +@pytest.mark.parametrize("enforce_eager", [False, True]) def test_models( hf_runner, model: str, @@ -84,9 +83,7 @@ def test_models( max_model_len=8192, dtype=dtype, enforce_eager=enforce_eager, - distributed_executor_backend="ray", - gpu_memory_utilization=0.7, - tensor_parallel_size=4) as vllm_model: + gpu_memory_utilization=0.7) as vllm_model: vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens) check_outputs_equal( diff --git a/vllm/v1/executor/distributed_gpu_executor.py b/vllm/v1/executor/distributed_gpu_executor.py deleted file mode 100644 index 3183e21f4e05..000000000000 --- a/vllm/v1/executor/distributed_gpu_executor.py +++ /dev/null @@ -1,96 +0,0 @@ -from abc import abstractmethod -from typing import Any, Optional, Tuple - -from vllm.config import VllmConfig -from vllm.logger import init_logger -from vllm.v1.core.scheduler import SchedulerOutput -from vllm.v1.outputs import ModelRunnerOutput - -logger = init_logger(__name__) - - -class DistributedGPUExecutor: - """Abstract superclass of multi-GPU executor implementations.""" - - def 
__init__(self, vllm_config: VllmConfig): - self.vllm_config = vllm_config - self.model_config = vllm_config.model_config - self.cache_config = vllm_config.cache_config - self.lora_config = vllm_config.lora_config - self.load_config = vllm_config.load_config - self.parallel_config = vllm_config.parallel_config - self.scheduler_config = vllm_config.scheduler_config - self.device_config = vllm_config.device_config - self.speculative_config = vllm_config.speculative_config - self.prompt_adapter_config = vllm_config.prompt_adapter_config - self.observability_config = vllm_config.observability_config - - def determine_num_available_blocks(self) -> Tuple[int, int]: - """Determine the number of available KV blocks. - - This invokes `determine_num_available_blocks` on each worker and takes - the min of the results, guaranteeing that the selected cache sizes are - compatible with all workers. - - Returns: - - tuple[num_gpu_blocks, num_cpu_blocks] - """ - # Get the maximum number of blocks that can be allocated on GPU and CPU. - num_blocks = self._run_workers("determine_num_available_blocks") - - # Since we use a shared centralized controller, we take the minimum - # number of blocks across all workers to make sure all the memory - # operators can be applied to all workers. - num_gpu_blocks = min(b[0] for b in num_blocks) - return num_gpu_blocks, 0 - - def initialize_cache(self, num_gpu_blocks: int) -> None: - """Initialize the KV cache in all workers. - """ - # NOTE: This is logged in the executor because there can be >1 worker - # with other executors. We could log in the engine level, but work - # remains to abstract away the device for non-GPU configurations. - logger.info("# GPU blocks: %d", num_gpu_blocks) - self._run_workers("initialize_cache", num_gpu_blocks) - self._run_workers("compile_or_warm_up_model") - - @abstractmethod - def execute_model( - self, - scheduler_output: SchedulerOutput, - ) -> ModelRunnerOutput: - raise NotImplementedError - - def save_sharded_state( - self, - path: str, - pattern: Optional[str] = None, - max_size: Optional[int] = None, - ) -> None: - self._run_workers("save_sharded_state", - path=path, - pattern=pattern, - max_size=max_size) - - @abstractmethod - def _run_workers( - self, - method: str, - *args, - async_run_tensor_parallel_workers_only: bool = False, - max_concurrent_workers: Optional[int] = None, - **kwargs, - ) -> Any: - """Runs the given method on all workers. - - Args: - async_run_tensor_parallel_workers_only: If True the method will be - run only in the remote TP workers, not the driver worker. - It will also be run asynchronously and return a list of futures - rather than blocking on the results. 
- """ - raise NotImplementedError - - @abstractmethod - def check_health(self) -> None: - raise NotImplementedError diff --git a/vllm/v1/executor/ray_gpu_executor.py b/vllm/v1/executor/ray_gpu_executor.py deleted file mode 100644 index 37717ddabd77..000000000000 --- a/vllm/v1/executor/ray_gpu_executor.py +++ /dev/null @@ -1,330 +0,0 @@ -import os -from collections import defaultdict -from itertools import islice, repeat -from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, - Type) - -import vllm.envs as envs -from vllm.logger import init_logger -from vllm.utils import get_distributed_init_method, get_ip, get_open_port -from vllm.v1.core.scheduler import SchedulerOutput -from vllm.v1.executor.distributed_gpu_executor import DistributedGPUExecutor -from vllm.v1.executor.ray_utils import RayWorkerWrapper, ray -from vllm.v1.outputs import ModelRunnerOutput -from vllm.worker.worker_base import WorkerBase - -if ray is not None: - from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy - -if TYPE_CHECKING: - from ray.util.placement_group import PlacementGroup - -logger = init_logger(__name__) - - -class RayGPUExecutor(DistributedGPUExecutor): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._init_executor() - - def _init_executor(self) -> None: - self.forward_dag: Optional[ray.dag.CompiledDAG] = None - placement_group = self.parallel_config.placement_group - - # Disable Ray usage stats collection. - ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0") - if ray_usage != "1": - os.environ["RAY_USAGE_STATS_ENABLED"] = "0" - - # Create the parallel GPU workers. - self._init_workers_ray(placement_group) - - def shutdown(self) -> None: - if hasattr(self, "forward_dag") and self.forward_dag is not None: - self.forward_dag.teardown() - import ray - for worker in self.workers: - ray.kill(worker) - self.forward_dag = None - - def _get_worker_module_and_class( - self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: - worker_module_name = "vllm.v1.worker.gpu_worker" - worker_class_name = "Worker" - return worker_module_name, worker_class_name - - def _get_worker_kwargs( - self, - local_rank: int = 0, - rank: int = 0, - distributed_init_method: Optional[str] = None) -> Dict[str, Any]: - """Return worker init args for a given rank.""" - if distributed_init_method is None: - distributed_init_method = get_distributed_init_method( - get_ip(), get_open_port()) - return dict( - vllm_config=self.vllm_config, - local_rank=local_rank, - rank=rank, - distributed_init_method=distributed_init_method, - ) - - def _configure_ray_workers_use_nsight(self, - ray_remote_kwargs) -> Dict[str, Any]: - # If nsight profiling is enabled, we need to set the profiling - # configuration for the ray workers as runtime env. - runtime_env = ray_remote_kwargs.setdefault("runtime_env", {}) - runtime_env.update({ - "nsight": { - "t": "cuda,cudnn,cublas", - "o": "'worker_process_%p'", - "cuda-graph-trace": "node", - } - }) - - return ray_remote_kwargs - - def _get_worker_wrapper_args(self) -> Dict[str, Any]: - worker_module_name, worker_class_name = ( - self._get_worker_module_and_class()) - - return dict( - worker_module_name=worker_module_name, - worker_class_name=worker_class_name, - trust_remote_code=self.model_config.trust_remote_code, - ) - - # child class could overwrite this to return actual env vars. 
- def _get_env_vars_to_be_updated(self): - return self._env_vars_for_all_workers - - def _init_workers_ray(self, placement_group: "PlacementGroup", - **ray_remote_kwargs): - if (self.parallel_config.tensor_parallel_size == 1 - and self.parallel_config.pipeline_parallel_size == 1): - # For single GPU case, we use a ray worker with constrained memory. - num_gpus = self.cache_config.gpu_memory_utilization - else: - # Otherwise, the ray workers are allocated with a full GPU. - num_gpus = 1 - - # A list of workers to run a model. - self.workers: List[RayWorkerWrapper] = [] - if self.parallel_config.ray_workers_use_nsight: - ray_remote_kwargs = self._configure_ray_workers_use_nsight( - ray_remote_kwargs) - - # Create the workers. - driver_ip = get_ip() - worker_wrapper_kwargs = self._get_worker_wrapper_args() - for bundle_id, bundle in enumerate(placement_group.bundle_specs): - if not bundle.get("GPU", 0): - continue - scheduling_strategy = PlacementGroupSchedulingStrategy( - placement_group=placement_group, - placement_group_capture_child_tasks=True, - placement_group_bundle_index=bundle_id, - ) - - worker = ray.remote( - num_cpus=0, - num_gpus=num_gpus, - scheduling_strategy=scheduling_strategy, - **ray_remote_kwargs, - )(RayWorkerWrapper).remote(**worker_wrapper_kwargs) - self.workers.append(worker) - - logger.debug("workers: %s", self.workers) - worker_ips = [ - ray.get(worker.get_node_ip.remote()) # type: ignore[attr-defined] - for worker in self.workers - ] - ip_counts: Dict[str, int] = {} - for ip in worker_ips: - ip_counts[ip] = ip_counts.get(ip, 0) + 1 - - def sort_by_driver_then_worker_ip(worker): - """ - Sort the workers based on 3 properties: - 1. If the worker is on the same node as the driver (vllm engine), - it should be placed first. - 2. Then, if the worker is on a node with fewer workers, it should - be placed first. - 3. Finally, if the work is on a node with smaller IP address, it - should be placed first. - """ - ip = ray.get(worker.get_node_ip.remote()) - return (ip != driver_ip, ip_counts[ip], ip) - - # After sorting, the workers on the same node will be - # close to each other, and the workers on the driver - # node will be placed first. - self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip) - - # Get the set of GPU IDs used on each node. - worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids") - - node_workers = defaultdict(list) # node id -> list of worker ranks - node_gpus = defaultdict(list) # node id -> list of gpu ids - - for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): - node_workers[node_id].append(i) - # `gpu_ids` can be a list of strings or integers. - # convert them to integers for consistency. - # NOTE: gpu_ids can be larger than 9 (e.g. 16 GPUs), - # string sorting is not sufficient. - # see https://github.com/vllm-project/vllm/issues/5590 - gpu_ids = [int(x) for x in gpu_ids] - node_gpus[node_id].extend(gpu_ids) - - for node_id, gpu_ids in node_gpus.items(): - node_gpus[node_id] = sorted(gpu_ids) - - all_ips = set(worker_ips) - n_ips = len(all_ips) - n_nodes = len(node_workers) - - if n_nodes != n_ips: - raise RuntimeError( - f"Every node should have a unique IP address. Got {n_nodes}" - f" nodes with node ids {list(node_workers.keys())} and " - f"{n_ips} unique IP addresses {all_ips}. Please check your" - " network configuration. 
If you set `VLLM_HOST_IP` or " - "`HOST_IP` environment variable, make sure it is unique for" - " each node.") - - # VLLM_INSTANCE_ID = get_vllm_instance_id() - - # Set environment variables for the driver and workers. - all_args_to_update_environment_variables = [ - ( - { - "CUDA_VISIBLE_DEVICES": - ",".join(map(str, node_gpus[node_id])), - # "VLLM_INSTANCE_ID": - # VLLM_INSTANCE_ID, - "VLLM_TRACE_FUNCTION": - str(envs.VLLM_TRACE_FUNCTION), - "VLLM_USE_V1": - str(int(envs.VLLM_USE_V1)), - **({ - "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND - } if envs.VLLM_ATTENTION_BACKEND is not None else {}) - }, ) for (node_id, _) in worker_node_and_gpu_ids - ] - - self._env_vars_for_all_workers = ( - all_args_to_update_environment_variables) - - self._run_workers("update_environment_variables", - all_args=self._get_env_vars_to_be_updated()) - - if len(node_gpus) == 1: - # in single node case, we don't need to get the IP address. - # the loopback address is sufficient - # NOTE: a node may have several IP addresses, one for each - # network interface. `get_ip()` might return any of them, - # while they might not work for communication inside the node - # if the network setup is complicated. Using the loopback address - # solves this issue, as it always works for communication inside - # the node. - driver_ip = "127.0.0.1" - distributed_init_method = get_distributed_init_method( - driver_ip, get_open_port()) - - # Initialize the actual workers inside worker wrapper. - init_worker_all_kwargs = [ - self._get_worker_kwargs( - local_rank=node_workers[node_id].index(rank), - rank=rank, - distributed_init_method=distributed_init_method, - ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids) - ] - self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) - self._run_workers("initialize") - self._run_workers("load_model") - - def execute_model( - self, - scheduler_output: SchedulerOutput, - ) -> ModelRunnerOutput: - if self.forward_dag is None: - self.forward_dag = self._compiled_ray_dag() - # All workers are supposed to produce the same output. Only - # get the first output. - output = ray.get(self.forward_dag.execute(scheduler_output))[0] - return output - - def _run_workers( - self, - method: str, - *args, - all_args: Optional[List[Tuple[Any, ...]]] = None, - all_kwargs: Optional[List[Dict[str, Any]]] = None, - **kwargs, - ) -> Any: - """Runs the given method on all workers. Can be used in the following - ways: - - Args: - - async_run_tensor_parallel_workers_only: If True the method will be - run only in the remote TP workers, not the driver worker. - It will also be run asynchronously and return a list of futures - rather than blocking on the results. - - args/kwargs: All workers share the same args/kwargs - - all_args/all_kwargs: args/kwargs for each worker are specified - individually - """ - count = len(self.workers) - all_worker_args = repeat(args, count) if all_args is None \ - else islice(all_args, 0, None) - all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \ - else islice(all_kwargs, 0, None) - - # Start the ray workers first. - ray_workers = self.workers - ray_worker_outputs = [ - worker.execute_method.remote(method, *worker_args, **worker_kwargs) - for (worker, worker_args, worker_kwargs - ) in zip(ray_workers, all_worker_args, all_worker_kwargs) - ] - - # Get the results of the ray workers. 
- if self.workers: - ray_worker_outputs = ray.get(ray_worker_outputs) - - return ray_worker_outputs - - def _check_ray_compiled_graph_installation(self): - # TODO(sang): We should check versions that support compiled graph. - import importlib.util - adag_spec = importlib.util.find_spec( - "ray.experimental.compiled_dag_ref") - if adag_spec is None: - raise ValueError("Ray accelerated DAG is not installed. " - "Run `pip install ray[adag]` to install it.") - - cupy_spec = importlib.util.find_spec("cupy") - if cupy_spec is None and envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL: - raise ValueError( - "cupy is not installed but required since " - "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL is set." - "Run `pip install ray[adag]` and check cupy installation.") - - def _compiled_ray_dag(self): - assert self.parallel_config.use_ray - self._check_ray_compiled_graph_installation() - from ray.dag import InputNode, MultiOutputNode - - with InputNode() as input_batches: - outputs = [ - worker.execute_model.bind(input_batches) - for worker in self.workers - ] - forward_dag = MultiOutputNode(outputs) - - return forward_dag.experimental_compile() - - def __del__(self): - self.shutdown() From 4545d1664feda923109c0cdc5cfe6b3a52a980c5 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Wed, 11 Dec 2024 21:39:21 +0000 Subject: [PATCH 05/19] cleanup Signed-off-by: Rui Qiao --- tests/basic_correctness/test_basic_correctness.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index 505b8bf2d5df..b195af68f4c9 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -20,8 +20,7 @@ MODELS = [ "google/gemma-2-2b-it", - # "facebook/opt-125m", - # "meta-llama/Llama-3.2-1B", + "meta-llama/Llama-3.2-1B", ] TARGET_TEST_SUITE = os.environ.get("TARGET_TEST_SUITE", "L4") From 19a6d08b065ae8dfc5a4dfbe0b7028de53dc0786 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 13 Dec 2024 00:33:12 +0000 Subject: [PATCH 06/19] cleanup Signed-off-by: Rui Qiao --- vllm/v1/engine/llm_engine.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index fc8889ccb6de..f4719b319948 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -22,7 +22,6 @@ from vllm.v1.engine.detokenizer import Detokenizer from vllm.v1.engine.processor import Processor from vllm.v1.executor.abstract import Executor -from vllm.v1.executor.ray_executor import RayExecutor logger = init_logger(__name__) @@ -114,7 +113,8 @@ def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: vllm_config.parallel_config.distributed_executor_backend) if distributed_executor_backend == "ray": initialize_ray_cluster(vllm_config.parallel_config) - return RayExecutor + from vllm.v1.executor.ray_executor import RayExecutor + executor_class = RayExecutor elif distributed_executor_backend == "mp": from vllm.v1.executor.multiproc_executor import MultiprocExecutor executor_class = MultiprocExecutor @@ -125,9 +125,6 @@ def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: return executor_class - def stop_remote_worker_execution_loop(self) -> None: - raise NotImplementedError("TP not implemented yet.") - def get_num_unfinished_requests(self) -> int: return self.detokenizer.get_num_unfinished_requests() From fd75e6e9a6c1c53405735c2f8522ac9b0644690b Mon Sep 17 00:00:00 2001 From: Rui Qiao 
Date: Fri, 13 Dec 2024 01:47:42 +0000 Subject: [PATCH 07/19] cleanup Signed-off-by: Rui Qiao --- vllm/v1/engine/llm_engine.py | 2 +- vllm/v1/executor/ray_executor.py | 56 ++++++++++++---------------- vllm/v1/executor/ray_utils.py | 48 ------------------------ vllm/v1/executor/uniproc_executor.py | 25 +------------ vllm/v1/worker/gpu_worker.py | 2 - 5 files changed, 25 insertions(+), 108 deletions(-) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index f4719b319948..9ad51575b3cc 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -6,7 +6,6 @@ from vllm.engine.arg_utils import EngineArgs from vllm.engine.metrics_types import StatLoggerBase from vllm.envs import VLLM_ENABLE_V1_MULTIPROCESSING -from vllm.executor.ray_utils import initialize_ray_cluster from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType from vllm.logger import init_logger from vllm.lora.request import LoRARequest @@ -22,6 +21,7 @@ from vllm.v1.engine.detokenizer import Detokenizer from vllm.v1.engine.processor import Processor from vllm.v1.executor.abstract import Executor +from vllm.v1.executor.ray_utils import initialize_ray_cluster logger = init_logger(__name__) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index ca99049f023a..d28d8f6af464 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -144,25 +144,18 @@ def sort_by_driver_then_worker_ip(worker): "`HOST_IP` environment variable, make sure it is unique for" " each node.") - # VLLM_INSTANCE_ID = get_vllm_instance_id() - # Set environment variables for the driver and workers. - all_args_to_update_environment_variables = [ - ( - { - "CUDA_VISIBLE_DEVICES": - ",".join(map(str, node_gpus[node_id])), - # "VLLM_INSTANCE_ID": - # VLLM_INSTANCE_ID, - "VLLM_TRACE_FUNCTION": - str(envs.VLLM_TRACE_FUNCTION), - "VLLM_USE_V1": - str(int(envs.VLLM_USE_V1)), - **({ - "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND - } if envs.VLLM_ATTENTION_BACKEND is not None else {}) - }, ) for (node_id, _) in worker_node_and_gpu_ids - ] + all_args_to_update_environment_variables = [({ + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[node_id])), + "VLLM_TRACE_FUNCTION": + str(envs.VLLM_TRACE_FUNCTION), + "VLLM_USE_V1": + str(int(envs.VLLM_USE_V1)), + **({ + "VLLM_ATTENTION_BACKEND": envs.VLLM_ATTENTION_BACKEND + } if envs.VLLM_ATTENTION_BACKEND is not None else {}) + }, ) for (node_id, _) in worker_node_and_gpu_ids] self._env_vars_for_all_workers = ( all_args_to_update_environment_variables) @@ -209,7 +202,9 @@ def _get_worker_kwargs( local_rank: int = 0, rank: int = 0, distributed_init_method: Optional[str] = None) -> Dict[str, Any]: - """Return worker init args for a given rank.""" + """ + Return worker init args for a given rank. + """ if distributed_init_method is None: distributed_init_method = get_distributed_init_method( get_ip(), get_open_port()) @@ -231,10 +226,13 @@ def _get_worker_wrapper_args(self) -> Dict[str, Any]: ) def determine_num_available_blocks(self) -> Tuple[int, int]: - """Determine the number of available KV blocks. + """ + Determine the number of available KV blocks. + This invokes `determine_num_available_blocks` on each worker and takes the min of the results, guaranteeing that the selected cache sizes are compatible with all workers. 
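To illustrate the reduction described in that docstring: every worker profiles its own free GPU memory, and the executor keeps only the smallest per-worker block counts so the chosen KV cache size is valid on every rank. A minimal standalone sketch, where reduce_block_counts and per_worker_blocks are illustrative names rather than anything in the vLLM API:

    from typing import List, Tuple

    def reduce_block_counts(per_worker_blocks: List[Tuple[int, int]]) -> Tuple[int, int]:
        # Each entry is (num_gpu_blocks, num_cpu_blocks) reported by one worker.
        # Taking the minimum guarantees the selected cache fits on every rank.
        num_gpu_blocks = min(b[0] for b in per_worker_blocks)
        num_cpu_blocks = min(b[1] for b in per_worker_blocks)
        return num_gpu_blocks, num_cpu_blocks

    # Three TP workers reporting slightly different free memory -> (1000, 480)
    print(reduce_block_counts([(1024, 512), (1000, 512), (1012, 480)]))

At this point in the series the executor still hard-codes the CPU count to 0; patch 17 later switches to the per-worker minimum as sketched above.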
+ Returns: - tuple[num_gpu_blocks, num_cpu_blocks] """ @@ -248,7 +246,8 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: return num_gpu_blocks, 0 def initialize(self, num_gpu_blocks: int) -> None: - """Initialize the KV cache in all workers. + """ + Initialize the KV cache in all workers. """ # NOTE: This is logged in the executor because there can be >1 worker # with other executors. We could log in the engine level, but work @@ -257,17 +256,6 @@ def initialize(self, num_gpu_blocks: int) -> None: self._run_workers("initialize_cache", num_gpu_blocks) self._run_workers("compile_or_warm_up_model") - def save_sharded_state( - self, - path: str, - pattern: Optional[str] = None, - max_size: Optional[int] = None, - ) -> None: - self._run_workers("save_sharded_state", - path=path, - pattern=pattern, - max_size=max_size) - def _run_workers( self, method: str, @@ -276,8 +264,10 @@ def _run_workers( all_kwargs: Optional[List[Dict[str, Any]]] = None, **kwargs, ) -> Any: - """Runs the given method on all workers. Can be used in the following + """ + Runs the given method on all workers. Can be used in the following ways: + Args: - async_run_tensor_parallel_workers_only: If True the method will be run only in the remote TP workers, not the driver worker. diff --git a/vllm/v1/executor/ray_utils.py b/vllm/v1/executor/ray_utils.py index e2ae367d0d5a..a2d958053af5 100644 --- a/vllm/v1/executor/ray_utils.py +++ b/vllm/v1/executor/ray_utils.py @@ -55,16 +55,10 @@ def init_worker(self, *args, **kwargs): Here we inject some common logic before initializing the worker. Arguments are passed to the worker class constructor. """ - # TODO(sang): Enable it - # enable_trace_function_call_for_thread() # see https://github.com/NVIDIA/nccl/issues/1234 os.environ['NCCL_CUMEM_ENABLE'] = '0' - # TODO(sang): Enable it - # from vllm.plugins import load_general_plugins - # load_general_plugins() - mod = importlib.import_module(self.worker_module_name) worker_class = getattr(mod, self.worker_class_name) @@ -237,23 +231,6 @@ def _wait_until_pg_ready(current_placement_group: "PlacementGroup"): ) from None -def _wait_until_pg_removed(current_placement_group: "PlacementGroup"): - ray.util.remove_placement_group(current_placement_group) - s = time.time() - wait_interval = 10 - while time.time() - s < PG_WAIT_TIMEOUT: - pg = ray.util.get_current_placement_group() - if pg is None: - break - - # Exponential backoff for warning print. 
- wait_interval *= 2 - logger.info( - "Waiting for removing a placement group of specs for " - "%d seconds.", int(time.time() - s)) - time.sleep(wait_interval) - - def initialize_ray_cluster( parallel_config: ParallelConfig, ray_address: Optional[str] = None, @@ -348,28 +325,3 @@ def initialize_ray_cluster( _verify_bundles(current_placement_group, parallel_config, device_str) # Set the placement group in the parallel config parallel_config.placement_group = current_placement_group - - -def get_num_tpu_nodes() -> int: - from ray._private.accelerators import TPUAcceleratorManager - cluster_resources = ray.cluster_resources() - total_tpus = int(cluster_resources["TPU"]) - tpus_per_node = TPUAcceleratorManager.get_current_node_num_accelerators() - assert total_tpus % tpus_per_node == 0 - return total_tpus // tpus_per_node - - -def get_num_nodes_in_placement_group() -> int: - pg_table = ray.util.placement_group_table() - current_pg = ray.util.get_current_placement_group() - num_nodes = 0 - - if current_pg: - nodes_in_pg = set() - for pg_key, pg in pg_table.items(): - if pg_key == current_pg.id.hex(): - for _, node in pg["bundles_to_node_id"].items(): - nodes_in_pg.add(node) - num_nodes = len(nodes_in_pg) - - return num_nodes diff --git a/vllm/v1/executor/uniproc_executor.py b/vllm/v1/executor/uniproc_executor.py index 7a7a4cbd4656..be058318de58 100644 --- a/vllm/v1/executor/uniproc_executor.py +++ b/vllm/v1/executor/uniproc_executor.py @@ -1,5 +1,5 @@ import os -from typing import Any, Callable, Dict, Optional, Tuple, Type +from typing import Optional, Tuple from vllm.config import VllmConfig from vllm.logger import init_logger @@ -7,7 +7,6 @@ from vllm.v1.executor.abstract import Executor from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.worker.gpu_worker import Worker -from vllm.worker.worker_base import WorkerBase logger = init_logger(__name__) @@ -83,25 +82,3 @@ def check_health(self) -> None: # UniprocExecutor will always be healthy as long as # it's running. 
return - - def _get_worker_module_and_class( - self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: - worker_module_name = "vllm.v1.worker.gpu_worker" - worker_class_name = "Worker" - return worker_module_name, worker_class_name - - def _get_worker_kwargs( - self, - local_rank: int = 0, - rank: int = 0, - distributed_init_method: Optional[str] = None) -> Dict[str, Any]: - """Return worker init args for a given rank.""" - if distributed_init_method is None: - distributed_init_method = get_distributed_init_method( - get_ip(), get_open_port()) - return dict( - vllm_config=self.vllm_config, - local_rank=local_rank, - rank=rank, - distributed_init_method=distributed_init_method, - ) diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index 11a2805f4700..0000b09bfaa3 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -93,7 +93,6 @@ def initialize(self): gc.collect() torch.cuda.empty_cache() self.init_gpu_memory = torch.cuda.mem_get_info()[0] - self.model_runner = GPUModelRunner(self.vllm_config, self.device) else: raise RuntimeError( f"Not support device type: {self.device_config.device}") @@ -203,7 +202,6 @@ def execute_model( ) -> ModelRunnerOutput: output = self.model_runner.execute_model(scheduler_output) return output if self.rank == 0 else None - return output def profile(self, is_start: bool = True): if self.profiler is None: From bcecb8b907ed0f18e26e91422fcce4e0caf23da8 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 13 Dec 2024 02:27:08 +0000 Subject: [PATCH 08/19] up Signed-off-by: Rui Qiao --- .../test_basic_correctness.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index b195af68f4c9..9ff353248030 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -98,14 +98,14 @@ def test_models( "model, distributed_executor_backend, attention_backend, " "test_suite", [ - # ("facebook/opt-125m", "ray", "", "L4"), - # ("facebook/opt-125m", "mp", "", "L4"), + ("facebook/opt-125m", "ray", "", "L4"), + ("facebook/opt-125m", "mp", "", "L4"), ("meta-llama/Llama-2-7b-hf", "ray", "", "L4"), - # ("meta-llama/Llama-2-7b-hf", "mp", "", "L4"), - # ("facebook/opt-125m", "ray", "", "A100"), - # ("facebook/opt-125m", "mp", "", "A100"), - # ("facebook/opt-125m", "mp", "FLASHINFER", "A100"), - # ("meta-llama/Meta-Llama-3-8B", "ray", "FLASHINFER", "A100"), + ("meta-llama/Llama-2-7b-hf", "mp", "", "L4"), + ("facebook/opt-125m", "ray", "", "A100"), + ("facebook/opt-125m", "mp", "", "A100"), + ("facebook/opt-125m", "mp", "FLASHINFER", "A100"), + ("meta-llama/Meta-Llama-3-8B", "ray", "FLASHINFER", "A100"), ]) def test_models_distributed( hf_runner, @@ -128,11 +128,6 @@ def test_models_distributed( if attention_backend: os.environ["VLLM_ATTENTION_BACKEND"] = attention_backend - # Import VLLM_USE_V1 dynamically to handle patching - from vllm.envs import VLLM_USE_V1 - if not VLLM_USE_V1 or distributed_executor_backend == "mp": - pytest.skip(f"Skip {distributed_executor_backend} for V0") - dtype = "half" max_tokens = 5 From 37bd5c46ca13deda6b360178531b7e82889b54d2 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 13 Dec 2024 02:33:59 +0000 Subject: [PATCH 09/19] up Signed-off-by: Rui Qiao --- tests/basic_correctness/test_basic_correctness.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index 9ff353248030..1c2193bb17a5 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -96,8 +96,7 @@ def test_models( @multi_gpu_test(num_gpus=2) @pytest.mark.parametrize( "model, distributed_executor_backend, attention_backend, " - "test_suite", - [ + "test_suite", [ ("facebook/opt-125m", "ray", "", "L4"), ("facebook/opt-125m", "mp", "", "L4"), ("meta-llama/Llama-2-7b-hf", "ray", "", "L4"), From b46ee5538bd36882f0579bacdc9f08a34d1da8b5 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 13 Dec 2024 03:43:37 +0000 Subject: [PATCH 10/19] up Signed-off-by: Rui Qiao --- vllm/distributed/parallel_state.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py index 23647503033c..5b9236f8c56b 100644 --- a/vllm/distributed/parallel_state.py +++ b/vllm/distributed/parallel_state.py @@ -247,9 +247,9 @@ def __init__( from vllm.distributed.device_communicators.shm_broadcast import ( MessageQueue) self.mq_broadcaster: Optional[MessageQueue] = None - # if use_message_queue_broadcaster and self.world_size > 1: - # self.mq_broadcaster = MessageQueue.create_from_process_group( - # self.cpu_group, 1 << 22, 6) + if use_message_queue_broadcaster and self.world_size > 1: + self.mq_broadcaster = MessageQueue.create_from_process_group( + self.cpu_group, 1 << 22, 6) @property def first_rank(self): From 0e9fe0ff7d4b5db5ca0aa4f7a2c74a44aaeffa40 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Sun, 15 Dec 2024 00:27:08 +0000 Subject: [PATCH 11/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 2 +- vllm/v1/executor/ray_utils.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index d28d8f6af464..9439fd966f0c 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -320,7 +320,7 @@ def shutdown(self): self.forward_dag = None def check_health(self) -> None: - raise NotImplementedError + logger.debug("Called check_health.") def _check_ray_compiled_graph_installation(self): # TODO: We should check versions that support compiled graph. diff --git a/vllm/v1/executor/ray_utils.py b/vllm/v1/executor/ray_utils.py index a2d958053af5..c6316103c42c 100644 --- a/vllm/v1/executor/ray_utils.py +++ b/vllm/v1/executor/ray_utils.py @@ -142,7 +142,9 @@ def ray_is_available() -> bool: def assert_ray_available(): - """Raise an exception if Ray is not available.""" + """ + Raise an exception if Ray is not available. + """ if ray is None: raise ValueError("Failed to import Ray, please install Ray with " "`pip install ray`.") from ray_import_err @@ -150,11 +152,17 @@ def assert_ray_available(): def _verify_bundles(placement_group: "PlacementGroup", parallel_config: ParallelConfig, device_str: str): - """Verify a given placement group has bundles located in the right place. + """ + Verify a given placement group has bundles located in the right place. There are 2 rules. - Warn if all tensor parallel workers cannot fit in a single node. - Fail if driver node is not included in a placement group. + + Args: + placement_group: The placement group to verify. + parallel_config: The parallel configuration. + device_str: The required device. 
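The two rules above reduce to a check over the bundle-to-node mapping that Ray reports for the placement group. A rough sketch of that logic with plain dictionaries, leaving the Ray calls out (bundle_to_node, driver_node_id and tp_size are stand-ins for values the real code derives from ray.util.placement_group_table() and the parallel config):

    import logging
    from collections import Counter
    from typing import Dict

    logger = logging.getLogger(__name__)

    def verify_bundles(bundle_to_node: Dict[int, str], driver_node_id: str,
                       tp_size: int) -> None:
        node_counts = Counter(bundle_to_node.values())
        if driver_node_id not in node_counts:
            # Rule 2: fail, the driver node must host at least one bundle.
            raise RuntimeError(
                "driver node is not included in the placement group")
        if max(node_counts.values()) < tp_size:
            # Rule 1: warn, a full tensor-parallel group spans multiple nodes,
            # which usually hurts performance.
            logger.warning(
                "tensor parallel workers cannot all fit in a single node")

    # Two bundles per node with tp_size=4: triggers the warning, not the error.
    verify_bundles({0: "node-a", 1: "node-a", 2: "node-b", 3: "node-b"},
                   driver_node_id="node-a", tp_size=4)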
""" assert ray.is_initialized(), ( "Ray is not initialized although distributed-executor-backend is ray.") From 3e7e60da4419f63e415885f8de6818067d81d223 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Sun, 15 Dec 2024 01:34:31 +0000 Subject: [PATCH 12/19] mypy Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 32 +++++++++++++++++++++++--------- vllm/v1/executor/ray_utils.py | 2 ++ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index 9439fd966f0c..f283bc377d0e 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -1,16 +1,15 @@ import os from collections import defaultdict from itertools import islice, repeat -from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, - Type) +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import vllm.envs as envs from vllm.config import VllmConfig from vllm.logger import init_logger from vllm.utils import get_distributed_init_method, get_ip, get_open_port +from vllm.v1.executor.abstract import Executor from vllm.v1.executor.ray_utils import RayWorkerWrapper, ray from vllm.v1.outputs import ModelRunnerOutput -from vllm.worker.worker_base import WorkerBase if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -21,7 +20,7 @@ logger = init_logger(__name__) -class RayExecutor: +class RayExecutor(Executor): def __init__(self, vllm_config: VllmConfig) -> None: self.vllm_config = vllm_config @@ -188,11 +187,25 @@ def sort_by_driver_then_worker_ip(worker): self._run_workers("initialize") self._run_workers("load_model") + def _configure_ray_workers_use_nsight(self, + ray_remote_kwargs) -> Dict[str, Any]: + # If nsight profiling is enabled, we need to set the profiling + # configuration for the ray workers as runtime env. + runtime_env = ray_remote_kwargs.setdefault("runtime_env", {}) + runtime_env.update({ + "nsight": { + "t": "cuda,cudnn,cublas", + "o": "'worker_process_%p'", + "cuda-graph-trace": "node", + } + }) + + return ray_remote_kwargs + def _get_env_vars_to_be_updated(self): return self._env_vars_for_all_workers - def _get_worker_module_and_class( - self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]: + def _get_worker_module_and_class(self) -> Tuple[str, str]: worker_module_name = "vllm.v1.worker.gpu_worker" worker_class_name = "Worker" return worker_module_name, worker_class_name @@ -286,7 +299,8 @@ def _run_workers( # Start the ray workers first. ray_workers = self.workers ray_worker_outputs = [ - worker.execute_method.remote(method, *worker_args, **worker_kwargs) + worker.execute_method.remote( # type: ignore[attr-defined] + method, *worker_args, **worker_kwargs) for (worker, worker_args, worker_kwargs ) in zip(ray_workers, all_worker_args, all_worker_kwargs) ] @@ -345,8 +359,8 @@ def _compiled_ray_dag(self): with InputNode() as input_batches: outputs = [ - worker.execute_model.bind(input_batches) - for worker in self.workers + worker.execute_model.bind( # type: ignore[attr-defined] + input_batches) for worker in self.workers ] forward_dag = MultiOutputNode(outputs) diff --git a/vllm/v1/executor/ray_utils.py b/vllm/v1/executor/ray_utils.py index c6316103c42c..4e54c240ec13 100644 --- a/vllm/v1/executor/ray_utils.py +++ b/vllm/v1/executor/ray_utils.py @@ -116,6 +116,7 @@ def setup_device_if_necessary(self): # device. # We can remove this API after it is fixed in compiled graph. 
import torch + assert self.worker is not None, "Worker is not initialized" if not self.compiled_dag_cuda_device_set: torch.cuda.set_device(self.worker.device) self.compiled_dag_cuda_device_set = True @@ -125,6 +126,7 @@ def execute_model( scheduler_output: "SchedulerOutput", ) -> ModelRunnerOutput: self.setup_device_if_necessary() + assert self.worker is not None, "Worker is not initialized" output = self.worker.model_runner.execute_model(scheduler_output) return output From a2db3f8e9f422246390bd5fda698fe685d461e40 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 17 Dec 2024 06:33:43 +0000 Subject: [PATCH 13/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index f283bc377d0e..6ddd7610154f 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -50,14 +50,6 @@ def _init_executor(self) -> None: def _init_workers_ray(self, placement_group: "PlacementGroup", **ray_remote_kwargs): - if (self.parallel_config.tensor_parallel_size == 1 - and self.parallel_config.pipeline_parallel_size == 1): - # For single GPU case, we use a ray worker with constrained memory. - num_gpus = self.cache_config.gpu_memory_utilization - else: - # Otherwise, the ray workers are allocated with a full GPU. - num_gpus = 1 - # A list of workers to run a model. self.workers: List[RayWorkerWrapper] = [] if self.parallel_config.ray_workers_use_nsight: @@ -78,7 +70,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", worker = ray.remote( num_cpus=0, - num_gpus=num_gpus, + num_gpus=1, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, )(RayWorkerWrapper).remote(**worker_wrapper_kwargs) From 7909cf126e05972896841cf68e6b1110420c0c28 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 17 Dec 2024 16:19:37 +0000 Subject: [PATCH 14/19] up Signed-off-by: Rui Qiao --- tests/basic_correctness/test_basic_correctness.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index 1c2193bb17a5..9e4eb16fc6cc 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -127,6 +127,11 @@ def test_models_distributed( if attention_backend: os.environ["VLLM_ATTENTION_BACKEND"] = attention_backend + # Import VLLM_USE_V1 dynamically to handle patching + from vllm.envs import VLLM_USE_V1 + if VLLM_USE_V1 and distributed_executor_backend != "mp": + os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0" + dtype = "half" max_tokens = 5 From 3dfb1ea5c4725e6952caabf3a33686a4dd338980 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 17 Dec 2024 16:35:35 +0000 Subject: [PATCH 15/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index 6ddd7610154f..a6dea5f155ed 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -329,12 +329,20 @@ def check_health(self) -> None: logger.debug("Called check_health.") def _check_ray_compiled_graph_installation(self): - # TODO: We should check versions that support compiled graph. 
+ import pkg_resources + from packaging import version + + required_version = version.parse("2.39") + current_version = version.parse( + pkg_resources.get_distribution("ray").version) + if current_version < required_version: + raise ValueError(f"Ray version {required_version} is " + f"required, but found {current_version}") + import importlib.util - adag_spec = importlib.util.find_spec( - "ray.experimental.compiled_dag_ref") - if adag_spec is None: - raise ValueError("Ray accelerated DAG is not installed. " + raycg = importlib.util.find_spec("ray.experimental.compiled_dag_ref") + if raycg is None: + raise ValueError("Ray Compiled Graph is not installed. " "Run `pip install ray[adag]` to install it.") cupy_spec = importlib.util.find_spec("cupy") From aa2e4f80297d847eed90cd97ac726c5f93e7e1cb Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Wed, 18 Dec 2024 21:43:54 +0000 Subject: [PATCH 16/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 40 ++++++++------------------------ 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index a6dea5f155ed..c7c70d4ba45b 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -24,27 +24,16 @@ class RayExecutor(Executor): def __init__(self, vllm_config: VllmConfig) -> None: self.vllm_config = vllm_config - self.model_config = vllm_config.model_config - self.cache_config = vllm_config.cache_config - self.lora_config = vllm_config.lora_config - self.load_config = vllm_config.load_config self.parallel_config = vllm_config.parallel_config - self.scheduler_config = vllm_config.scheduler_config - self.device_config = vllm_config.device_config - self.speculative_config = vllm_config.speculative_config - self.prompt_adapter_config = vllm_config.prompt_adapter_config - self.observability_config = vllm_config.observability_config - self._init_executor() - - def _init_executor(self) -> None: + self.model_config = vllm_config.model_config self.forward_dag: Optional[ray.dag.CompiledDAG] = None - placement_group = self.parallel_config.placement_group # Disable Ray usage stats collection. ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0") if ray_usage != "1": os.environ["RAY_USAGE_STATS_ENABLED"] = "0" + placement_group = self.parallel_config.placement_group # Create the parallel GPU workers. self._init_workers_ray(placement_group) @@ -85,6 +74,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", for ip in worker_ips: ip_counts[ip] = ip_counts.get(ip, 0) + 1 + worker_to_ip = dict(zip(self.workers, worker_ips)) + def sort_by_driver_then_worker_ip(worker): """ Sort the workers based on 3 properties: @@ -95,7 +86,7 @@ def sort_by_driver_then_worker_ip(worker): 3. Finally, if the work is on a node with smaller IP address, it should be placed first. """ - ip = ray.get(worker.get_node_ip.remote()) + ip = worker_to_ip[worker] return (ip != driver_ip, ip_counts[ip], ip) # After sorting, the workers on the same node will be @@ -274,10 +265,6 @@ def _run_workers( ways: Args: - - async_run_tensor_parallel_workers_only: If True the method will be - run only in the remote TP workers, not the driver worker. - It will also be run asynchronously and return a list of futures - rather than blocking on the results. 
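The sort_by_driver_then_worker_ip change above replaces a ray.get call per comparison with a lookup in the pre-built worker_to_ip dictionary, while keeping the three ordering properties from the docstring. A small self-contained sketch of the same key, using plain (worker_id, node_ip) pairs in place of Ray actor handles:

    from collections import Counter

    def order_workers(workers, driver_ip):
        # workers: list of (worker_id, node_ip) pairs, stand-ins for actor handles.
        ip_counts = Counter(ip for _, ip in workers)

        def sort_key(item):
            _, ip = item
            # Driver-node workers first, then nodes with fewer workers,
            # then the IP string itself as a deterministic tiebreaker.
            return (ip != driver_ip, ip_counts[ip], ip)

        return sorted(workers, key=sort_key)

    workers = [("w0", "10.0.0.2"), ("w1", "10.0.0.1"),
               ("w2", "10.0.0.2"), ("w3", "10.0.0.1")]
    # Driver on 10.0.0.1 -> w1 and w3 are placed first.
    print(order_workers(workers, driver_ip="10.0.0.1"))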
- args/kwargs: All workers share the same args/kwargs - all_args/all_kwargs: args/kwargs for each worker are specified individually @@ -288,20 +275,13 @@ def _run_workers( all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \ else islice(all_kwargs, 0, None) - # Start the ray workers first. - ray_workers = self.workers - ray_worker_outputs = [ + ray_worker_refs = [ worker.execute_method.remote( # type: ignore[attr-defined] method, *worker_args, **worker_kwargs) for (worker, worker_args, worker_kwargs - ) in zip(ray_workers, all_worker_args, all_worker_kwargs) + ) in zip(self.workers, all_worker_args, all_worker_kwargs) ] - - # Get the results of the ray workers. - if self.workers: - ray_worker_outputs = ray.get(ray_worker_outputs) - - return ray_worker_outputs + return ray.get(ray_worker_refs) def execute_model( self, @@ -309,8 +289,8 @@ def execute_model( ) -> ModelRunnerOutput: if self.forward_dag is None: self.forward_dag = self._compiled_ray_dag() - # All workers are supposed to produce the same output. Only - # get the first output. + # Only the first worker (with rank 0) returns the execution result. + # Others return None. output = ray.get(self.forward_dag.execute(scheduler_output))[0] return output From 8d362f7861f9d1406b5b1332bc691dc12ce1b1e8 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Thu, 19 Dec 2024 22:10:30 +0000 Subject: [PATCH 17/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index c7c70d4ba45b..463dc6060143 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -50,6 +50,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", worker_wrapper_kwargs = self._get_worker_wrapper_args() for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): + # Skip bundles that don't have GPUs, + # as each worker needs one GPU. continue scheduling_strategy = PlacementGroupSchedulingStrategy( placement_group=placement_group, @@ -84,7 +86,8 @@ def sort_by_driver_then_worker_ip(worker): 2. Then, if the worker is on a node with fewer workers, it should be placed first. 3. Finally, if the work is on a node with smaller IP address, it - should be placed first. + should be placed first. This is simply a tiebreaker to make + sure the workers are sorted in a deterministic way. """ ip = worker_to_ip[worker] return (ip != driver_ip, ip_counts[ip], ip) @@ -239,7 +242,9 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: # number of blocks across all workers to make sure all the memory # operators can be applied to all workers. 
num_gpu_blocks = min(b[0] for b in num_blocks) - return num_gpu_blocks, 0 + num_cpu_blocks = min(b[1] for b in num_blocks) + + return num_gpu_blocks, num_cpu_blocks def initialize(self, num_gpu_blocks: int) -> None: """ From 090caa06138316c64ee17d48c1813ebca3ef56ae Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 20 Dec 2024 21:38:42 +0000 Subject: [PATCH 18/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 19 +-------- vllm/v1/executor/ray_utils.py | 70 +------------------------------- 2 files changed, 4 insertions(+), 85 deletions(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index 463dc6060143..209eb48c9f54 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -47,7 +47,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # Create the workers. driver_ip = get_ip() - worker_wrapper_kwargs = self._get_worker_wrapper_args() + # worker_wrapper_kwargs = self._get_worker_wrapper_args() for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): # Skip bundles that don't have GPUs, @@ -64,7 +64,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", num_gpus=1, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, - )(RayWorkerWrapper).remote(**worker_wrapper_kwargs) + )(RayWorkerWrapper).remote(vllm_config=self.vllm_config) self.workers.append(worker) logger.debug("workers: %s", self.workers) @@ -191,11 +191,6 @@ def _configure_ray_workers_use_nsight(self, def _get_env_vars_to_be_updated(self): return self._env_vars_for_all_workers - def _get_worker_module_and_class(self) -> Tuple[str, str]: - worker_module_name = "vllm.v1.worker.gpu_worker" - worker_class_name = "Worker" - return worker_module_name, worker_class_name - def _get_worker_kwargs( self, local_rank: int = 0, @@ -214,16 +209,6 @@ def _get_worker_kwargs( distributed_init_method=distributed_init_method, ) - def _get_worker_wrapper_args(self) -> Dict[str, Any]: - worker_module_name, worker_class_name = ( - self._get_worker_module_and_class()) - - return dict( - worker_module_name=worker_module_name, - worker_class_name=worker_class_name, - trust_remote_code=self.model_config.trust_remote_code, - ) - def determine_num_available_blocks(self) -> Tuple[int, int]: """ Determine the number of available KV blocks. diff --git a/vllm/v1/executor/ray_utils.py b/vllm/v1/executor/ray_utils.py index 4e54c240ec13..7733610e59c7 100644 --- a/vllm/v1/executor/ray_utils.py +++ b/vllm/v1/executor/ray_utils.py @@ -1,5 +1,3 @@ -import importlib -import os import time from collections import defaultdict from typing import TYPE_CHECKING, Dict, List, Optional, Tuple @@ -7,8 +5,9 @@ from vllm.config import ParallelConfig from vllm.logger import init_logger from vllm.platforms import current_platform -from vllm.utils import get_ip, update_environment_variables +from vllm.utils import get_ip from vllm.v1.outputs import ModelRunnerOutput +from vllm.worker.worker_base import WorkerWrapperBase if TYPE_CHECKING: from vllm.v1.core.scheduler import SchedulerOutput @@ -16,71 +15,6 @@ logger = init_logger(__name__) PG_WAIT_TIMEOUT = 60 - -class WorkerWrapperBase: - """ - The whole point of this class is to lazily initialize the worker. - We first instantiate the WorkerWrapper, which remembers the worker module - and class name. Then, when we call `update_environment_variables`, and the - real initialization happens in `init_worker`. 
- - Otherwise, the worker class will be obtained by dynamically importing it - using worker_module_name and worker_class_name. - """ - - def __init__(self, - worker_module_name: str, - worker_class_name: str, - trust_remote_code: bool = False) -> None: - self.worker_module_name = worker_module_name - self.worker_class_name = worker_class_name - self.worker = None - - if trust_remote_code: - # note: lazy import to avoid importing torch before initializing - from vllm.utils import init_cached_hf_modules - init_cached_hf_modules() - - @staticmethod - def update_environment_variables(envs: Dict[str, str]) -> None: - key = 'CUDA_VISIBLE_DEVICES' - if key in envs and key in os.environ: - # overwriting CUDA_VISIBLE_DEVICES is desired behavior - # suppress the warning in `update_environment_variables` - del os.environ[key] - update_environment_variables(envs) - - def init_worker(self, *args, **kwargs): - """ - Here we inject some common logic before initializing the worker. - Arguments are passed to the worker class constructor. - """ - - # see https://github.com/NVIDIA/nccl/issues/1234 - os.environ['NCCL_CUMEM_ENABLE'] = '0' - - mod = importlib.import_module(self.worker_module_name) - worker_class = getattr(mod, self.worker_class_name) - - self.worker = worker_class(*args, **kwargs) - assert self.worker is not None - - def execute_method(self, method, *args, **kwargs): - try: - target = self if self.worker is None else self.worker - executor = getattr(target, method) - return executor(*args, **kwargs) - except Exception as e: - # if the driver worker also execute methods, - # exceptions in the rest worker may cause deadlock in rpc like ray - # see https://github.com/vllm-project/vllm/issues/3455 - # print the error and inform the user to solve the error - msg = (f"Error executing method {method}. " - "This might cause deadlock in distributed execution.") - logger.exception(msg) - raise e - - try: import ray from ray.util import placement_group_table From 33e559998b0e3fdc1e0232f2bc1dc49fec7914f1 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 20 Dec 2024 21:40:57 +0000 Subject: [PATCH 19/19] up Signed-off-by: Rui Qiao --- vllm/v1/executor/ray_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm/v1/executor/ray_executor.py b/vllm/v1/executor/ray_executor.py index 209eb48c9f54..dfeb69fa701a 100644 --- a/vllm/v1/executor/ray_executor.py +++ b/vllm/v1/executor/ray_executor.py @@ -47,7 +47,6 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # Create the workers. driver_ip = get_ip() - # worker_wrapper_kwargs = self._get_worker_wrapper_args() for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): # Skip bundles that don't have GPUs,