From 3c005530aeed1df3b32a197246b3c27d6e15d1e3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Mon, 22 Jul 2024 16:23:53 +0200
Subject: [PATCH 1/7] Add `_NoDaemonPool` class

---
 src/distilabel/pipeline/local.py | 42 +++++++++++++++++++++++++++++---
 1 file changed, 38 insertions(+), 4 deletions(-)

diff --git a/src/distilabel/pipeline/local.py b/src/distilabel/pipeline/local.py
index 9285a0f218..b7bb3a8c70 100644
--- a/src/distilabel/pipeline/local.py
+++ b/src/distilabel/pipeline/local.py
@@ -15,7 +15,8 @@
 import multiprocessing as mp
 import signal
 import sys
-from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Union, cast
+from multiprocessing.pool import Pool
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Optional, Union, cast
 
 import tblib
 
@@ -48,6 +49,40 @@ def _init_worker(log_queue: "Queue[Any]") -> None:
     setup_logging(log_queue)
 
 
+# We create a custom `Pool` class so the created processes are not daemons, allowing
+# them to created child processes if necessary (for example when using `vLLM` with `tensor_parallel_size`)
+# https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
+class _NoDaemonProcess(mp.Process):
+    @property
+    def daemon(self) -> bool:
+        return False
+
+    @daemon.setter
+    def daemon(self, value: bool) -> None:  # type: ignore
+        pass
+
+
+class _NoDaemonContext(type(mp.get_context())):
+    Process = _NoDaemonProcess
+
+
+class _NoDaemonPool(Pool):
+    def __init__(
+        self,
+        processes: int | None = None,
+        initializer: Callable[..., object] | None = None,
+        initargs: Iterable[Any] = ...,  # type: ignore
+        maxtasksperchild: int | None = None,
+    ) -> None:
+        super().__init__(
+            processes=processes,
+            initializer=initializer,
+            initargs=initargs,
+            maxtasksperchild=maxtasksperchild,
+            context=_NoDaemonContext(),  # type: ignore
+        )
+
+
 class Pipeline(BasePipeline):
     """Local pipeline implementation using `multiprocessing`."""
 
@@ -133,10 +168,9 @@ def run(
             return distiset
 
         num_processes = self.dag.get_total_replica_count()
-        ctx = mp.get_context()  # type: ignore
         with (
-            ctx.Manager() as manager,
-            ctx.Pool(
+            mp.Manager() as manager,
+            _NoDaemonPool(
                 num_processes,
                 initializer=_init_worker,
                 initargs=(self._log_queue,),
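The limitation this patch works around is easy to reproduce outside of `distilabel`: workers of a stock `multiprocessing.Pool` are daemonic, and daemonic processes may not spawn children. Below is a minimal, self-contained sketch of that failure mode; swapping `mp.Pool` for the `_NoDaemonPool` added above makes the same call succeed, which is exactly what `vLLM` needs when `tensor_parallel_size` makes it fork worker processes.

```python
import multiprocessing as mp


def spawn_grandchild(_: int) -> str:
    # Inside a daemonic pool worker, `Process.start()` raises:
    #   AssertionError: daemonic processes are not allowed to have children
    child = mp.Process(target=print, args=("hello from the grandchild",))
    child.start()
    child.join()
    return "grandchild spawned"


if __name__ == "__main__":
    with mp.Pool(processes=1) as pool:
        try:
            # The worker's exception is pickled and re-raised here in the parent.
            print(pool.apply(spawn_grandchild, args=(0,)))
        except AssertionError as exc:
            print(f"stock Pool failed as expected: {exc}")
```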
From 8e3e2eba38c1c1274ed62c8b3e6820e051d99eb5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Mon, 22 Jul 2024 16:41:15 +0200
Subject: [PATCH 2/7] Use `Union`

---
 src/distilabel/pipeline/local.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/distilabel/pipeline/local.py b/src/distilabel/pipeline/local.py
index b7bb3a8c70..2c3a865e2a 100644
--- a/src/distilabel/pipeline/local.py
+++ b/src/distilabel/pipeline/local.py
@@ -69,10 +69,10 @@ class _NoDaemonContext(type(mp.get_context())):
 class _NoDaemonPool(Pool):
     def __init__(
         self,
-        processes: int | None = None,
-        initializer: Callable[..., object] | None = None,
+        processes: Union[int, None] = None,
+        initializer: Union[Callable[..., object], None] = None,
         initargs: Iterable[Any] = ...,  # type: ignore
-        maxtasksperchild: int | None = None,
+        maxtasksperchild: Union[int, None] = None,
     ) -> None:
         super().__init__(
             processes=processes,

From 1d26533151e8b4f35ab72be1edbc1b50d5fab612 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 11:39:26 +0200
Subject: [PATCH 3/7] Update src/distilabel/pipeline/local.py

Co-authored-by: Agus

---
 src/distilabel/pipeline/local.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/distilabel/pipeline/local.py b/src/distilabel/pipeline/local.py
index 2c3a865e2a..2142eedfa2 100644
--- a/src/distilabel/pipeline/local.py
+++ b/src/distilabel/pipeline/local.py
@@ -50,7 +50,7 @@ def _init_worker(log_queue: "Queue[Any]") -> None:
 
 
 # We create a custom `Pool` class so the created processes are not daemons, allowing
-# them to created child processes if necessary (for example when using `vLLM` with `tensor_parallel_size`)
+# them to create child processes if necessary (for example when using `vLLM` with `tensor_parallel_size`)
 # https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
 class _NoDaemonProcess(mp.Process):
     @property

From 550d7a06a7193a3c9a46d2baa074ca85037da452 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 11:52:23 +0200
Subject: [PATCH 4/7] Update dependency version to `vllm>=0.5.3` and add
 `setuptools`

---
 pyproject.toml | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 7593c1c6f5..6ec4a483bb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -85,7 +85,13 @@ openai = ["openai >= 1.0.0"]
 outlines = ["outlines >= 0.0.40"]
 ray = ["ray[default] >= 2.31.0"]
 vertexai = ["google-cloud-aiplatform >= 1.38.0"]
-vllm = ["vllm >= 0.4.0", "outlines == 0.0.34", "filelock >= 3.13.4"]
+vllm = [
+    "vllm >= 0.5.3",
+    "outlines == 0.0.34",
+    "filelock >= 3.13.4",
+    # `setuptools` needs to be installed if `distilabel` is installed with `uv pip install distilabel[vllm]`
+    "setuptools",
+]
 
 [project.urls]
 Documentation = "https://distilabel.argilla.io/"

From 51e1adc9536e8ceb63ff766957f59fee71f2894d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 11:54:36 +0200
Subject: [PATCH 5/7] Remove pinned `outlines==0.0.34`

---
 pyproject.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 6ec4a483bb..7c15678ce1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -87,7 +87,6 @@ ray = ["ray[default] >= 2.31.0"]
 vertexai = ["google-cloud-aiplatform >= 1.38.0"]
 vllm = [
     "vllm >= 0.5.3",
-    "outlines == 0.0.34",
     "filelock >= 3.13.4",
     # `setuptools` needs to be installed if `distilabel` is installed with `uv pip install distilabel[vllm]`
     "setuptools",

From 0afef6eedd1edbc40adc9d2a6543ad8f2c3138bc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 15:27:04 +0200
Subject: [PATCH 6/7] Fix docstring

---
 src/distilabel/llms/huggingface/inference_endpoints.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/distilabel/llms/huggingface/inference_endpoints.py b/src/distilabel/llms/huggingface/inference_endpoints.py
index 440a30377d..f2be978620 100644
--- a/src/distilabel/llms/huggingface/inference_endpoints.py
+++ b/src/distilabel/llms/huggingface/inference_endpoints.py
@@ -516,7 +516,7 @@ async def agenerate(  # type: ignore
             input: a single input in chat format to generate responses for.
             max_new_tokens: the maximum number of new tokens that the model will generate.
                 Defaults to `128`.
-            frequence_penalty: a value between `-2.0` and `2.0`. Positive values penalize
+            frequency_penalty: a value between `-2.0` and `2.0`. Positive values penalize
                 new tokens based on their existing frequency in the text so far, decreasing
                 model's likelihood to repeat the same line verbatim. Defauls to `None`.
             logit_bias: modify the likelihood of specified tokens appearing in the completion.
@@ -545,8 +545,8 @@ async def agenerate(  # type: ignore
                 only if `tokenizer_id` is `None`. Defaults to `None`.
             top_p: the top-p value to use for the generation. Defaults to `1.0`.
             do_sample: whether to use sampling for the generation. This argument is exclusive
-            of the `text_generation` method and will be only used if `tokenizer_id` is not
-            `None`. Defaults to `False`.
+                of the `text_generation` method and will be only used if `tokenizer_id` is not
+                `None`. Defaults to `False`.
             repetition_penalty: the repetition penalty to use for the generation. This argument
                 is exclusive of the `text_generation` method and will be only used if `tokenizer_id`
                 is not `None`. Defaults to `None`.
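To make the rename concrete, here is a hedged usage sketch of the corrected argument. Only the `agenerate` keyword arguments come from the docstring above; the `model_id` value, the `load()` call, and the surrounding setup are illustrative assumptions rather than a prescribed configuration:

```python
import asyncio

from distilabel.llms.huggingface.inference_endpoints import InferenceEndpointsLLM

# Hypothetical model; assumes a Hugging Face token is available in the
# environment (e.g. `HF_TOKEN`).
llm = InferenceEndpointsLLM(model_id="meta-llama/Meta-Llama-3-8B-Instruct")
llm.load()

generations = asyncio.run(
    llm.agenerate(
        input=[{"role": "user", "content": "Summarize what tensor parallelism is."}],
        max_new_tokens=128,
        frequency_penalty=0.5,  # must lie in [-2.0, 2.0], per the docstring
    )
)
print(generations)
```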
From c0ece5308c73128208719dcf5b652896a44e32cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 15:36:21 +0200
Subject: [PATCH 7/7] Add docs about `vLLM` with `ray`

---
 .../advanced/scaling_with_ray.md              | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/docs/sections/how_to_guides/advanced/scaling_with_ray.md b/docs/sections/how_to_guides/advanced/scaling_with_ray.md
index de7d8ceab6..0f6594ba8b 100644
--- a/docs/sections/how_to_guides/advanced/scaling_with_ray.md
+++ b/docs/sections/how_to_guides/advanced/scaling_with_ray.md
@@ -208,3 +208,36 @@ ray job submit --address http://localhost:8265 --working-dir ray-pipeline -- pyt
 1. In this case, we just want two nodes: one to run the Ray head node and one to run a worker.
 2. We just want to run a task per node i.e. the Ray command that starts the head/worker node.
 3. We have selected 1 GPU per node, but we could have selected more depending on the pipeline.
+
+## `vLLM` and `tensor_parallel_size`
+
+In order to use `vLLM` multi-GPU and multi-node capabilities with `ray`, we need to make a few changes in the example pipeline from above. The first change is to specify a value for `tensor_parallel_size` (i.e. "in how many GPUs do I want you to load the model"), and the second one is to define `ray` as the `distributed_executor_backend`, since the default one in `vLLM` is `multiprocessing`:
+
+
+```python
+with Pipeline(name="text-generation-ray-pipeline") as pipeline:
+    load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})
+
+    text_generation = TextGeneration(
+        llm=vLLM(
+            model="meta-llama/Meta-Llama-3.1-70B-Instruct",
+            tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
+            extra_kwargs={
+                "tensor_parallel_size": 8,
+                "distributed_executor_backend": "ray",
+            }
+        )
+    )
+
+    load_data_from_hub >> text_generation
+```
+
+Finally, we need to define two environment variables in our `runtime_env.yaml` file:
+
+```yaml
+env_vars:
+  VLLM_USE_RAY_COMPILED_DAG: "1"
+  VLLM_USE_RAY_SPMD_WORKER: "1"
+```
+
+More information about distributed inference with `vLLM` can be found here: [vLLM - Distributed Serving](https://docs.vllm.ai/en/latest/serving/distributed_serving.html)
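Closing the loop with the first patch: the `_NoDaemonPool` change is what lets the same `tensor_parallel_size` option work in a plain local `Pipeline` (without Ray), because the pool workers must be allowed to spawn the child processes `vLLM` creates for tensor parallelism. A minimal sketch, assuming a machine with 2 GPUs, distilabel's 1.x import paths, and a hypothetical dataset repo id:

```python
from distilabel.llms import vLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline(name="local-tensor-parallel-pipeline") as pipeline:
    load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})

    text_generation = TextGeneration(
        llm=vLLM(
            model="meta-llama/Meta-Llama-3.1-8B-Instruct",
            extra_kwargs={
                # Shard the model across 2 local GPUs; the non-daemonic pool
                # workers can spawn the child processes `vLLM` needs for this.
                "tensor_parallel_size": 2,
            },
        )
    )

    load_data_from_hub >> text_generation

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            # Hypothetical dataset; supplied at run time as in the examples above.
            load_data_from_hub.name: {"repo_id": "your-org/your-dataset"},
        }
    )
```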