From be61d20aae679a4856a7d4161805f47113ce1b99 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 30 Jul 2024 14:56:16 +0200
Subject: [PATCH] Create `PlacementGroup` for steps using `vLLM` (#842)

* Create placement group for `vLLM`

* Use `SPREAD` if `pipeline_parallel_size>1`

* Fix bundle initialization

* Fix wrong dictionary

* Remove using `SPMD` from ray docs

* Refactor creating `PlacementGroup` for `vLLM`
---
 .../advanced/scaling_with_ray.md |  8 --
 src/distilabel/pipeline/ray.py   | 76 +++++++++++++++++--
 2 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/docs/sections/how_to_guides/advanced/scaling_with_ray.md b/docs/sections/how_to_guides/advanced/scaling_with_ray.md
index 0f6594ba8b..452a653369 100644
--- a/docs/sections/how_to_guides/advanced/scaling_with_ray.md
+++ b/docs/sections/how_to_guides/advanced/scaling_with_ray.md
@@ -232,12 +232,4 @@ with Pipeline(name="text-generation-ray-pipeline") as pipeline:
     load_data_from_hub >> text_generation
 ```
 
-Finally, we need to define two environment variables in our `runtime_env.yaml` file:
-
-```yaml
-env_vars:
-  VLLM_USE_RAY_COMPILED_DAG: "1"
-  VLLM_USE_RAY_SPMD_WORKER: "1"
-```
-
 More information about distributed inference with `vLLM` can be found here: [vLLM - Distributed Serving](https://docs.vllm.ai/en/latest/serving/distributed_serving.html)
diff --git a/src/distilabel/pipeline/ray.py b/src/distilabel/pipeline/ray.py
index 06d51e90e5..0b5094809c 100644
--- a/src/distilabel/pipeline/ray.py
+++ b/src/distilabel/pipeline/ray.py
@@ -16,6 +16,7 @@
 from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
 
 from distilabel.distiset import create_distiset
+from distilabel.llms.vllm import vLLM
 from distilabel.pipeline.base import BasePipeline
 from distilabel.pipeline.constants import INPUT_QUEUE_ATTR_NAME
 from distilabel.pipeline.step_wrapper import _StepWrapper
@@ -26,6 +27,8 @@
     from os import PathLike
     from queue import Queue
 
+    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+
     from distilabel.distiset import Distiset
     from distilabel.pipeline.typing import InputDataset
     from distilabel.steps.base import _Step
@@ -69,6 +72,7 @@ def __init__(
 
         self._ray_head_node_url = ray_head_node_url
         self._ray_init_kwargs = ray_init_kwargs or {}
+        self._ray_node_ids = {}
 
     def run(
         self,
@@ -171,6 +175,8 @@ def _init_ray(self) -> None:
         else:
             ray.init(**self._ray_init_kwargs)
 
+        self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
+
     @property
     def QueueClass(self) -> Callable:
         from ray.util.queue import Queue
@@ -218,17 +224,20 @@ def run(self) -> str:
             "name": f"distilabel-{self.name}-{step.name}-{replica}"
         }
 
-        if step.resources.cpus is not None:
-            resources["num_cpus"] = step.resources.cpus
+        if hasattr(step, "llm") and isinstance(step.llm, vLLM):  # type: ignore
+            resources["scheduling_strategy"] = self._create_vllm_placement_group(step)
+        else:
+            if step.resources.cpus is not None:
+                resources["num_cpus"] = step.resources.cpus
 
-        if step.resources.gpus is not None:
-            resources["num_gpus"] = step.resources.gpus
+            if step.resources.gpus is not None:
+                resources["num_gpus"] = step.resources.gpus
 
-        if step.resources.memory is not None:
-            resources["memory"] = step.resources.memory
+            if step.resources.memory is not None:
+                resources["memory"] = step.resources.memory
 
-        if step.resources.resources is not None:
-            resources["resources"] = step.resources.resources
+            if step.resources.resources is not None:
+                resources["resources"] = step.resources.resources
 
         _StepWrapperRay = _StepWrapperRay.options(**resources)  # type: ignore
 
@@ -255,6 +264,57 @@ def run(self) -> str:
         )
         step_wrapper.run.remote()
 
+    def _create_vllm_placement_group(
+        self, step: "_Step"
+    ) -> "PlacementGroupSchedulingStrategy":
+        """Creates a Ray placement group with as many GPU bundles as `tensor_parallel_size`
+        specified in the `vLLM` initialisation. The created placement group uses the `STRICT_PACK`
+        strategy if the `pipeline_parallel_size` is less or equal to 1, otherwise it uses
+        `SPREAD` (placement group with GPU bundles in several nodes). In addition, the created
+        placement group is targeted to be created in a specific node. This avoids having
+        `vLLM` raising the exception `Ray does not allocate any GPUs on the driver node...`,
+        as it assures that the driver `_StepWrapperRay` actor created resides in the same
+        node as the ray actors created by `vLLM` for the distributed inference.
+
+        Args:
+            step: the step which uses `vLLM`.
+
+        Returns:
+            A `PlacementGroupSchedulingStrategy` using the created `PlacementGroup`.
+        """
+        import ray
+
+        llm = step.llm  # type: ignore
+        tensor_parallel_size = llm.extra_kwargs.get("tensor_parallel_size", 1)  # type: ignore
+        pipeline_parallel_size = llm.extra_kwargs.get(  # type: ignore
+            "pipeline_parallel_size", 1
+        )
+
+        node_id = next(
+            node_id for node_id, used in self._ray_node_ids.items() if not used
+        )
+
+        self._ray_node_ids[node_id] = True
+
+        # Create a placement group
+        pg = ray.util.placement_group(
+            # Create `tensor_parallel_size` GPU bundles and at least one CPU bundle
+            # so the actors can be scheduled and executed (1 CPU bundle can have infinite actors):
+            # https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
+            bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
+            strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
+            _soft_target_node_id=node_id if pipeline_parallel_size is None else None,
+        )
+
+        self._logger.info(
+            f"Step '{step.name}' uses `vLLM`. Created a Ray placement group with bundle"
+            f" specs: {pg.bundle_specs}"
+        )
+
+        return ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy(  # type: ignore
+            placement_group=pg,
+        )
+
     def _teardown(self) -> None:
         """Clean/release/stop resources reserved to run the pipeline."""
         if self._write_buffer:
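
For readers unfamiliar with Ray placement groups, the sketch below reproduces the scheduling pattern that `_create_vllm_placement_group` relies on using plain Ray, outside of `distilabel`. It is a minimal illustration only: the `DriverActor` class and the hard-coded `TENSOR_PARALLEL_SIZE` value are hypothetical stand-ins (they are not part of this patch), and whether `vLLM` reuses the group for its own workers depends on its Ray backend detecting the caller's placement group.

```python
# Minimal sketch (not part of the patch): reserve one CPU bundle for the driver
# actor plus one GPU bundle per tensor-parallel worker, packed on a single node,
# then pin an actor to the reserved resources via `PlacementGroupSchedulingStrategy`.
# Requires a Ray cluster/node with at least `TENSOR_PARALLEL_SIZE` GPUs.
import ray
from ray.util import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

TENSOR_PARALLEL_SIZE = 2  # illustrative value; in the patch it comes from `llm.extra_kwargs`

ray.init()

# One CPU bundle (the driver actor only needs CPU, and a single CPU bundle can
# host many actors) + `TENSOR_PARALLEL_SIZE` GPU bundles, all on the same node
# (`STRICT_PACK`), mirroring the `pipeline_parallel_size <= 1` branch of the patch.
pg = placement_group(
    bundles=[{"CPU": 1}] + [{"GPU": 1}] * TENSOR_PARALLEL_SIZE,
    strategy="STRICT_PACK",
)
ray.get(pg.ready())  # block until the bundles are actually reserved


@ray.remote(num_cpus=1)
class DriverActor:
    """Hypothetical stand-in for the `_StepWrapperRay` actor that would load `vLLM`."""

    def node_id(self) -> str:
        # Report which node the actor landed on.
        return ray.get_runtime_context().get_node_id()


actor = DriverActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(actor.node_id.remote()))
```

The extra `{"CPU": 1}` bundle matches the comment in the patch: the driver actor needs no GPU, so keeping it in a CPU-only bundle leaves every GPU bundle free for the tensor-parallel workers spawned for distributed inference.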