From d94c659ee156b04249eef37010ac7c7384915f32 Mon Sep 17 00:00:00 2001
From: eshwarprasadS
Date: Thu, 16 Jan 2025 12:12:25 -0500
Subject: [PATCH] fix: make batch processing sequential

Signed-off-by: eshwarprasadS
---
 src/instructlab/sdg/pipeline.py | 25 ++++---------------------
 1 file changed, 4 insertions(+), 21 deletions(-)

diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py
index 8abca62e..d67a59f5 100644
--- a/src/instructlab/sdg/pipeline.py
+++ b/src/instructlab/sdg/pipeline.py
@@ -205,27 +205,10 @@ def _generate_single(self, dataset) -> Dataset:
             else:
                 # Split the dataset into batches
                 input_splits = self._split_dataset(dataset)
-                output_splits = []
-                with ThreadPoolExecutor(max_workers=self.ctx.batch_num_workers) as executor:
-                    futures = [
-                        executor.submit(block.generate, input_split)
-                        for input_split in input_splits
-                    ]
-
-                    # Collect the results of each batch
-                    for future in futures:
-                        try:
-                            ds = future.result()
-                            output_splits.append(ds)
-                        except Exception as err:
-                            logger.error("Error in block %s: %s", block_name, err)
-                            raise PipelineBlockError(
-                                exception=err,
-                                block=block,
-                                block_name=block_name,
-                                block_type=block_type,
-                            ) from err
-
+                # Process each batch in sequence
+                output_splits = [
+                    block.generate(input_split) for input_split in input_splits
+                ]
 
                 # Combine the processed splits back into a single dataset
                 dataset = concatenate_datasets(output_splits)
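
For context, a minimal standalone sketch of the behavioral change this patch makes. It is not part of the patch: the generate function and the list-of-lists batches below are hypothetical stand-ins for block.generate and the datasets.Dataset splits used in pipeline.py.

    from concurrent.futures import ThreadPoolExecutor

    def generate(batch):
        # Stand-in for a pipeline block transforming one batch.
        return [x * 2 for x in batch]

    input_splits = [[1, 2], [3, 4], [5, 6]]

    # Before this patch: fan every batch out to a worker pool,
    # then gather the per-batch results from the futures.
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(generate, split) for split in input_splits]
        pooled = [future.result() for future in futures]

    # After this patch: process each batch in sequence; an exception
    # from generate() now raises at the point of the call instead of
    # surfacing later through future.result().
    sequential = [generate(split) for split in input_splits]

    assert pooled == sequential  # both preserve batch order here

With the executor gone, the per-batch future bookkeeping and the manual collect-and-re-raise logic in the removed lines are no longer needed; a failing batch raises directly from the comprehension.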