diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py
index 59613a8e..9c8dfc5d 100644
--- a/src/instructlab/sdg/pipeline.py
+++ b/src/instructlab/sdg/pipeline.py
@@ -8,15 +8,18 @@
 import logging
 import math
 import os.path
+import requests
 
 # Third Party
 from datasets import Dataset, concatenate_datasets
 from openai import OpenAI
+from openai import OpenAIError
 import yaml
 
 # First Party
 from instructlab.sdg.checkpointing import Checkpointer
 from instructlab.sdg.utils import pandas
+from instructlab.sdg.throttlers import AdaptiveThrottler
 
 # Local
 from .blocks import llmblock
@@ -156,6 +159,11 @@ def generate(self, dataset, checkpoint_name=None) -> Dataset:
             logger.info("Running pipeline single-threaded")
             return self._generate_single(dataset)
 
+        # If batch_num_workers is unset, fall back to the same default as
+        # concurrent.futures, outlined here:
+        # https://docs.python.org/3.11/library/concurrent.futures.html
+        if self.ctx.batch_num_workers is None:
+            self.ctx.batch_num_workers = min(32, (os.cpu_count() or 1) + 4)
         # Otherwise, split the dataset into batches and run each batch as a
         # future in the thread pool
         logger.info(
@@ -165,18 +173,55 @@ def generate(self, dataset, checkpoint_name=None) -> Dataset:
         )
         input_splits = self._split_dataset(dataset)
         output_splits = []
-        with ThreadPoolExecutor(max_workers=self.ctx.batch_num_workers) as executor:
-            futures = [
-                executor.submit(self._generate_single, input_split)
-                for input_split in input_splits
-            ]
-
-            # Collect the results of each batch as they finish. This needs to
-            # wait for them all, so the order of waiting doesn't matter
-            for future in futures:
-                ds = future.result()
-                output_splits.append(ds)
-                checkpointer.checkpoint(ds)
+
+        if not input_splits:
+            logger.warning("Input splits are empty. Returning an empty dataset.")
+            # concatenate_datasets([]) raises on an empty list, so build the
+            # empty result explicitly
+            return Dataset.from_list([])
+
+        throttler = AdaptiveThrottler(
+            min_workers=1,
+            max_workers=self.ctx.batch_num_workers,  # Upper limit from config
+            initial_workers=max(1, self.ctx.batch_num_workers // 2),  # Start at 50% of max
+        )
+
+        while input_splits:
+            # Ask the throttler how many batches to run concurrently
+            current_workers = throttler.get_workers()
+            # Take a slice of input splits to process concurrently
+            input_splits_batch = input_splits[:current_workers]
+            input_splits = input_splits[current_workers:]
+
+            with ThreadPoolExecutor(max_workers=current_workers) as executor:
+                # Submit one task per batch in this slice
+                futures = [
+                    executor.submit(self._generate_single, input_split)
+                    for input_split in input_splits_batch
+                ]
+
+                # Collect the results of each batch as they finish. This needs
+                # to wait for them all, so the order of waiting doesn't matter
+                for future in futures:
+                    try:
+                        ds = future.result()  # Block until the task completes
+                        output_splits.append(ds)  # Store the successful result
+                        checkpointer.checkpoint(ds)  # Save progress
+                        throttler.adjust_workers(success=True)  # Ramp up on success
+                    except PipelineBlockError as err:
+                        root_exception = err.exception  # The underlying exception
+                        if isinstance(
+                            root_exception,
+                            (requests.exceptions.RequestException, TimeoutError, OpenAIError),
+                        ):
+                            # Transient errors: log and throttle down; the failed
+                            # batch is dropped here rather than retried
+                            logger.warning("Retryable error in pipeline batch generation: %s", root_exception)
+                        else:
+                            # Non-retryable errors
+                            logger.error("Non-retryable error in pipeline batch generation: %s", err)
+                        throttler.adjust_workers(success=False)  # Back off on any failure
+
         checkpointer.done()
         if pre_generated_data:
             output_splits.append(pre_generated_data)
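Note (outside the diff): a minimal sketch of the retryable/non-retryable split the new generate() loop applies. The exception tuple is taken verbatim from the change above; the script name and the is_retryable helper are hypothetical, for illustration only.

# error_classes_demo.py -- illustrative sketch, not part of the change
import requests
from openai import OpenAIError

# The same exception classes the new generate() loop treats as transient
RETRYABLE_ERRORS = (requests.exceptions.RequestException, TimeoutError, OpenAIError)

def is_retryable(exc: BaseException) -> bool:
    """Return True if the pipeline would only throttle down and keep going."""
    return isinstance(exc, RETRYABLE_ERRORS)

print(is_retryable(requests.exceptions.ConnectionError()))  # True (network blip)
print(is_retryable(ValueError("bad block config")))         # False (config bug)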
diff --git a/src/instructlab/sdg/throttlers.py b/src/instructlab/sdg/throttlers.py
new file mode 100644
index 00000000..03887cb0
--- /dev/null
+++ b/src/instructlab/sdg/throttlers.py
@@ -0,0 +1,34 @@
+import threading
+
+DEFAULT_TOLERANCE = 0.2  # Fraction to reduce workers by on failure
+
+
+class AdaptiveThrottler:
+    """Thread-safe, AIMD-style controller for the pipeline worker count."""
+
+    def __init__(self, min_workers, max_workers, initial_workers, tolerance=DEFAULT_TOLERANCE):
+        self.min_workers = min_workers  # Lower limit of workers
+        self.max_workers = max_workers  # Upper limit of workers, same as the num_cpus cli argument
+        self.current_workers = initial_workers  # Start with this number
+        self.tolerance = tolerance  # Reduce workers by this fraction on error
+        self.lock = threading.Lock()  # Ensure thread-safe updates
+
+    def adjust_workers(self, success=True):
+        """Adjust the number of workers based on success or failure."""
+        with self.lock:  # Avoid race conditions when called from worker threads
+            if success:
+                # Additive increase: one more worker per success, up to max_workers
+                if self.current_workers < self.max_workers:
+                    self.current_workers += 1
+            else:
+                # Multiplicative decrease: cut by `tolerance`, respecting min_workers
+                if self.current_workers > self.min_workers:
+                    self.current_workers = max(
+                        self.min_workers,
+                        int(self.current_workers * (1 - self.tolerance)),
+                    )
+
+    def get_workers(self):
+        """Get the current number of workers."""
+        with self.lock:
+            return self.current_workers
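Note (outside the diff): a minimal sketch of how AdaptiveThrottler evolves under its additive-increase / multiplicative-decrease policy -- up by one worker per success, down by the tolerance fraction per failure, the same shape TCP congestion control uses. The script name and the success/failure sequence are made up for illustration; the class and its defaults are the ones added above.

# throttler_trace_demo.py -- illustrative only
from instructlab.sdg.throttlers import AdaptiveThrottler

throttler = AdaptiveThrottler(min_workers=1, max_workers=8, initial_workers=4)

# Fabricated outcome sequence: four successes, two failures, two successes
for ok in (True, True, True, True, False, False, True, True):
    throttler.adjust_workers(success=ok)
    print(f"success={ok} -> workers={throttler.get_workers()}")

# With the default tolerance of 0.2 the worker counts printed are:
# 5, 6, 7, 8 (capped), then int(8 * 0.8) = 6, int(6 * 0.8) = 4, then 5, 6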
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 6c801845..63fb3e82 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -8,13 +8,13 @@
 from contextlib import contextmanager
 from threading import Event
 from unittest import mock
-
 # Third Party
 from datasets import Dataset
 import pytest
 
 # First Party
 from instructlab.sdg import Block, Pipeline, PipelineBlockError
+from instructlab.sdg.throttlers import AdaptiveThrottler
 
 ## Helpers ##
 
@@ -194,3 +194,37 @@
         str(gen_err)
         == f"{PipelineBlockError.__name__}({block_type}/{block_name}): {inner_err}"
     )
+
+
+def test_pipeline_with_adaptive_throttler(sample_dataset, threaded_ctx):
+    """Test that the Pipeline integrates correctly with AdaptiveThrottler."""
+    # Mock block.generate to simulate a mix of failures and successes
+    block_type_mock = mock.MagicMock()
+    throttler = AdaptiveThrottler(min_workers=1, max_workers=5, initial_workers=3)
+
+    def mock_generate(dataset):
+        # Simulate a failure for some batches, keyed off the batch size
+        if len(dataset) % 3 == 0:
+            throttler.adjust_workers(success=False)  # Report failure
+            raise RuntimeError("Simulated failure")
+        throttler.adjust_workers(success=True)  # Report success
+        return dataset  # Return the input batch unchanged on success
+
+    block_type_mock().generate.side_effect = mock_generate
+
+    # Configure the pipeline
+    pipe_cfg = [
+        {
+            "name": "block-one",
+            "type": "test",
+            "config": {},
+        }
+    ]
+    with block_types({"test": block_type_mock}):
+        result = Pipeline(threaded_ctx, "", pipe_cfg).generate(sample_dataset)
+
+    # Assertions
+    assert result is not None  # Ensure we got some output
+    assert throttler.current_workers < 5  # Concurrency adapted; did not end at max
+    assert throttler.current_workers >= 1  # Never drops below min_workers
+    assert block_type_mock().generate.call_count > 0  # Block ran at least once
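Note (outside the diff): the last two assertions in the new test boil down to a bounds invariant -- current_workers can never leave [min_workers, max_workers], whatever sequence of successes and failures arrives. A quick property-style check of that invariant, with a hypothetical script name and a fabricated 70% success rate:

# throttler_bounds_demo.py -- illustrative only
import random

from instructlab.sdg.throttlers import AdaptiveThrottler

throttler = AdaptiveThrottler(min_workers=1, max_workers=5, initial_workers=3)

random.seed(0)  # Deterministic run for reproducibility
for _ in range(1000):
    throttler.adjust_workers(success=random.random() < 0.7)
    # The invariant the test's assertions rely on
    assert 1 <= throttler.get_workers() <= 5

print("bounds held for 1000 adjustments; final workers:", throttler.get_workers())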