-
Notifications
You must be signed in to change notification settings - Fork 148
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Update link * Update cache section * Add step to fail if warnings * Fix dependency name
- Loading branch information
1 parent
4cbcb90
commit d99011c
Showing
11 changed files
with
48 additions
and
118 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,135 +1,60 @@ | ||
# Cache and recover pipeline executions | ||
# Pipeline cache | ||
|
||
Distilabel `Pipelines` automatically save all the intermediate steps to avoid losing any data in case of error. | ||
`distilabel` will automatically save all the intermediate outputs generated by each [`Step`][distilabel.steps.base.Step] of a [`Pipeline`][distilabel.pipeline.local.Pipeline], so these outputs can be reused to recover the state of a pipeline execution that was stopped before finishing or to not have to re-execute steps from a pipeline after adding a new downstream step. | ||
|
||
## Cache directory | ||
## How to enable/disable the cache | ||
|
||
Out of the box, the `Pipeline` will use the `~/.cache/distilabel/pipelines` directory to store the different pipelines[^1]: | ||
The use of the cache can be toggled using the `use_cache` parameter of the [`Pipeline.use_cache`][distilabel.pipeline.base.BasePipeline.run] method. If `True`, then `distilabel ` will use the reuse the outputs of previous executions for the new execution. If `False`, then `distilabel` will re-execute all the steps of the pipeline to generate new outputs for all the steps. | ||
|
||
```python | ||
from distilabel.pipeline.local import Pipeline | ||
|
||
with Pipeline(name="cache_testing") as pipeline: | ||
with Pipeline(name="my-pipeline") as pipeline: | ||
... | ||
``` | ||
|
||
This directory can be modified by setting the `DISTILABEL_CACHE_DIR` environment variable (`export DISTILABEL_CACHE_DIR=my_cache_dir`) or by explicitly passing the `cache_dir` variable to the `Pipeline` constructor like so: | ||
|
||
```python | ||
with Pipeline(name="cache_testing", cache_dir="~/my_cache_dir") as pipeline: | ||
... | ||
if __name__ == "__main__": | ||
distiset = pipeline.run(use_cache=False) # (1) | ||
``` | ||
|
||
[^1]: | ||
|
||
The pipelines will be organized according to the pipeline's name attribute, and then by the hash, in case you want to look for something manually, like the following example: | ||
|
||
```bash | ||
$ tree ~/.cache/distilabel/pipelines/ | ||
├── cache_testing | ||
│ └── 13da04d2cc255b2180d6bebb50fb5be91124f70d | ||
│ ├── batch_manager.json | ||
│ ├── batch_manager_steps | ||
│ │ └── succeed_always_0.json | ||
│ ├── data | ||
│ │ └── succeed_always_0 | ||
│ │ └── 00001.parquet | ||
│ ├── pipeline.log | ||
│ └── pipeline.yaml | ||
└── test-pipe | ||
└── f23b95d7ad4e9301a70b2a54c953f8375ebfcd5c | ||
├── batch_manager.json | ||
├── batch_manager_steps | ||
│ └── text_generation_0.json | ||
├── data | ||
│ └── text_generation_0 | ||
│ └── 00001.parquet | ||
├── pipeline.log | ||
└── pipeline.yaml | ||
``` | ||
|
||
## How does it work? | ||
|
||
Let's take a look at the logging messages from a sample pipeline. | ||
When we run a `Pipeline` for the first time | ||
![Pipeline 1](../../../assets/images/sections/caching/caching_pipe_1.png) | ||
If we decide to stop the pipeline (say we kill the run altogether via `CTRL + C` or `CMD + C` in *macOS*), we will see the signal sent to the different workers: | ||
![Pipeline 2](../../../assets/images/sections/caching/caching_pipe_2.png) | ||
After this step, when we run again the pipeline, the first log message we see corresponds to "Load pipeline from cache", which will restart processing from where it stopped: | ||
![Pipeline 3](../../../assets/images/sections/caching/caching_pipe_3.png) | ||
1. Pipeline cache is disabled | ||
|
||
Finally, if we decide to run the same `Pipeline` after it has finished completely, it won't start again but resume the process, as we already have all the data processed: | ||
In addition, the cache can be enabled/disabled at [`Step`][distilabel.steps.base.Step] level using its `use_cache` attribute. If `True`, then the outputs of the step will be reused in the new pipeline execution. If `False`, then the step will be re-executed to generate new outputs. If the cache of one step is disabled and the outputs have to be regenerated, then the outputs of the steps that depend on this step will also be regenerated. | ||
|
||
![Pipeline 4](../../../assets/images/sections/caching/caching_pipe_4.png) | ||
|
||
### Serialization | ||
|
||
Let's see what gets serialized by looking at a sample `Pipeline`'s cached folder: | ||
|
||
```bash | ||
$ tree ~/.cache/distilabel/pipelines/73ca3f6b7a613fb9694db7631cc038d379f1f533 | ||
├── batch_manager.json | ||
├── batch_manager_steps | ||
│ ├── generate_response.json | ||
│ └── rename_columns.json | ||
├── data | ||
│ └── generate_response | ||
│ ├── 00001.parquet | ||
│ └── 00002.parquet | ||
└── pipeline.yaml | ||
```python | ||
with Pipeline(name="writting-assistant") as pipeline: | ||
load_data = LoadDataFromDicts( | ||
data=[ | ||
{ | ||
"instruction": "How much is 2+2?" | ||
} | ||
] | ||
) | ||
|
||
generation = TextGeneration( | ||
llm=InferenceEndpointsLLM( | ||
model_id="Qwen/Qwen2.5-72B-Instruct", | ||
generation_kwargs={ | ||
"temperature": 0.8, | ||
"max_new_tokens": 512, | ||
}, | ||
), | ||
use_cache=False # (1) | ||
) | ||
|
||
load_data >> generation | ||
|
||
if __name__ == "__main__": | ||
distiset = pipeline.run() | ||
``` | ||
|
||
The `Pipeline` will have a signature created from the arguments that define it so we can find it afterwards, and the contents are the following: | ||
|
||
- `batch_manager.json` | ||
|
||
Folder that stores the content of the internal batch manager to keep track of the data. Along with the `batch_manager_steps/` they store the information to restart the `Pipeline`. One shouldn't need to know about it. | ||
- `pipeline.yaml` | ||
This file contains a representation of the `Pipeline` in *YAML* format. If we push a `Distiset` to the Hugging Face Hub as obtained from calling `Pipeline.run`, this file will be stored at our datasets' repository, allowing to reproduce the `Pipeline` using the `CLI`: | ||
1. Step cache is disabled and every time the pipeline is executed, this step will be re-executed | ||
|
||
```bash | ||
distilabel pipeline run --config "path/to/pipeline.yaml" | ||
``` | ||
## How a cache hit is triggered | ||
|
||
- `data/` | ||
`distilabel` groups information and data generated by a `Pipeline` using the name of the pipeline, so the first factor that triggers a cache hit is the name of the pipeline. The second factor, is the [`Pipeline.signature`][distilabel.pipeline.local.Pipeline.signature] property. This property returns a hash that is generated using the names of the steps used in the pipeline and their connections. The third factor, is the [`Pipeline.aggregated_steps_signature`][distilabel.pipeline.local.Pipeline.aggregated_steps_signature] property which is used to determine if the new pipeline execution is exactly the same as one of the previous i.e. the pipeline contains exactly the same steps, with exactly the same connections and the steps are using exactly the same parameters. If these three factors are met, then the cache hit is triggered and the pipeline won't get re-executed and instead the function [`create_distiset`][distilabel.distiset.create_distiset] will be used to create the resulting [`Distiset`][distilabel.distiset.Distiset] using the outputs of the previous execution, as it can be seen in the following image: | ||
|
||
Folder that stores the data generated, with a special folder to keep track of each `leaf_step` separately. We can recreate a `Distiset` from the contents of this folder (*Parquet* files), as we will see next. | ||
|
||
- `pipeline.log` | ||
|
||
This file stores the logs that the `Pipeline` generated while processing. Just as with the `pipeline.yaml` file, it will be pushed to the Hugging Face Hub datasets` repository to keep track of the information. | ||
## create_distiset | ||
In case we wanted to regenerate the dataset from the `cache`, we can do it using the [`create_distiset`][distilabel.distiset.create_distiset] function and passing the path to the `/data` folder inside our `Pipeline`: | ||
```python | ||
from pathlib import Path | ||
from distilabel.distiset import create_distiset | ||
path = Path("~/.cache/distilabel/pipelines/73ca3f6b7a613fb9694db7631cc038d379f1f533/data") | ||
ds = create_distiset(path) | ||
ds | ||
# Distiset({ | ||
# generate_response: DatasetDict({ | ||
# train: Dataset({ | ||
# features: ['instruction', 'response'], | ||
# num_rows: 80 | ||
# }) | ||
# }) | ||
# }) | ||
``` | ||
![Complete cache hit](../../../assets/images/sections/caching/caching_1.png) | ||
|
||
!!! Note | ||
If the new pipeline execution have a different `Pipeline.aggregated_steps_signature` i.e. at least one step has changed its parameters, `distilabel` will reuse the outputs of the steps that have not changed and re-execute the steps that have changed, as it can be seen in the following image: | ||
|
||
Internally, the function will try to inject the `pipeline_path` variable if it's not passed via argument, assuming it's in the parent directory of the current one, called `pipeline.yaml`. If the file doesn't exist, it won't raise any error, but take into account that if the `Distiset` is pushed to the Hugging Face Hub, the `pipeline.yaml` won't be generated. The same happens with the `pipeline.log` file, it can be passed via `log_filename_path`, but it will try to locate it automatically. | ||
![Partial cache hit](../../../assets/images/sections/caching/caching_2.png) | ||
|
||
Lastly, there is the option of including the `distilabel_metadata` column in the final dataset. This column can contain custom metadata generated automatically by the pipeline, like the raw output from an `LLM` without formatting in case of failure, and we can decide whether to include it using the `enable_metadata` argument. | ||
The same pipeline from above gets executed a third time, but this time the last step `text_generation_1` changed, so it's needed to re-execute it. The other steps, as they have not been changed, doesn't need to be re-executed and their outputs are reused. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters