diff --git a/scripts/test_freeform_skills.py b/scripts/test_freeform_skills.py index a8612c09..70dd6cf4 100644 --- a/scripts/test_freeform_skills.py +++ b/scripts/test_freeform_skills.py @@ -1,11 +1,17 @@ +# Standard +from importlib import resources + # Third Party from datasets import Dataset from openai import OpenAI # First Party from src.instructlab.sdg import SDG -from src.instructlab.sdg.default_flows import SynthSkillsFlow -from src.instructlab.sdg.pipeline import Pipeline +from src.instructlab.sdg.pipeline import ( + FULL_PIPELINES_PACKAGE, + Pipeline, + PipelineContext, +) # for vLLM endpoints, the api_key remains "EMPTY" openai_api_key = "EMPTY" @@ -49,8 +55,10 @@ ds = Dataset.from_list(samples) -skills_flow = SynthSkillsFlow(client, "mixtral", teacher_model, 1).get_flow() -skills_pipe = Pipeline(skills_flow) +ctx = PipelineContext(client, "mixtral", teacher_model, 1) + +with resources.path(FULL_PIPELINES_PACKAGE, "freeform_skills.yaml") as yaml_path: + skills_pipe = Pipeline.from_file(ctx, yaml_path) sdg = SDG([skills_pipe]) gen_data = sdg.generate(ds) diff --git a/scripts/test_grounded_skills.py b/scripts/test_grounded_skills.py index 338edb6c..5578db56 100644 --- a/scripts/test_grounded_skills.py +++ b/scripts/test_grounded_skills.py @@ -1,11 +1,17 @@ +# Standard +from importlib import resources + # Third Party from datasets import Dataset from openai import OpenAI # First Party from src.instructlab.sdg import SDG -from src.instructlab.sdg.default_flows import SynthGroundedSkillsFlow -from src.instructlab.sdg.pipeline import Pipeline +from src.instructlab.sdg.pipeline import ( + FULL_PIPELINES_PACKAGE, + Pipeline, + PipelineContext, +) # for vLLM endpoints, the api_key remains "EMPTY" openai_api_key = "EMPTY" @@ -97,8 +103,10 @@ ds = Dataset.from_list(samples) -skills_flow = SynthGroundedSkillsFlow(client, "mixtral", teacher_model, 10).get_flow() -skills_pipe = Pipeline(skills_flow) +ctx = PipelineContext(client, "mixtral", teacher_model, 10) + +with resources.path(FULL_PIPELINES_PACKAGE, "grounded_skills.yaml") as yaml_path: + skills_pipe = Pipeline.from_file(ctx, yaml_path) sdg = SDG([skills_pipe]) gen_data = sdg.generate(ds) diff --git a/scripts/test_knowledge.py b/scripts/test_knowledge.py index aeedcf59..fc65a275 100644 --- a/scripts/test_knowledge.py +++ b/scripts/test_knowledge.py @@ -1,4 +1,5 @@ # Standard +from importlib import resources import operator # Third Party @@ -7,8 +8,11 @@ # First Party from src.instructlab.sdg import SDG -from src.instructlab.sdg.default_flows import MMLUBenchFlow, SynthKnowledgeFlow -from src.instructlab.sdg.pipeline import Pipeline +from src.instructlab.sdg.pipeline import ( + FULL_PIPELINES_PACKAGE, + Pipeline, + PipelineContext, +) # Please don't add you vLLM endpoint key here openai_api_key = "EMPTY" @@ -38,12 +42,12 @@ ds = Dataset.from_list(samples) -mmlu_flow = MMLUBenchFlow(client, "mixtral", teacher_model, 1).get_flow() -knowledge_flow = SynthKnowledgeFlow(client, "mixtral", teacher_model, 1).get_flow() -knowledge_pipe = Pipeline(knowledge_flow) -mmlu_pipe = Pipeline(mmlu_flow) +ctx = PipelineContext(client, "mixtral", teacher_model, 1) + +with resources.path(FULL_PIPELINES_PACKAGE, "knowledge.yaml") as yaml_path: + knowledge_pipe = Pipeline.from_file(ctx, yaml_path) -sdg = SDG([mmlu_pipe, knowledge_pipe]) +sdg = SDG([knowledge_pipe]) mmlubench_data = sdg.generate(ds) print(mmlubench_data) diff --git a/src/instructlab/sdg/block.py b/src/instructlab/sdg/block.py index 09433f55..75b0a4e8 100644 --- a/src/instructlab/sdg/block.py 
+++ b/src/instructlab/sdg/block.py @@ -3,6 +3,7 @@ from abc import ABC from collections import ChainMap from typing import Any, Dict, Union +import os.path # Third Party import yaml @@ -14,7 +15,9 @@ class Block(ABC): - def __init__(self, block_name: str) -> None: + def __init__(self, ctx, pipe, block_name: str) -> None: + self.ctx = ctx + self.pipe = pipe self.block_name = block_name @staticmethod @@ -41,8 +44,15 @@ def _load_config(self, config_path: str) -> Union[Dict[str, Any], None]: """ Load the configuration file for this block. + If the supplied configuration file is a relative path, it is assumed + to be part of this Python package. + :param config_path: The path to the configuration file. :return: The loaded configuration. """ + if not os.path.isabs(config_path): + config_path = os.path.join( + os.path.dirname(self.pipe.config_path), config_path + ) with open(config_path, "r", encoding="utf-8") as config_file: return yaml.safe_load(config_file) diff --git a/src/instructlab/sdg/default_flows.py b/src/instructlab/sdg/default_flows.py deleted file mode 100644 index 818c4972..00000000 --- a/src/instructlab/sdg/default_flows.py +++ /dev/null @@ -1,459 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# Standard -from abc import ABC, abstractmethod -from importlib import resources -import operator -import os - -# Local -from .filterblock import FilterByValueBlock -from .llmblock import LLMBlock -from .utilblocks import CombineColumnsBlock - -MODEL_FAMILY_MIXTRAL = "mixtral" -MODEL_FAMILY_MERLINITE = "merlinite" - -_MODEL_PROMPT_MIXTRAL = " [INST] {prompt} [/INST]" -_MODEL_PROMPT_MERLINITE = "'<|system|>\nYou are an AI language model developed by IBM Research. You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior.\n<|user|>\n{prompt}\n<|assistant|>\n'" - -_MODEL_PROMPTS = { - MODEL_FAMILY_MIXTRAL: _MODEL_PROMPT_MIXTRAL, - MODEL_FAMILY_MERLINITE: _MODEL_PROMPT_MERLINITE, -} - - -def _get_model_prompt(model_family): - if model_family not in _MODEL_PROMPTS: - raise ValueError(f"Unknown model family: {model_family}") - return _MODEL_PROMPTS[model_family] - - -class Flow(ABC): - def __init__( - self, client, model_family, model_id, num_instructions_to_generate - ) -> None: - self.client = client - self.model_family = model_family - self.model_id = model_id - self.num_instructions_to_generate = num_instructions_to_generate - self.sdg_base = resources.files(__package__) - - @abstractmethod - def get_flow(self) -> list: - pass - - -class _SimpleFlow(Flow): - def get_flow(self) -> list: - return [ - { - "block_type": LLMBlock, - "block_config": { - "block_name": "", # must be set by subclass - "config_path": "", # must be set by subclass - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["output"], - }, - "gen_kwargs": { - "max_tokens": 2048, - "temperature": 0.7, - "n": self.num_instructions_to_generate, - }, - "drop_duplicates": ["output"], - } - ] - - -class SimpleKnowledgeFlow(_SimpleFlow): - def get_flow(self) -> list: - flow = super().get_flow() - flow[0]["block_config"]["config_path"] = os.path.join( - self.sdg_base, "configs/knowledge/simple_generate_qa.yaml" - ) - flow[0]["block_config"]["block_name"] = "gen_knowledge" - return flow - - -class SimpleFreeformSkillFlow(_SimpleFlow): - def get_flow(self) -> list: - flow = super().get_flow() - flow[0]["block_config"]["config_path"] = os.path.join( - self.sdg_base, 
"configs/skills/simple_generate_qa_freeform.yaml" - ) - flow[0]["block_config"]["block_name"] = "gen_skill_freeform" - flow[0]["block_config"]["block_name"] = "gen_skill_freeform" - return flow - - -class SimpleGroundedSkillFlow(_SimpleFlow): - def get_flow(self) -> list: - flow = super().get_flow() - flow[0]["block_config"]["config_path"] = os.path.join( - self.sdg_base, "configs/skills/simple_generate_qa_grounded.yaml" - ) - flow[0]["block_config"]["block_name"] = "gen_skill_grounded" - return flow - - -class MMLUBenchFlow(Flow): - def get_flow(self) -> list: - self.sdg_base = resources.files(__package__) - return [ - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_mmlu_knowledge", - "config_path": os.path.join( - self.sdg_base, "configs/knowledge/mcq_generation.yaml" - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["mmlubench_question", "mmlubench_answer"], - }, - "gen_kwargs": { - "temperature": 0, - "max_tokens": 2048, - }, - "drop_duplicates": ["mmlubench_question"], - }, - ] - - -class SynthKnowledgeFlow(Flow): - def get_flow(self) -> list: - return [ - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_knowledge", - "config_path": os.path.join( - self.sdg_base, - "configs/knowledge/generate_questions_responses.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["question", "response"], - "parser_kwargs": { - "parser_name": "custom", - "parsing_pattern": r"\[(?:Question|QUESTION)\]\s*(.*?)\s*\[(?:Answer|ANSWER)\]\s*(.*?)\s*(?=\[(?:Question|QUESTION)\]|$)", - "parser_cleanup_tags": ["[END]"], - }, - }, - "gen_kwargs": { - "max_tokens": 2048, - }, - "drop_duplicates": ["question"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "eval_faithfulness_qa_pair", - "config_path": os.path.join( - self.sdg_base, "configs/knowledge/evaluate_faithfulness.yaml" - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["explanation", "judgment"], - }, - "gen_kwargs": { - "max_tokens": 2048, - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_faithfulness", - "filter_column": "judgment", - "filter_value": "YES", - "operation": operator.eq, - "batch_kwargs": { - "num_procs": 8, - }, - }, - "drop_columns": ["judgment", "explanation"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "eval_relevancy_qa_pair", - "config_path": os.path.join( - self.sdg_base, "configs/knowledge/evaluate_relevancy.yaml" - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["feedback", "score"], - }, - "gen_kwargs": { - "max_tokens": 2048, - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_relevancy", - "filter_column": "score", - "filter_value": 2.0, - "operation": operator.eq, - "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, - }, - "drop_columns": ["feedback", "score"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "eval_verify_question", - "config_path": os.path.join( - self.sdg_base, "configs/knowledge/evaluate_question.yaml" - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["explanation", "rating"], - }, - 
"gen_kwargs": { - "max_tokens": 2048, - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_verify_question", - "filter_column": "rating", - "filter_value": 1.0, - "operation": operator.eq, - "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, - }, - "drop_columns": ["explanation", "rating", "__index_level_0__"], - }, - ] - - -class SynthSkillsFlow(Flow): - def get_flow(self) -> list: - return [ - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_questions", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/freeform_questions.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["question"], - "batch_kwargs": { - "num_samples": self.num_instructions_to_generate, - }, - }, - "drop_duplicates": ["question"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "eval_questions", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/evaluate_freeform_questions.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["evaluation", "score"], - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_questions", - "filter_column": "score", - "filter_value": 1.0, - "operation": operator.eq, - "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, - }, - "drop_columns": ["evaluation", "score", "num_samples"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_responses", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/freeform_responses.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["response"], - }, - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "evaluate_qa_pair", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/evaluate_freeform_pair.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["evaluation", "score"], - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_qa_pair", - "filter_column": "score", - "filter_value": 2.0, - "operation": operator.ge, - "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, - }, - "drop_columns": ["evaluation", "score"], - }, - ] - - -class SynthGroundedSkillsFlow(Flow): - def get_flow(self) -> list: - return [ - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_contexts", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/contexts.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["context"], - }, - "gen_kwargs": { - "temperature": 0.7, - "max_tokens": 2048, - "n": self.num_instructions_to_generate, - }, - "drop_duplicates": ["context"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_grounded_questions", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/grounded_questions.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["question"], - "batch_kwargs": { - "num_samples": 3, - }, - }, - "drop_duplicates": ["question"], - }, - { - "block_type": LLMBlock, - "block_config": { 
- "block_name": "eval_grounded_questions", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/evaluate_grounded_questions.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["evaluation", "score"], - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_grounded_questions", - "filter_column": "score", - "filter_value": 1.0, - "operation": operator.eq, - "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, - }, - "drop_columns": ["evaluation", "score", "num_samples"], - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "gen_grounded_responses", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/grounded_responses.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["response"], - }, - }, - { - "block_type": LLMBlock, - "block_config": { - "block_name": "evaluate_grounded_qa_pair", - "config_path": os.path.join( - self.sdg_base, - "configs/skills/evaluate_grounded_pair.yaml", - ), - "client": self.client, - "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), - "output_cols": ["evaluation", "score"], - }, - }, - { - "block_type": FilterByValueBlock, - "block_config": { - "block_name": "filter_grounded_qa_pair", - "filter_column": "score", - "filter_value": 2.0, - "operation": operator.ge, - "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, - }, - }, - { - "block_type": CombineColumnsBlock, - "block_config": { - "block_name": "combine_question_and_context", - "columns": ["context", "question"], - "output_col": "question", - "batch_kwargs": { - "num_procs": 8, - "batched": True, - }, - }, - }, - ] diff --git a/src/instructlab/sdg/filterblock.py b/src/instructlab/sdg/filterblock.py index f5551b02..3cc7b427 100644 --- a/src/instructlab/sdg/filterblock.py +++ b/src/instructlab/sdg/filterblock.py @@ -1,4 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 +# Standard +import operator + # Third Party from datasets import Dataset @@ -9,50 +12,157 @@ logger = setup_logger(__name__) +class FilterByValueBlockError(Exception): + """An exception raised by the FilterByValue block.""" + + +def _get_operator_func(op): + if not op in dir(operator): + raise FilterByValueBlockError("Unknown FilterByValueBlock operation '{op}'") + return getattr(operator, op) + + +def _get_convert_dtype(convert_dtype): + if not convert_dtype: + return None + + type_mapping = { + "int": int, + "float": float, + "bool": bool, + } + + if not convert_dtype in type_mapping: + raise FilterByValueBlockError( + "Unknown FilterByValueBlock convert_dtype '{convert_dtype}'" + ) + + return type_mapping[convert_dtype] + + +# Note - this is not a method on the class below in order to avoid +# serializing the object itself when multi-processing is used. +# In particular, SSLContext - embedded in the OpenAI client object - +# cannot be pickled. 
+def _filter_by_values(samples, column, op, values, num_proc=1): + return samples.filter( + lambda x: any(op(x[column], value) for value in values), + num_proc=num_proc, + ) + + +def _map_dtype(samples, column, dtype, num_proc=1): + def convert_column(sample): + try: + sample[column] = dtype(sample[column]) + except ValueError as e: + logger.error( + "Error converting dtype: %s, filling with None to be filtered later", e + ) + sample[column] = None + return sample + + # FIXME: it appears multiprocessing map has issues with + # None columns. If we pass num_proc>1 here and the error + # case is triggered above, we get: + # ValueError: The features can't be aligned ... + # because the column is still considered a string not + # the new dtype. + num_proc = 1 + + return samples.map(convert_column, num_proc=num_proc) + + class FilterByValueBlock(Block): def __init__( - self, filter_column, filter_value, operation, convert_dtype=None, **batch_kwargs + self, + ctx, + pipe, + block_name, + filter_column, + filter_value, + operation, + convert_dtype=None, ) -> None: """ Initializes a new instance of the FilterByValueBlock class. Parameters: + - ctx (PipelineContext): A PipelineContext object containing runtime parameters. + - pipe (Pipeline): The Pipeline containing this block in its chain. + - block_name (str): An identifier for this block. - filter_column (str): The name of the column in the dataset to apply the filter on. - filter_value (any or list of any): The value(s) to filter by. - - operation (callable): A function that takes two arguments (column value and filter value) and returns a boolean indicating whether the row should be included in the filtered dataset. - - convert_dtype (callable, optional): A function to convert the data type of the filter column before applying the filter. Defaults to None. - - **batch_kwargs: Additional kwargs for batch processing. + - operation (string): The name of a function provided by the "operator" + Python package that takes two arguments (column value and filter value) + and returns a boolean indicating whether the row should be included in + the filtered dataset. + - convert_dtype (string, optional): the name of a Python type to convert + the column values to. Supported values are "int", "float", and "bool". + Defaults to None. Returns: None + + For supported values of `operation`, see the "operator" package + documentation: https://docs.python.org/3/library/operator.html + + Only a subset of the "operator" package is relevant. It has to + follow the semantics of taking two parameters and returning a boolean. + Some operations that work include: + - eq: equal to + - ne: not equal to + - gt: greater than + - ge: greater than or equal to + - lt: less than + - le: less than or equal to + - contains: filter_column contains filter_value (only for string columns) + + Note that the sematics of all operations are: + - filter_column operation filter_value + + Example: FilterByValueBlock(ctx, "filter_by_age", "age", 30, "eq", "int") + - This block will filter the dataset to only include rows where the + "age" column is equal to 30. + + The `contains` operator is only supported for string columns. This is + useful if you want to ensure that a string column contains a specific + substring. + + Example: FilterByValueBlock(ctx, "filter_by_name", "full_name", "John", "contains") + - This block will filter the dataset to only include rows where the + "full_name" column contains the substring "John". + + `filter_value` does not have to be a single value. 
It can also be a list of values. + In that case, the operation will be applied to each value in the list. The result is + considered True if the operation is True for any of the values in the list. + + Example: FilterByValueBlock(ctx, "filter_by_age", "age", [30, 35], "eq", "int") + - This block will filter the dataset to only include rows where the + "age" column is equal to 30 or 35. + + Example: FilterByValueBlock(ctx, "filter_by_city", "city", ["boston", "charleston", "dublin", "new york"], "eq") + - This block will filter the dataset to only include rows where the + "city" column is equal to "boston", "charleston", "dublin", or "new york". + + Example: FilterByValueBlock(ctx, "filter_by_name", "full_name", ["John", "Jane"], "contains") + - This block will filter the dataset to only include rows where the + "full_name" column contains the substring "John" or "Jane". """ - super().__init__(block_name=self.__class__.__name__) + super().__init__(ctx, pipe, block_name) self.value = filter_value if isinstance(filter_value, list) else [filter_value] self.column_name = filter_column - self.operation = operation - self.convert_dtype = convert_dtype - self.num_procs = batch_kwargs.get("num_procs", 1) - - def _convert_dtype(self, sample): - try: - sample[self.column_name] = self.convert_dtype(sample[self.column_name]) - except ValueError as e: - logger.error( - "Error converting dtype: %s, filling with None to be filtered later", e - ) - sample[self.column_name] = None - return sample + self.operation = _get_operator_func(operation) + self.convert_dtype = _get_convert_dtype(convert_dtype) + if self.convert_dtype: + self.value = [self.convert_dtype(value) for value in self.value] def generate(self, samples) -> Dataset: if self.convert_dtype: - samples = samples.map( - self._convert_dtype, - num_proc=self.num_procs, + samples = _map_dtype( + samples, self.column_name, self.convert_dtype, self.ctx.num_procs ) - return samples.filter( - lambda x: any( - self.operation(x[self.column_name], value) for value in self.value - ), - num_proc=self.num_procs, + return _filter_by_values( + samples, self.column_name, self.operation, self.value, self.ctx.num_procs ) diff --git a/src/instructlab/sdg/generate_data.py b/src/instructlab/sdg/generate_data.py index 36c6cad4..7a926a5a 100644 --- a/src/instructlab/sdg/generate_data.py +++ b/src/instructlab/sdg/generate_data.py @@ -2,6 +2,7 @@ # Standard from datetime import datetime +from importlib import resources from pathlib import Path from typing import Optional import json @@ -17,18 +18,13 @@ # First Party # pylint: disable=ungrouped-imports from instructlab.sdg import SDG, utils -from instructlab.sdg.default_flows import ( - MODEL_FAMILY_MERLINITE, - MODEL_FAMILY_MIXTRAL, - MMLUBenchFlow, - SimpleFreeformSkillFlow, - SimpleGroundedSkillFlow, - SimpleKnowledgeFlow, - SynthGroundedSkillsFlow, - SynthKnowledgeFlow, - SynthSkillsFlow, +from instructlab.sdg.llmblock import MODEL_FAMILY_MERLINITE, MODEL_FAMILY_MIXTRAL +from instructlab.sdg.pipeline import ( + FULL_PIPELINES_PACKAGE, + SIMPLE_PIPELINES_PACKAGE, + Pipeline, + PipelineContext, ) -from instructlab.sdg.pipeline import Pipeline from instructlab.sdg.utils import models from instructlab.sdg.utils.taxonomy import ( leaf_node_to_samples, @@ -168,53 +164,38 @@ def _gen_test_data( outfile.write("\n") -def _sdg_init(pipeline, client, model_family, model_name, num_instructions_to_generate): - knowledge_flow_types = [] - freeform_skill_flow_types = [] - grounded_skill_flow_types = [] +def _sdg_init(pipeline, client, 
model_family, model_id, num_instructions_to_generate): + pipeline_pkg = None if pipeline == "full": - knowledge_flow_types.append(MMLUBenchFlow) - knowledge_flow_types.append(SynthKnowledgeFlow) - freeform_skill_flow_types.append(SynthSkillsFlow) - grounded_skill_flow_types.append(SynthGroundedSkillsFlow) + pipeline_pkg = FULL_PIPELINES_PACKAGE elif pipeline == "simple": - knowledge_flow_types.append(SimpleKnowledgeFlow) - freeform_skill_flow_types.append(SimpleFreeformSkillFlow) - grounded_skill_flow_types.append(SimpleGroundedSkillFlow) + pipeline_pkg = SIMPLE_PIPELINES_PACKAGE else: - raise utils.GenerateException(f"Error: pipeline ({pipeline}) is not supported.") - - sdg_knowledge = SDG( - [ - Pipeline( - flow_type( - client, model_family, model_name, num_instructions_to_generate - ).get_flow() + # Validate that pipeline is a valid directory and that it contains the required files + if not os.path.exists(pipeline): + raise utils.GenerateException( + f"Error: pipeline directory ({pipeline}) does not exist." ) - for flow_type in knowledge_flow_types - ] - ) - sdg_freeform_skill = SDG( - [ - Pipeline( - flow_type( - client, model_family, model_name, num_instructions_to_generate - ).get_flow() - ) - for flow_type in freeform_skill_flow_types - ] - ) - sdg_grounded_skill = SDG( - [ - Pipeline( - flow_type( - client, model_family, model_name, num_instructions_to_generate - ).get_flow() - ) - for flow_type in grounded_skill_flow_types - ] + for file in ["knowledge.yaml", "freeform_skills.yaml", "grounded_skills.yaml"]: + if not os.path.exists(os.path.join(pipeline, file)): + raise utils.GenerateException( + f"Error: pipeline directory ({pipeline}) does not contain {file}." + ) + + ctx = PipelineContext(client, model_family, model_id, num_instructions_to_generate) + + def load_pipeline(yaml_basename): + if pipeline_pkg: + with resources.path(pipeline_pkg, yaml_basename) as yaml_path: + return Pipeline.from_file(ctx, yaml_path) + else: + return Pipeline.from_file(ctx, os.path.join(pipeline, yaml_basename)) + + return ( + SDG([load_pipeline("knowledge.yaml")]), + SDG([load_pipeline("freeform_skills.yaml")]), + SDG([load_pipeline("grounded_skills.yaml")]), ) - return sdg_knowledge, sdg_freeform_skill, sdg_grounded_skill # TODO - parameter removal needs to be done in sync with a CLI change. @@ -244,9 +225,21 @@ def generate_data( tls_client_cert: Optional[str] = None, tls_client_key: Optional[str] = None, tls_client_passwd: Optional[str] = None, - # TODO need to update the CLI to specify which pipeline to use (simple or full at the moment) pipeline: Optional[str] = "simple", ): + """Generate data for training and testing a model. + + This currently serves as the primary interface from the `ilab` CLI to the `sdg` library. + It is somewhat a transitionary measure, as this function existed back when all of the + functionality was embedded in the CLI. At some stage, we expect to evolve the CLI to + use the SDG library constructs directly, and this function will likely be removed. + + Args: + pipeline: This argument may be either an alias defined by the sdg library ("simple", "full"), + or an absolute path to a directory containing the pipeline YAML files. + We expect three files to be present in this directory: "knowledge.yaml", + "freeform_skills.yaml", and "grounded_skills.yaml". 
+ """ generate_start = time.time() if not os.path.exists(output_dir): diff --git a/src/instructlab/sdg/importblock.py b/src/instructlab/sdg/importblock.py new file mode 100644 index 00000000..5fa479b8 --- /dev/null +++ b/src/instructlab/sdg/importblock.py @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: Apache-2.0 +# Third Party +from datasets import Dataset + +# Local +from . import pipeline +from .block import Block +from .logger_config import setup_logger + +logger = setup_logger(__name__) + + +class ImportBlock(Block): + def __init__( + self, + ctx, + pipe, + block_name, + path, + ) -> None: + """ + ImportBlock imports a chain of blocks from another pipeline config file. + + Parameters: + - ctx (PipelineContext): A PipelineContext object containing runtime parameters. + - pipe (Pipeline): The Pipeline containing this block in its chain. + - block_name (str): An identifier for this block. + - path (str): A path (absolute, or relative to the instructlab.sdg package) to a pipeline config file. + """ + super().__init__(ctx, pipe, block_name) + self.path = path + self.pipeline = pipeline.Pipeline.from_file(self.ctx, self.path) + + def generate(self, samples) -> Dataset: + logger.info("ImportBlock chaining to blocks from {self.path}") + return self.pipeline.generate(samples) diff --git a/src/instructlab/sdg/llmblock.py b/src/instructlab/sdg/llmblock.py index 4153a191..3f4d32f4 100644 --- a/src/instructlab/sdg/llmblock.py +++ b/src/instructlab/sdg/llmblock.py @@ -13,6 +13,23 @@ logger = setup_logger(__name__) +MODEL_FAMILY_MIXTRAL = "mixtral" +MODEL_FAMILY_MERLINITE = "merlinite" + +_MODEL_PROMPT_MIXTRAL = " [INST] {prompt} [/INST]" +_MODEL_PROMPT_MERLINITE = "'<|system|>\nYou are an AI language model developed by IBM Research. You are a cautious assistant. You carefully follow instructions. 
You are helpful and harmless and you follow ethical guidelines and promote positive behavior.\n<|user|>\n{prompt}\n<|assistant|>\n'" + +_MODEL_PROMPTS = { + MODEL_FAMILY_MIXTRAL: _MODEL_PROMPT_MIXTRAL, + MODEL_FAMILY_MERLINITE: _MODEL_PROMPT_MERLINITE, +} + + +def _get_model_prompt(model_family): + if model_family not in _MODEL_PROMPTS: + raise ValueError(f"Unknown model family: {model_family}") + return _MODEL_PROMPTS[model_family] + def server_supports_batched(client, model_id: str) -> bool: supported = getattr(client, "server_supports_batched", None) @@ -38,38 +55,37 @@ class LLMBlock(Block): # pylint: disable=too-many-instance-attributes def __init__( self, + ctx, + pipe, block_name, config_path, - client, - model_id, output_cols, parser_kwargs={}, - model_prompt="{prompt}", - **batch_kwargs, + batch_kwargs={}, ) -> None: - super().__init__(block_name) + super().__init__(ctx, pipe, block_name) self.block_config = self._load_config(config_path) self.prompt_struct = ( """{system}\n{introduction}\n{principles}\n{examples}\n{generation}""" ) self.prompt_template = self.prompt_struct.format(**self.block_config) - self.client = client - self.model = model_id - self.model_prompt = model_prompt + self.model_prompt = _get_model_prompt(self.ctx.model_family) self.output_cols = output_cols - self.batch_params = batch_kwargs.get("batch_kwargs", {}) + self.batch_params = batch_kwargs self.parser_name = parser_kwargs.get("parser_name", None) self.parsing_pattern = parser_kwargs.get("parsing_pattern", None) self.parser_cleanup_tags = parser_kwargs.get("parser_cleanup_tags", None) self.defaults = { - "model": self.model, + "model": self.ctx.model_id, "temperature": 0, "max_tokens": 12000, } # Whether the LLM server supports a list of input prompts # and supports the n parameter to generate n outputs per input - self.server_supports_batched = server_supports_batched(client, model_id) + self.server_supports_batched = server_supports_batched( + self.ctx.client, self.ctx.model_id + ) def _parse(self, generated_string) -> dict: matches = {} @@ -111,22 +127,32 @@ def _parse(self, generated_string) -> dict: def _format_prompt(self, sample: Dict) -> str: return self.prompt_template.format(**sample).strip() + def _gen_kwargs(self, **gen_kwargs): + gen_kwargs = {**self.defaults, **gen_kwargs} + if "max_tokens" in gen_kwargs: + gen_kwargs["max_tokens"] = int(gen_kwargs["max_tokens"]) + if "temperature" in gen_kwargs: + gen_kwargs["temperature"] = float(gen_kwargs["temperature"]) + return gen_kwargs + def _generate(self, samples, **gen_kwargs) -> list: prompts = [ self.model_prompt.format(prompt=self._format_prompt(sample)) for sample in samples ] - generate_args = {**self.defaults, **gen_kwargs} + generate_args = self._gen_kwargs(**gen_kwargs) if self.server_supports_batched: - response = self.client.completions.create(prompt=prompts, **generate_args) + response = self.ctx.client.completions.create( + prompt=prompts, **generate_args + ) return [choice.text.strip() for choice in response.choices] n = gen_kwargs.get("n", 1) results = [] for prompt in prompts: for _ in range(n): - response = self.client.completions.create( + response = self.ctx.client.completions.create( prompt=prompt, **generate_args ) results.append(response.choices[0].text.strip()) @@ -189,25 +215,23 @@ def generate(self, samples: Dataset, **gen_kwargs) -> Dataset: class ConditionalLLMBlock(LLMBlock): def __init__( self, + ctx, + pipe, block_name, config_paths, - client, - model_id, output_cols, selector_column_name, parser_kwargs={}, - 
model_prompt="{prompt}", - **batch_kwargs, + batch_kwargs={}, ) -> None: super().__init__( + ctx, + pipe, block_name, config_paths[0][0], - client, - model_id, output_cols, parser_kwargs=parser_kwargs, - model_prompt=model_prompt, - **batch_kwargs, + batch_kwargs=batch_kwargs, ) self.selector_column_name = selector_column_name self.prompt_template = {} diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py index bc570a83..3ee08306 100644 --- a/src/instructlab/sdg/pipeline.py +++ b/src/instructlab/sdg/pipeline.py @@ -1,22 +1,50 @@ # SPDX-License-Identifier: Apache-2.0 +# Standard +from importlib import resources +import os.path + # Third Party from datasets import Dataset +import yaml # Local +from . import filterblock, importblock, llmblock, utilblocks from .logger_config import setup_logger logger = setup_logger(__name__) +class PipelineContext: + def __init__( + self, client, model_family, model_id, num_instructions_to_generate + ) -> None: + self.client = client + self.model_family = model_family + self.model_id = model_id + self.num_instructions_to_generate = num_instructions_to_generate + # FIXME: base this on the available number of CPUs + self.num_procs = 8 + + class Pipeline: - def __init__(self, chained_blocks: list) -> None: + def __init__(self, ctx, config_path, chained_blocks: list) -> None: """ Initialize the Pipeline class with a configuration dictionary. config_dict: the run config py or yaml loaded into a dictionary """ + # ctx is a PipelineContext object that supplies context configuration to every block + self.ctx = ctx + # config_path is the path of the pipeline config file used to create this pipeline + self.config_path = config_path # pipeline config is the run configuration that consists of the pipeline steps self.chained_blocks = chained_blocks + @classmethod + def from_file(cls, ctx, pipeline_yaml): + if not os.path.isabs(pipeline_yaml): + pipeline_yaml = os.path.join(resources.files(__package__), pipeline_yaml) + return cls(ctx, pipeline_yaml, _parse_pipeline_config_file(pipeline_yaml)) + def _drop_duplicates(self, dataset, cols): """ Drop duplicates from the dataset based on the columns provided. 
@@ -31,14 +59,15 @@ def generate(self, dataset) -> Dataset:
         dataset: the input dataset
         """
         for block_prop in self.chained_blocks:
-            block_type = block_prop["block_type"]
-            block_config = block_prop["block_config"]
+            block_name = block_prop["name"]
+            block_type = _lookup_block_type(block_prop["type"])
+            block_config = block_prop["config"]
             drop_columns = block_prop.get("drop_columns", [])
             gen_kwargs = block_prop.get("gen_kwargs", {})
             drop_duplicates_cols = block_prop.get("drop_duplicates", False)
-            block = block_type(**block_config)
+            block = block_type(self.ctx, self, block_name, **block_config)
 
-            logger.info("Running block: %s", block_config["block_name"])
+            logger.info("Running block: %s", block_name)
             logger.info(dataset)
 
             dataset = block.generate(dataset, **gen_kwargs)
@@ -51,3 +80,56 @@ def generate(self, dataset) -> Dataset:
             dataset = self._drop_duplicates(dataset, cols=drop_duplicates_cols)
 
         return dataset
+
+
+_block_types = {
+    "CombineColumnsBlock": utilblocks.CombineColumnsBlock,
+    "ConditionalLLMBlock": llmblock.ConditionalLLMBlock,
+    "FilterByValueBlock": filterblock.FilterByValueBlock,
+    "ImportBlock": importblock.ImportBlock,
+    "LLMBlock": llmblock.LLMBlock,
+    "SamplePopulatorBlock": utilblocks.SamplePopulatorBlock,
+    "SelectorBlock": utilblocks.SelectorBlock,
+}
+
+
+def _lookup_block_type(block_type):
+    if block_type not in _block_types:
+        raise PipelineConfigParserError(f"Unknown block type {block_type}")
+    return _block_types[block_type]
+
+
+_PIPELINE_CONFIG_PARSER_MAJOR = 1
+_PIPELINE_CONFIG_PARSER_MINOR = 0
+
+
+class PipelineConfigParserError(Exception):
+    """An exception raised while parsing a pipeline config file."""
+
+
+def _parse_pipeline_config_file(pipeline_yaml):
+    with open(pipeline_yaml, "r", encoding="utf-8") as pipeline_file:
+        content = yaml.safe_load(pipeline_file)
+
+    version = content["version"]
+    major, minor = map(int, version.split("."))
+
+    if major > _PIPELINE_CONFIG_PARSER_MAJOR:
+        raise PipelineConfigParserError(
+            "The pipeline config file format is from a future major version."
+        )
+    if major <= _PIPELINE_CONFIG_PARSER_MAJOR and minor > _PIPELINE_CONFIG_PARSER_MINOR:
+        logger.warning(
+            "The pipeline config file may have new features that will be ignored."
+ ) + + if not "blocks" in content: + raise PipelineConfigParserError( + "The pipeline config file contains no 'blocks' section" + ) + + return content["blocks"] + + +SIMPLE_PIPELINES_PACKAGE = "instructlab.sdg.pipelines.simple" +FULL_PIPELINES_PACKAGE = "instructlab.sdg.pipelines.full" diff --git a/src/instructlab/sdg/pipelines/__init__.py b/src/instructlab/sdg/pipelines/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/instructlab/sdg/pipelines/full/__init__.py b/src/instructlab/sdg/pipelines/full/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/instructlab/sdg/pipelines/full/freeform_skills.yaml b/src/instructlab/sdg/pipelines/full/freeform_skills.yaml new file mode 100644 index 00000000..e14c059a --- /dev/null +++ b/src/instructlab/sdg/pipelines/full/freeform_skills.yaml @@ -0,0 +1,53 @@ +version: "1.0" +blocks: + - name: gen_questions + type: LLMBlock + config: + config_path: ../../configs/skills/freeform_questions.yaml + output_cols: + - question + batch_kwargs: + num_samples: 30 + drop_duplicates: + - question + - name: eval_questions + type: LLMBlock + config: + config_path: ../../configs/skills/evaluate_freeform_questions.yaml + output_cols: + - evaluation + - score + - name: filter_questions + type: FilterByValueBlock + config: + filter_column: score + filter_value: 1.0 + operation: eq + convert_dtype: float + drop_columns: + - evaluation + - score + - num_samples + - name: gen_responses + type: LLMBlock + config: + config_path: ../../configs/skills/freeform_responses.yaml + output_cols: + - response + - name: evaluate_qa_pair + type: LLMBlock + config: + config_path: ../../configs/skills/evaluate_freeform_pair.yaml + output_cols: + - evaluation + - score + - name: filter_qa_pair + type: FilterByValueBlock + config: + filter_column: score + filter_value: 2.0 + operation: ge + convert_dtype: float + drop_columns: + - evaluation + - score diff --git a/src/instructlab/sdg/pipelines/full/grounded_skills.yaml b/src/instructlab/sdg/pipelines/full/grounded_skills.yaml new file mode 100644 index 00000000..8fad3b83 --- /dev/null +++ b/src/instructlab/sdg/pipelines/full/grounded_skills.yaml @@ -0,0 +1,69 @@ +version: "1.0" +blocks: + - name: gen_contexts + type: LLMBlock + config: + config_path: ../../configs/skills/contexts.yaml + output_cols: + - context + gen_kwargs: + temperature: 0.7 + max_tokens: 2048 + n: 10 + drop_duplicates: + - context + - name: gen_grounded_questions + type: LLMBlock + config: + config_path: ../../configs/skills/grounded_questions.yaml + output_cols: + - question + batch_kwargs: + num_samples: 3 + drop_duplicates: + - question + - name: eval_grounded_questions + type: LLMBlock + config: + config_path: ../../configs/skills/evaluate_grounded_questions.yaml + output_cols: + - evaluation + - score + - name: filter_grounded_questions + type: FilterByValueBlock + config: + filter_column: score + filter_value: 1.0 + operation: eq + convert_dtype: float + drop_columns: + - evaluation + - score + - num_samples + - name: gen_grounded_responses + type: LLMBlock + config: + config_path: ../../configs/skills/grounded_responses.yaml + output_cols: + - response + - name: evaluate_grounded_qa_pair + type: LLMBlock + config: + config_path: ../../configs/skills/evaluate_grounded_pair.yaml + output_cols: + - evaluation + - score + - name: filter_grounded_qa_pair + type: FilterByValueBlock + config: + filter_column: score + filter_value: 2.0 + operation: ge + convert_dtype: float + - name: combine_question_and_context + type: 
CombineColumnsBlock + config: + columns: + - context + - question + output_col: question diff --git a/src/instructlab/sdg/pipelines/full/knowledge.yaml b/src/instructlab/sdg/pipelines/full/knowledge.yaml new file mode 100644 index 00000000..2b9e9c8d --- /dev/null +++ b/src/instructlab/sdg/pipelines/full/knowledge.yaml @@ -0,0 +1,87 @@ +version: "1.0" +blocks: + - name: gen_mmlu_knowledge + type: LLMBlock + config: + config_path: ../../configs/knowledge/mcq_generation.yaml + output_cols: + - mmlubench_question + - mmlubench_answer + gen_kwargs: + temperature: 0 + max_tokens: 2048 + drop_duplicates: + - mmlubench_question + - name: gen_knowledge + type: LLMBlock + config: + config_path: ../../configs/knowledge/generate_questions_responses.yaml + output_cols: + - question + - response + parser_kwargs: + parser_name: custom + parsing_pattern: '\[(?:Question|QUESTION)\]\s*(.*?)\s*\[(?:Answer|ANSWER)\]\s*(.*?)\s*(?=\[(?:Question|QUESTION)\]|$)' + parser_cleanup_tags: + - "[END]" + gen_kwargs: + max_tokens: 2048 + drop_duplicates: + - question + - name: eval_faithfulness_qa_pair + type: LLMBlock + config: + config_path: ../../configs/knowledge/evaluate_faithfulness.yaml + output_cols: + - explanation + - judgment + gen_kwargs: + max_tokens: 2048 + - name: filter_faithfulness + type: FilterByValueBlock + config: + filter_column: judgment + filter_value: "YES" + operation: eq + drop_columns: + - judgment + - explanation + - name: eval_relevancy_qa_pair + type: LLMBlock + config: + config_path: ../../configs/knowledge/evaluate_relevancy.yaml + output_cols: + - feedback + - score + gen_kwargs: + max_tokens: 2048 + - name: filter_relevancy + type: FilterByValueBlock + config: + filter_column: score + filter_value: 2.0 + operation: eq + convert_dtype: float + drop_columns: + - feedback + - score + - name: eval_verify_question + type: LLMBlock + config: + config_path: ../../configs/knowledge/evaluate_question.yaml + output_cols: + - explanation + - rating + gen_kwargs: + max_tokens: 2048 + - name: filter_verify_question + type: FilterByValueBlock + config: + filter_column: rating + filter_value: 1.0 + operation: eq + convert_dtype: float + drop_columns: + - explanation + - rating + - __index_level_0__ diff --git a/src/instructlab/sdg/pipelines/simple/__init__.py b/src/instructlab/sdg/pipelines/simple/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/instructlab/sdg/pipelines/simple/freeform_skills.yaml b/src/instructlab/sdg/pipelines/simple/freeform_skills.yaml new file mode 100644 index 00000000..be589af8 --- /dev/null +++ b/src/instructlab/sdg/pipelines/simple/freeform_skills.yaml @@ -0,0 +1,13 @@ +version: "1.0" +blocks: + - name: gen_skill_freeform + type: LLMBlock + config: + config_path: ../../configs/skills/simple_generate_qa_freeform.yaml + output_cols: + - output + gen_kwargs: + max_tokens: 2048 + temperature: 0.7 + drop_duplicates: + - output diff --git a/src/instructlab/sdg/pipelines/simple/grounded_skills.yaml b/src/instructlab/sdg/pipelines/simple/grounded_skills.yaml new file mode 100644 index 00000000..23925034 --- /dev/null +++ b/src/instructlab/sdg/pipelines/simple/grounded_skills.yaml @@ -0,0 +1,14 @@ +version: "1.0" +blocks: + - name: gen_skill_grounded + type: LLMBlock + config: + config_path: ../../configs/skills/simple_generate_qa_grounded.yaml + output_cols: + - output + gen_kwargs: + max_tokens: 2048 + temperature: 0.7 + n: 10 + drop_duplicates: + - output diff --git a/src/instructlab/sdg/pipelines/simple/knowledge.yaml 
b/src/instructlab/sdg/pipelines/simple/knowledge.yaml new file mode 100644 index 00000000..7e2cdc4f --- /dev/null +++ b/src/instructlab/sdg/pipelines/simple/knowledge.yaml @@ -0,0 +1,13 @@ +version: "1.0" +blocks: + - name: gen_knowledge + type: LLMBlock + config: + config_path: ../../configs/knowledge/simple_generate_qa.yaml + output_cols: + - output + gen_kwargs: + max_tokens: 2048 + temperature: 0.7 + drop_duplicates: + - output diff --git a/src/instructlab/sdg/utilblocks.py b/src/instructlab/sdg/utilblocks.py index db04b5a1..02b536f5 100644 --- a/src/instructlab/sdg/utilblocks.py +++ b/src/instructlab/sdg/utilblocks.py @@ -10,10 +10,10 @@ class SamplePopulatorBlock(Block): - def __init__(self, config_paths, column_name, post_fix="", **batch_kwargs) -> None: - super().__init__( - block_name=self.__class__.__name__ - ) # Call the base class's __init__ + def __init__( + self, ctx, pipe, block_name, config_paths, column_name, post_fix="" + ) -> None: + super().__init__(ctx, pipe, block_name) self.configs = {} for config in config_paths: if post_fix: @@ -23,48 +23,68 @@ def __init__(self, config_paths, column_name, post_fix="", **batch_kwargs) -> No config_key = config.split("/")[-1].split(".")[0] self.configs[config_key] = self._load_config(config_name) self.column_name = column_name - self.num_procs = batch_kwargs.get("num_procs", 8) - def _generate(self, sample) -> dict: - sample = {**sample, **self.configs[sample[self.column_name]]} - return sample + # Using a static method to avoid serializing self when using multiprocessing + @staticmethod + def _map_populate(samples, configs, column_name, num_proc=1): + def populate(sample): + return {**sample, **configs[sample[column_name]]} + + return samples.map(populate, num_proc=num_proc) def generate(self, samples) -> Dataset: - samples = samples.map(self._generate, num_proc=self.num_procs) - return samples + return self._map_populate_samples( + samples, self.configs, self.column_name, self.ctx.num_procs + ) class SelectorBlock(Block): - def __init__(self, choice_map, choice_col, output_col, **batch_kwargs) -> None: - super().__init__(block_name=self.__class__.__name__) + def __init__( + self, ctx, pipe, block_name, choice_map, choice_col, output_col + ) -> None: + super().__init__(ctx, pipe, block_name) self.choice_map = choice_map self.choice_col = choice_col self.output_col = output_col - self.num_procs = batch_kwargs.get("num_procs", 8) - def _generate(self, sample) -> dict: - sample[self.output_col] = sample[self.choice_map[sample[self.choice_col]]] - return sample + # Using a static method to avoid serializing self when using multiprocessing + @staticmethod + def _map_select_choice(samples, choice_map, choice_col, output_col, num_proc=1): + def select_choice(sample) -> dict: + sample[output_col] = sample[choice_map[sample[choice_col]]] + return sample + + return samples.map(select_choice, num_proc=num_proc) def generate(self, samples: Dataset) -> Dataset: - samples = samples.map(self._generate, num_proc=self.num_procs) - return samples + return self._map_select_choice( + samples, + self.choice_map, + self.choice_col, + self.output_col, + self.ctx.num_procs, + ) class CombineColumnsBlock(Block): - def __init__(self, columns, output_col, separator="\n\n", **batch_kwargs) -> None: - super().__init__(block_name=self.__class__.__name__) + def __init__( + self, ctx, pipe, block_name, columns, output_col, separator="\n\n" + ) -> None: + super().__init__(ctx, pipe, block_name) self.columns = columns self.output_col = output_col self.separator = 
separator - self.num_procs = batch_kwargs.get("num_procs", 8) - def _generate(self, sample) -> dict: - sample[self.output_col] = self.separator.join( - [sample[col] for col in self.columns] - ) - return sample + # Using a static method to avoid serializing self when using multiprocessing + @staticmethod + def _map_combine(samples, columns, output_col, separator, num_proc=1): + def combine(sample): + sample[output_col] = separator.join([sample[col] for col in columns]) + return sample + + return samples.map(combine, num_proc=num_proc) def generate(self, samples: Dataset) -> Dataset: - samples = samples.map(self._generate, num_proc=self.num_procs) - return samples + return self._map_combine( + samples, self.columns, self.output_col, self.separator, self.ctx.num_procs + ) diff --git a/tests/test_default_pipeline_configs.py b/tests/test_default_pipeline_configs.py new file mode 100644 index 00000000..211cf4de --- /dev/null +++ b/tests/test_default_pipeline_configs.py @@ -0,0 +1,53 @@ +# Standard +from importlib import resources +from unittest.mock import patch +import unittest + +# Third Party +from datasets import Dataset + +# First Party +from instructlab.sdg.filterblock import FilterByValueBlock +from instructlab.sdg.llmblock import ConditionalLLMBlock, LLMBlock +from instructlab.sdg.pipeline import Pipeline, PipelineContext +from instructlab.sdg.utilblocks import ( + CombineColumnsBlock, + SamplePopulatorBlock, + SelectorBlock, +) + + +def _noop_generate(self, samples, **gen_kwargs): + return samples + + +@patch.object(CombineColumnsBlock, "generate", _noop_generate) +@patch.object(ConditionalLLMBlock, "generate", _noop_generate) +@patch.object(FilterByValueBlock, "generate", _noop_generate) +@patch.object(LLMBlock, "generate", _noop_generate) +@patch.object(SamplePopulatorBlock, "generate", _noop_generate) +@patch.object(SelectorBlock, "generate", _noop_generate) +@patch("instructlab.sdg.llmblock.server_supports_batched", lambda c, m: True) +class TestDefaultPipelineConfigs(unittest.TestCase): + def setUp(self): + self._yaml_files = [ + file + for package in [ + "instructlab.sdg.pipelines.simple", + "instructlab.sdg.pipelines.full", + ] + for file in resources.files(package).iterdir() + if file.suffix == ".yaml" + ] + + def test_pipeline_from_config(self): + ctx = PipelineContext( + client=None, + model_family="mixtral", + model_id="model", + num_instructions_to_generate=1, + ) + for pipeline_yaml in self._yaml_files: + pipeline = Pipeline.from_file(ctx, pipeline_yaml) + output = pipeline.generate(Dataset.from_list([])) + self.assertIsNotNone(output) diff --git a/tests/test_filterblock.py b/tests/test_filterblock.py index 7b8b1ce7..5dcc4d1b 100644 --- a/tests/test_filterblock.py +++ b/tests/test_filterblock.py @@ -1,5 +1,5 @@ # Standard -from unittest.mock import patch +from unittest.mock import MagicMock, patch import operator import unittest @@ -8,21 +8,31 @@ # First Party from instructlab.sdg.filterblock import FilterByValueBlock +from instructlab.sdg.pipeline import PipelineContext class TestFilterByValueBlock(unittest.TestCase): def setUp(self): + self.ctx = MagicMock() + self.ctx.num_procs = 1 + self.pipe = MagicMock() self.block = FilterByValueBlock( + self.ctx, + self.pipe, + "filter_by_age", filter_column="age", - filter_value=30, - operation=operator.eq, - convert_dtype=int, + filter_value="30", + operation="eq", + convert_dtype="int", ) self.block_with_list = FilterByValueBlock( + self.ctx, + self.pipe, + "filter_by_age_list", filter_column="age", - filter_value=[30, 35], - 
operation=operator.eq, - convert_dtype=int, + filter_value=["30", "35"], + operation="eq", + convert_dtype="int", ) self.dataset = Dataset.from_dict( {"age": ["25", "30", "35", "forty", "45"]}, diff --git a/tests/test_importblock.py b/tests/test_importblock.py new file mode 100644 index 00000000..80baf215 --- /dev/null +++ b/tests/test_importblock.py @@ -0,0 +1,104 @@ +# Standard +from unittest.mock import MagicMock, patch +import os +import tempfile +import unittest + +# Third Party +from datasets import Dataset, Features, Value + +# First Party +from instructlab.sdg.importblock import ImportBlock +from instructlab.sdg.pipeline import Pipeline + + +class TestImportBlockWithMockPipeline(unittest.TestCase): + @patch("instructlab.sdg.pipeline.Pipeline") + def setUp(self, mock_pipeline): + self.ctx = MagicMock() + self.pipe = MagicMock() + self.block_name = "test_block" + self.path = "/path/to/config" + self.mock_pipeline = mock_pipeline + self.import_block = ImportBlock(self.ctx, self.pipe, self.block_name, self.path) + self.dataset = Dataset.from_dict({}) + + def test_initialization(self): + self.assertEqual(self.import_block.block_name, self.block_name) + self.assertEqual(self.import_block.path, self.path) + self.mock_pipeline.from_file.assert_called_once_with(self.ctx, self.path) + + def test_generate(self): + self.mock_pipeline.from_file.return_value.generate.return_value = self.dataset + samples = self.import_block.generate(self.dataset) + self.mock_pipeline.from_file.return_value.generate.assert_called_once_with( + samples + ) + self.assertEqual(samples, self.dataset) + + +_CHILD_YAML = """\ +version: "1.0" +blocks: +- name: greater_than_thirty + type: FilterByValueBlock + config: + filter_column: age + filter_value: 30 + operation: gt + convert_dtype: int +""" + + +_PARENT_YAML_FMT = """\ +version: "1.0" +blocks: +- name: forty_or_under + type: FilterByValueBlock + config: + filter_column: age + filter_value: 40 + operation: le + convert_dtype: int +- name: import_child + type: ImportBlock + config: + path: %s +- name: big_bdays + type: FilterByValueBlock + config: + filter_column: age + filter_value: + - 30 + - 40 + operation: eq + convert_dtype: int +""" + + +class TestImportBlockWithFilterByValue(unittest.TestCase): + def setUp(self): + self.ctx = MagicMock() + self.ctx.num_procs = 1 + self.child_yaml = self._write_tmp_yaml(_CHILD_YAML) + self.parent_yaml = self._write_tmp_yaml(_PARENT_YAML_FMT % self.child_yaml) + self.dataset = Dataset.from_dict( + {"age": ["25", "30", "35", "40", "45"]}, + features=Features({"age": Value("string")}), + ) + + def tearDown(self): + os.remove(self.parent_yaml) + os.remove(self.child_yaml) + + def _write_tmp_yaml(self, content): + tmp_file = tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".yaml") + tmp_file.write(content) + tmp_file.close() + return tmp_file.name + + def test_generate(self): + pipeline = Pipeline.from_file(self.ctx, self.parent_yaml) + filtered_dataset = pipeline.generate(self.dataset) + self.assertEqual(len(filtered_dataset), 1) + self.assertEqual(filtered_dataset["age"], [40])
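
Usage sketch (not part of the patch): a minimal, self-contained illustration of how the YAML-driven pipeline API introduced by this change fits together. The endpoint URL, API key, teacher model name, and sample column names below are placeholder assumptions; the PipelineContext / Pipeline.from_file / SDG calls mirror the updated scripts and tests above.

# Sketch only -- exercises the Pipeline.from_file() API added in this change.
# The endpoint, api_key, teacher model, and sample columns are placeholders.
from importlib import resources

from datasets import Dataset
from openai import OpenAI

from instructlab.sdg import SDG
from instructlab.sdg.pipeline import (
    FULL_PIPELINES_PACKAGE,
    Pipeline,
    PipelineContext,
)

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")
teacher_model = "mistralai/Mixtral-8x7B-Instruct-v0.1"

# Runtime parameters shared by every block in a pipeline.
ctx = PipelineContext(client, "mixtral", teacher_model, 1)

# Load one of the pipeline definitions packaged with the library...
with resources.path(FULL_PIPELINES_PACKAGE, "freeform_skills.yaml") as yaml_path:
    pipe = Pipeline.from_file(ctx, yaml_path)

# ...or load a custom pipeline config from an absolute path:
# pipe = Pipeline.from_file(ctx, "/path/to/my_pipeline.yaml")

# The input columns must match whatever the chosen pipeline's prompt
# configs expect; see scripts/test_freeform_skills.py for a complete sample.
ds = Dataset.from_list(
    [{"task_description": "...", "question": "...", "response": "..."}]
)

gen_data = SDG([pipe]).generate(ds)
print(gen_data)

Note on the design this relies on: each block now receives the PipelineContext and its owning Pipeline, so relative config_path entries in a pipeline YAML (e.g. ../../configs/skills/freeform_questions.yaml) resolve against the directory of that YAML file rather than the current working directory.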