Commit fc7e82e

Create a GeneratorStep from a dataset using a helper function (#812)

* Add helper function to create generator step from dataset

* Add integration tests for make_generator_step

* Redirect import

* Update LoadDataFromHub to not call load if a dataset is already defined

* Update docs

* Add unit tests for the new helper function

* Update filename to utils

* Add helper method to insert a root step

* Add logic to create a generator step internally from a dataset

* Pass the dataset variable from all the pipeline implementations

* Add type for the input datasets

* Avoid circular imports

* Add test for pipelines with generator step and dataset

* Add integration tests for dataset passed via run method

* Fix error evaluation dataframe

* Add example on quickstart and entry on how to guide

* Update docs/sections/getting_started/quickstart.md

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Update docs/sections/getting_started/quickstart.md

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Update src/distilabel/pipeline/base.py

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Update src/distilabel/pipeline/ray.py

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Update src/distilabel/steps/generators/utils.py

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Update src/distilabel/steps/generators/utils.py

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Update src/distilabel/pipeline/local.py

Co-authored-by: Gabriel Martín Blázquez <[email protected]>

* Respect import order

* Move functionality to a proper internal method

* Run linter

* Fix format

---------

Co-authored-by: Gabriel Martín Blázquez <[email protected]>
Co-authored-by: David Berenstein <[email protected]>
3 people authored Jul 29, 2024
1 parent 25601bb commit fc7e82e
Showing 14 changed files with 426 additions and 4 deletions.
2 changes: 2 additions & 0 deletions docs/api/step/generator_step.md
@@ -5,3 +5,5 @@
For more information and examples on how to use existing generator steps or create custom ones, please refer to [Tutorial - Step - GeneratorStep](../../sections/how_to_guides/basic/step/generator_step.md).

::: distilabel.steps.base.GeneratorStep

::: distilabel.steps.generators.utils.make_generator_step
28 changes: 28 additions & 0 deletions docs/sections/getting_started/quickstart.md
@@ -67,3 +67,31 @@ if __name__ == "__main__":
7. We run the pipeline with the parameters for the `load_dataset` and `text_generation` steps. The `load_dataset` step will use the repository `distilabel-internal-testing/instruction-dataset-mini` and the `test` split, and the `text_generation` task will use `generation_kwargs` with `temperature` set to `0.7` and `max_new_tokens` set to `512` (see the sketch after this list).

8. Optionally, we can push the generated [`Distiset`][distilabel.distiset.Distiset] to the Hugging Face Hub repository `distilabel-example`. This will allow you to share the generated dataset with others and use it in other pipelines.
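
A minimal sketch of those runtime parameters, assuming the step names `load_dataset` and `text_generation` from the example above:

```python
# Hypothetical `run` call showing the parameter structure described in step 7.
distiset = pipeline.run(
    parameters={
        "load_dataset": {
            "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
            "split": "test",
        },
        "text_generation": {
            "llm": {
                "generation_kwargs": {"temperature": 0.7, "max_new_tokens": 512}
            }
        },
    }
)
```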

## Minimal example

`distilabel` gives you a lot of flexibility to create your pipelines, but to get started right away you can omit most of the details and rely on the default values:

```python
from distilabel.llms import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps.tasks import TextGeneration
from datasets import load_dataset


dataset = load_dataset("distilabel-internal-testing/instruction-dataset-mini", split="test")

with Pipeline() as pipeline:  # (1)
    TextGeneration(llm=InferenceEndpointsLLM(model_id="meta-llama/Meta-Llama-3.1-8B-Instruct"))  # (2)


if __name__ == "__main__":
    distiset = pipeline.run(dataset=dataset)  # (3)
    distiset.push_to_hub(repo_id="distilabel-example")
```

1. The [`Pipeline`][distilabel.pipeline.Pipeline] can take no arguments and will generate a default name on its own, which is tracked internally.

2. Just as with the [`Pipeline`][distilabel.pipeline.Pipeline], the [`Step`][distilabel.steps.base.Step]s don't need an explicit name either.

3. You can create the dataset as you would normally do with Hugging Face `datasets` and pass it to the `run` method.
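
Under the hood, passing `dataset` to `run` is roughly equivalent to creating the root step yourself with the helper this commit introduces (a sketch, not the literal internals):

```python
from distilabel.steps import make_generator_step

# Roughly what `pipeline.run(dataset=...)` does before running: it wraps the
# dataset in a `GeneratorStep` and inserts it as the root step of the DAG.
loader = make_generator_step(dataset)
```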
98 changes: 98 additions & 0 deletions docs/sections/how_to_guides/basic/pipeline/index.md
@@ -29,6 +29,56 @@ with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...
```

!!! Tip "Easily load your datasets"

If you are already used to working with Hugging Face's `Dataset` via `load_dataset`, or with `pd.DataFrame`, you can create the `GeneratorStep` directly from the dataset (or dataframe) with the help of [`make_generator_step`][distilabel.steps.generators.utils.make_generator_step]:

=== "From a list of dicts"

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

dataset = [{"instruction": "Tell me a joke."}]

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
    ...
```

=== "From `datasets.Dataset`"

```python
from datasets import load_dataset
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

dataset = load_dataset(
    "DIBT/10k_prompts_ranked",
    split="train"
).filter(
    lambda r: r["avg_rating"] >= 4 and r["num_responses"] >= 2
).select(range(500))

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
    ...
```

=== "From `pd.DataFrame`"

```python
import pandas as pd
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

dataset = pd.read_csv("path/to/dataset.csv")

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
    ...
```

Next, we will use the `prompt` column from the dataset obtained through `LoadDataFromHub` and several `LLM`s to execute a `TextGeneration` task. We will also use the `Task.connect()` method to connect the steps, so that the output of one step is the input of the next one, as sketched below.
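
A minimal sketch of connecting two steps, assuming a loader and a task like the ones used in this guide (`>>` is shorthand for `connect`):

```python
from distilabel.llms import OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name") as pipeline:
    load_dataset = LoadDataFromHub(output_mappings={"prompt": "instruction"})
    text_generation = TextGeneration(llm=OpenAILLM(model="gpt-4-0125-preview"))

    # Equivalent to: load_dataset >> text_generation
    load_dataset.connect(text_generation)
```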

!!! NOTE
@@ -282,6 +332,54 @@ if __name__ == "__main__":
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
```

#### Pipeline.run with a dataset

In most cases, if you don't need the extra flexibility that [`GeneratorStep`][distilabel.steps.base.GeneratorStep]s bring you, you can create the dataset as you normally would and pass it directly to the [`Pipeline.run`][distilabel.pipeline.base.BasePipeline.run] method. The highlighted lines show what changed:

```python hl_lines="12-15 34 39"
import random
from datasets import load_dataset
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

dataset = load_dataset(
    "distilabel-internal-testing/instruction-dataset-mini",
    split="test"
)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    sample_two_steps >> tasks >> combine_generations


if __name__ == "__main__":
    distiset = pipeline.run(
        dataset=dataset,
        parameters=...
    )
```


### Stopping the pipeline

In case you want to stop the pipeline while it's running, you can press ++ctrl+c++ or ++cmd+c++ depending on your OS (or send a `SIGINT` to the main process), and the outputs will be stored in the cache. Pressing it a second time will force the pipeline to stop its execution, but this can lead to losing the generated outputs for certain batches.
12 changes: 11 additions & 1 deletion src/distilabel/pipeline/_dag.py
@@ -45,7 +45,7 @@

if TYPE_CHECKING:
    from distilabel.mixins.runtime_parameters import RuntimeParametersNames
-    from distilabel.steps.base import Step, _Step
+    from distilabel.steps.base import GeneratorStep, Step, _Step


class DAG(_Serializable):
@@ -146,6 +146,16 @@ def add_edge(self, from_step: str, to_step: str) -> None:

        self.G.add_edge(from_step, to_step)

    def add_root_step(self, step: "GeneratorStep") -> None:
        """Adds a root step, helper method used when a pipeline receives a dataset in the
        run method.

        Args:
            step: The generator step that will be set as the new root.
        """
        self.add_step(step)
        self.add_edge(step.name, next(iter(self)))

    @cached_property
    def root_steps(self) -> Set[str]:
        """The steps that don't have any predecessors i.e. generator steps.
36 changes: 35 additions & 1 deletion src/distilabel/pipeline/base.py
@@ -53,6 +53,8 @@
    STEP_ATTR_NAME,
)
from distilabel.pipeline.write_buffer import _WriteBuffer
from distilabel.steps.base import GeneratorStep
from distilabel.steps.generators.utils import make_generator_step
from distilabel.utils.logging import setup_logging, stop_logging
from distilabel.utils.serialization import (
    TYPE_INFO_KEY,
@@ -66,7 +68,11 @@

    from distilabel.distiset import Distiset
    from distilabel.pipeline.routing_batch_function import RoutingBatchFunction
-    from distilabel.pipeline.typing import PipelineRuntimeParametersInfo, StepLoadStatus
+    from distilabel.pipeline.typing import (
+        InputDataset,
+        PipelineRuntimeParametersInfo,
+        StepLoadStatus,
+    )
    from distilabel.steps.base import Step, _Step

class _CacheLocation(TypedDict):
@@ -290,6 +296,7 @@ def run(
        use_cache: bool = True,
        storage_parameters: Optional[Dict[str, Any]] = None,
        use_fs_to_pass_data: bool = False,
        dataset: Optional["InputDataset"] = None,
    ) -> "Distiset":  # type: ignore
        """Run the pipeline. It will set the runtime parameters for the steps and validate
        the pipeline.
Expand All @@ -313,6 +320,9 @@ def run(
                the `_Batch`es between the steps. Even if this parameter is `False`, the
                `Batch`es received by `GlobalStep`s will always use the file system to
                pass the data. Defaults to `False`.
            dataset: If given, it will be used to create a `GeneratorStep` and put it as the
                root step. Convenient when you have already processed the dataset in
                your script and just want to pass it to the pipeline. Defaults to `None`.

        Returns:
            The `Distiset` created by the pipeline.
Expand All @@ -329,6 +339,9 @@ def run(
            log_queue=self._log_queue, filename=str(self._cache_location["log_file"])
        )

        if dataset is not None:
            self._add_dataset_generator_step(dataset)

        # Validate the pipeline DAG to check that all the steps are chainable, there are
        # no missing runtime parameters, batch sizes are correct, etc.
        self.dag.validate()
@@ -412,6 +425,27 @@ def dry_run(
        self._dry_run = False
        return distiset

    def _add_dataset_generator_step(self, dataset: "InputDataset") -> None:
        """Create a root step to work as the `GeneratorStep` for the pipeline using a
        dataset.

        Args:
            dataset: A dataset that will be used to create a `GeneratorStep` and
                placed in the DAG as the root step.

        Raises:
            ValueError: If there's already a `GeneratorStep` in the pipeline.
        """
        for step_name in self.dag:
            step = self.dag.get_step(step_name)[STEP_ATTR_NAME]
            if isinstance(step, GeneratorStep):
                raise ValueError(
                    "There is already a `GeneratorStep` in the pipeline, you can either pass a "
                    "`dataset` to the run method, or create a `GeneratorStep` explicitly. "
                    f"`GeneratorStep`: {step}"
                )
        loader = make_generator_step(dataset)
        self.dag.add_root_step(loader)

    def get_runtime_parameters_info(self) -> "PipelineRuntimeParametersInfo":
        """Get the runtime parameters for the steps in the pipeline.
12 changes: 11 additions & 1 deletion src/distilabel/pipeline/local.py
@@ -33,6 +33,7 @@
    from queue import Queue

    from distilabel.distiset import Distiset
    from distilabel.pipeline.typing import InputDataset
    from distilabel.steps.base import _Step


@@ -125,6 +126,7 @@ def run(
        use_cache: bool = True,
        storage_parameters: Optional[Dict[str, Any]] = None,
        use_fs_to_pass_data: bool = False,
        dataset: Optional["InputDataset"] = None,
    ) -> "Distiset":
        """Runs the pipeline.
@@ -144,6 +146,9 @@
                the `_Batch`es between the steps. Even if this parameter is `False`, the
                `Batch`es received by `GlobalStep`s will always use the file system to
                pass the data. Defaults to `False`.
            dataset: If given, it will be used to create a `GeneratorStep` and put it as the
                root step. Convenient when you have already processed the dataset in
                your script and just want to pass it to the pipeline. Defaults to `None`.

        Returns:
            The `Distiset` created by the pipeline.
@@ -158,12 +163,17 @@
            use_cache=use_cache,
            storage_parameters=storage_parameters,
            use_fs_to_pass_data=use_fs_to_pass_data,
            dataset=dataset,
        )

        self._log_queue = cast("Queue[Any]", mp.Queue())

        if distiset := super().run(
-            parameters, use_cache, storage_parameters, use_fs_to_pass_data
+            parameters,
+            use_cache,
+            storage_parameters,
+            use_fs_to_pass_data,
+            dataset=dataset,
        ):
            return distiset

11 changes: 10 additions & 1 deletion src/distilabel/pipeline/ray.py
@@ -27,6 +27,7 @@
    from queue import Queue

    from distilabel.distiset import Distiset
    from distilabel.pipeline.typing import InputDataset
    from distilabel.steps.base import _Step


@@ -75,6 +76,7 @@ def run(
        use_cache: bool = True,
        storage_parameters: Optional[Dict[str, Any]] = None,
        use_fs_to_pass_data: bool = False,
        dataset: Optional["InputDataset"] = None,
    ) -> "Distiset":
        """Runs the pipeline in the Ray cluster.
@@ -94,6 +96,9 @@
                the `_Batch`es between the steps. Even if this parameter is `False`, the
                `Batch`es received by `GlobalStep`s will always use the file system to
                pass the data. Defaults to `False`.
            dataset: If given, it will be used to create a `GeneratorStep` and put it as the
                root step. Convenient when you have already processed the dataset in
                your script and just want to pass it to the pipeline. Defaults to `None`.

        Returns:
            The `Distiset` created by the pipeline.
@@ -108,7 +113,11 @@
        )

        if distiset := super().run(
-            parameters, use_cache, storage_parameters, use_fs_to_pass_data
+            parameters,
+            use_cache,
+            storage_parameters,
+            use_fs_to_pass_data,
+            dataset=dataset,
        ):
            return distiset

6 changes: 6 additions & 0 deletions src/distilabel/pipeline/typing.py
@@ -15,6 +15,9 @@
from typing import TYPE_CHECKING, Dict, List, Literal, TypedDict, TypeVar, Union

if TYPE_CHECKING:
    import pandas as pd
    from datasets import Dataset

    from distilabel.mixins.runtime_parameters import RuntimeParameterInfo
    from distilabel.steps.base import GeneratorStep, GlobalStep, Step

@@ -47,3 +50,6 @@ class StepLoadStatus(TypedDict):
    str, Union[List["RuntimeParameterInfo"], Dict[str, "RuntimeParameterInfo"]]
]
"""Alias for the information of the runtime parameters of a `Pipeline`."""

InputDataset = Union["Dataset", "pd.DataFrame", List[Dict[str, str]]]
"""Alias for the types we can process as input dataset."""
2 changes: 2 additions & 0 deletions src/distilabel/steps/__init__.py
@@ -44,6 +44,7 @@
    LoadDataFromFileSystem,
    LoadDataFromHub,
)
from distilabel.steps.generators.utils import make_generator_step
from distilabel.steps.globals.huggingface import PushToHub
from distilabel.steps.typing import GeneratorStepOutput, StepOutput

@@ -70,6 +71,7 @@
"LoadDataFromDisk",
"LoadDataFromFileSystem",
"LoadDataFromHub",
"make_generator_step",
"PushToHub",
"Step",
"StepInput",
3 changes: 3 additions & 0 deletions src/distilabel/steps/generators/huggingface.py
@@ -130,6 +130,9 @@ class LoadDataFromHub(GeneratorStep):
    def load(self) -> None:
        """Load the dataset from the Hugging Face Hub"""
        super().load()
        if self._dataset is not None:
            # Here to simplify the functionality of distilabel.steps.generators.utils.make_generator_step
            return

        self._dataset = load_dataset(
            self.repo_id,  # type: ignore