From ff6e68670351ee2a50330c34f164c0266f6ccfbc Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Wed, 10 Jan 2024 19:32:34 +0100
Subject: [PATCH] Add fixes (#63)

Added small fixes to the SPS_merge branch, mainly some formatting- and
linting-related fixes. Tested all notebooks end-to-end and they seem to
work fine.
---
 src/evaluation.ipynb       |  61 ++++++---
 src/parameter_search.ipynb |  67 ++++++---
 src/pipeline_eval.py       |  16 ++-
 src/pipeline_index.py      |  11 +-
 src/utils.py               | 272 ++++++++++++++++++++++++-------------
 5 files changed, 289 insertions(+), 138 deletions(-)

diff --git a/src/evaluation.ipynb b/src/evaluation.ipynb
index fd74e24..5bd38e1 100644
--- a/src/evaluation.ipynb
+++ b/src/evaluation.ipynb
@@ -65,9 +65,7 @@
  },
  {
\"evaluation_llm\": \"OpenAI\",\n", + " \"evaluation_module\": \"langchain.chat_models\",\n", + " \"evaluation_llm\": \"ChatOpenAI\",\n", " \"evaluation_llm_kwargs\": {\n", " \"openai_api_key\": os.environ[\"OPENAI_API_KEY\"], # TODO: Update with your key or use a different model\n", " \"model_name\" : \"gpt-3.5-turbo\"\n", @@ -534,7 +561,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.13" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/src/parameter_search.ipynb b/src/parameter_search.ipynb index ff49169..08776bb 100644 --- a/src/parameter_search.ipynb +++ b/src/parameter_search.ipynb @@ -59,9 +59,7 @@ }, { "cell_type": "markdown", - "metadata": { - "jp-MarkdownHeadingCollapsed": true - }, + "metadata": {}, "source": [ "## Set up environment" ] @@ -127,6 +125,33 @@ "logging.info(\"test\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Check if GPU is available**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import subprocess\n", + "\n", + "try:\n", + " subprocess.check_output('nvidia-smi')\n", + " logging.info(\"Found GPU, using it!\")\n", + " number_of_accelerators = 1\n", + " accelerator_name = \"GPU\"\n", + "except Exception:\n", + " logging.warning(\"We recommend to run this pipeline on a GPU, but none could be found, using CPU instead\")\n", + " number_of_accelerators = None\n", + " accelerator_name = None" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -145,9 +170,7 @@ }, { "cell_type": "markdown", - "metadata": { - "jp-MarkdownHeadingCollapsed": true - }, + "metadata": {}, "source": [ "## Spin up the Weaviate vector store" ] @@ -180,19 +203,19 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "!docker compose -f weaviate/docker-compose.yaml up --detach" + "Make sure you have **Weaviate client v3**" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ - "Make sure you have **Weaviate client v3**" + "!docker compose -f weaviate/docker-compose.yaml up --detach" ] }, { @@ -316,6 +339,7 @@ " 'retrieval_top_k' : [2, 4, 8]\n", "}\n", "\n", + "evaluation_set_path = \"./evaluation_datasets\"\n", "search_method = 'progressive_search' # 'grid_search', 'progressive_search'\n", "target_metric = 'context_precision' # relevant for 'smart' methods that use previous results to determine params, e.g. progressive search" ] @@ -335,6 +359,8 @@ "source": [ "from utils import get_host_ip\n", "\n", + "os.environ[\"OPENAI_API_KEY\"] = \"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\"\n", + "\n", "# configurable parameters\n", "shared_args = {\n", " \"base_path\" : \"./data\", # where data goes\n", @@ -345,14 +371,19 @@ " \"n_rows_to_load\" : 1000,\n", "}\n", "eval_args = {\n", - " \"evaluation_set_path\" : \"./evaluation_datasets\",\n", " \"evaluation_set_filename\" : \"wikitext_1000_q.csv\",\n", " \"evaluation_set_separator\" : \";\",\n", " \"evaluation_module\": \"langchain.chat_models\",\n", " \"evaluation_llm\": \"ChatOpenAI\",\n", " \"evaluation_llm_kwargs\": {\"openai_api_key\": os.environ[\"OPENAI_API_KEY\"], #TODO Specify your key if you're using OpenAI\n", - " \"model_name\" : \"gpt-4\"}, # e.g. \"gpt-4\" or \"gpt-3.5-turbo\"\n", + " \"model_name\" : \"gpt-3.5-turbo\"}, # e.g. 
\"gpt-4\" or \"gpt-3.5-turbo\"\n", " \"evaluation_metrics\" : [\"context_precision\", \"context_relevancy\"]\n", + "}\n", + "\n", + "# Parameters for the GPU resources\n", + "resource_args = {\n", + " \"number_of_accelerators\": number_of_accelerators,\n", + " \"accelerator_name\": accelerator_name,\n", "}" ] }, @@ -389,12 +420,14 @@ " shared_args = shared_args,\n", " index_args = index_args,\n", " eval_args = eval_args,\n", + " resource_args = resource_args,\n", " search_method = search_method,\n", " target_metric = target_metric,\n", + " evaluation_set_path=evaluation_set_path,\n", " debug = True # set to False if you do not want to see intermediary results and evolving parameters printed out\n", ")\n", "\n", - "parameter_search_results = mysearch.run()" + "results = mysearch.run()" ] }, { @@ -417,7 +450,7 @@ "metadata": {}, "outputs": [], "source": [ - "parameter_search_results" + "results" ] }, { @@ -571,7 +604,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.13" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/src/pipeline_eval.py b/src/pipeline_eval.py index 454934a..87c2c54 100644 --- a/src/pipeline_eval.py +++ b/src/pipeline_eval.py @@ -1,7 +1,7 @@ """Fondant pipeline to evaluate a RAG pipeline.""" import pyarrow as pa -from fondant.pipeline import Pipeline +from fondant.pipeline import Pipeline, Resources def create_pipeline( @@ -9,8 +9,8 @@ def create_pipeline( base_path: str = "./data", weaviate_url="http://host.docker.internal:8080", weaviate_class: str = "Pipeline1", - evaluation_set_path = "./evaluation_sets", - evaluation_set_filename = "wikitext_1000_q.csv", + evaluation_set_path="./evaluation_datasets", + evaluation_set_filename="wikitext_1000_q.csv", evaluation_set_separator: str = ";", embed_model_provider: str = "huggingface", embed_model: str = "all-MiniLM-L6-v2", @@ -20,6 +20,8 @@ def create_pipeline( evaluation_llm: str = "OpenAI", evaluation_llm_kwargs: dict = {"model_name": "gpt-3.5-turbo"}, evaluation_metrics: list = ["context_precision", "context_relevancy"], + number_of_accelerators=None, + accelerator_name=None, ): """Create a Fondant pipeline based on the provided arguments.""" evaluation_pipeline = Pipeline( @@ -31,7 +33,8 @@ def create_pipeline( load_from_csv = evaluation_pipeline.read( "load_from_csv", arguments={ - "dataset_uri": '/evaldata/' + evaluation_set_filename, # mounted dir from within docker as extra_volumes + "dataset_uri": "/evaldata/" + evaluation_set_filename, + # mounted dir from within docker as extra_volumes "column_separator": evaluation_set_separator, }, produces={ @@ -49,6 +52,11 @@ def create_pipeline( consumes={ "text": "question", }, + resources=Resources( + accelerator_number=number_of_accelerators, + accelerator_name=accelerator_name, + ), + cluster_type="local" if number_of_accelerators is not None else "default", ) retrieve_chunks = embed_text_op.apply( diff --git a/src/pipeline_index.py b/src/pipeline_index.py index c72435d..e1926e6 100644 --- a/src/pipeline_index.py +++ b/src/pipeline_index.py @@ -1,13 +1,13 @@ """Fondant pipeline to index a RAG system.""" import pyarrow as pa -from fondant.pipeline import Pipeline +from fondant.pipeline import Pipeline, Resources def create_pipeline( *, + weaviate_url: str, base_path: str = "./data", n_rows_to_load: int = 1000, - weaviate_url: str = "http://host.docker.internal:8080", weaviate_class: str = "Pipeline1", weaviate_overwrite: bool = True, embed_model_provider: str = "huggingface", @@ -15,6 +15,8 @@ def create_pipeline( 
embed_api_key: dict = {}, chunk_size: int = 512, chunk_overlap: int = 32, + number_of_accelerators=None, + accelerator_name=None, ): """Create a Fondant pipeline based on the provided arguments.""" indexing_pipeline = Pipeline( @@ -50,6 +52,11 @@ def create_pipeline( "model": embed_model, "api_keys": embed_api_key, }, + resources=Resources( + accelerator_number=number_of_accelerators, + accelerator_name=accelerator_name, + ), + cluster_type="local" if number_of_accelerators is not None else "default", ) embeddings.write( diff --git a/src/utils.py b/src/utils.py index c7f5945..1b28e79 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,19 +1,19 @@ -import glob -import itertools -import json import logging import os import socket +import typing as t from datetime import datetime -from pathlib import Path from itertools import product +from pathlib import Path import pandas as pd import pipeline_eval import pipeline_index -import weaviate from fondant.pipeline.runner import DockerRunner +logger = logging.getLogger() +logger.setLevel(logging.INFO) + def get_host_ip(): try: @@ -38,7 +38,9 @@ def create_directory_if_not_exists(path): def cartesian_product(input_dict): - return (dict(zip(input_dict.keys(), values)) for values in product(*input_dict.values())) + return ( + dict(zip(input_dict.keys(), values)) for values in product(*input_dict.values()) + ) def extract_timestamp(folder_name): @@ -51,22 +53,27 @@ def has_parquet_file(data_directory, entry, component_name): # Check if the component exists if not os.path.exists(component_folder) or not os.path.isdir(component_folder): return False - parquet_files = [file for file in os.listdir(component_folder) if file.endswith(".parquet")] + parquet_files = [ + file for file in os.listdir(component_folder) if file.endswith(".parquet") + ] return bool(parquet_files) -def get_metrics_latest_run(base_path, - pipeline_name="evaluation-pipeline", - component_name = "aggregate_eval_results"): - +def get_metrics_latest_run( + base_path, + pipeline_name="evaluation-pipeline", + component_name="aggregate_eval_results", +): data_directory = f"{base_path}/{pipeline_name}" - + # keep data folders that belong to pipeline and contain parquet file - valid_entries = [d + valid_entries = [ + d for d in os.listdir(data_directory) if os.path.isdir(os.path.join(data_directory, d)) and d.startswith(pipeline_name) - and has_parquet_file(data_directory, d, component_name)] + and has_parquet_file(data_directory, d, component_name) + ] # keep the latest folder latest_run = sorted(valid_entries, key=extract_timestamp, reverse=True)[0] @@ -74,40 +81,56 @@ def get_metrics_latest_run(base_path, # read all Parquet files and concatenate them into a single DataFrame component_folder = os.path.join(data_directory, latest_run, component_name) parquet_files = [f for f in os.listdir(component_folder) if f.endswith(".parquet")] - dfs = [pd.read_parquet(os.path.join(component_folder, file)) for file in parquet_files] + dfs = [ + pd.read_parquet(os.path.join(component_folder, file)) for file in parquet_files + ] + + # Concatenate DataFrames and set index + concatenated_df = pd.concat(dfs, ignore_index=True).set_index("metric") + + return concatenated_df["score"].apply(lambda x: round(x, 2)).to_dict() - return pd.concat(dfs, ignore_index=True).set_index('metric')['score'].to_dict() def add_embed_model_numerical_column(df): - df['embed_model_numerical'] = pd.factorize(df['embed_model'])[0] + 1 + df["embed_model_numerical"] = pd.factorize(df["embed_model"])[0] + 1 return df + def 
show_legend_embed_models(df): - columns_to_show = ['embed_model','embed_model_numerical'] - df = df[columns_to_show].drop_duplicates().set_index('embed_model_numerical') - df.index.name = '' + columns_to_show = ["embed_model", "embed_model_numerical"] + df = df[columns_to_show].drop_duplicates().set_index("embed_model_numerical") + df.index.name = "" return df class ParameterSearch: - """RAG parameter search""" - - def __init__(self, - searchable_index_params, - searchable_shared_params, - searchable_eval_params, - index_args, - shared_args, - eval_args, - search_method = 'progressive_search', - target_metric = 'context_precision', - debug=False): - + """RAG parameter search.""" + + def __init__( + self, + *, + searchable_index_params: t.Dict[str, t.Any], + searchable_shared_params: t.Dict[str, t.Any], + searchable_eval_params: t.Dict[str, t.Any], + index_args: t.Dict[str, t.Any], + shared_args: t.Dict[str, t.Any], + eval_args: t.Dict[str, t.Any], + resource_args: t.Dict[str, t.Any], + evaluation_set_path: str = "./evaluation_datasets", + search_method: str = "progressive_search", + target_metric: str = "context_precision", + debug=False, + ): self.searchable_index_params = searchable_index_params self.searchable_shared_params = searchable_shared_params self.searchable_eval_params = searchable_eval_params - self.searchable_params = {**searchable_index_params, **searchable_shared_params, **searchable_eval_params} + self.searchable_params = { + **searchable_index_params, + **searchable_shared_params, + **searchable_eval_params, + } self.index_args = index_args + self.resource_args = resource_args self.shared_args = shared_args self.eval_args = eval_args self.search_method = search_method @@ -115,120 +138,162 @@ def __init__(self, self.debug = debug # create directory for pipeline output data - self.base_path = create_directory_if_not_exists(shared_args['base_path']) + self.base_path = create_directory_if_not_exists(shared_args["base_path"]) # mount directory of pipeline output data from docker - self.extra_volumes = [str(os.path.join(os.path.abspath(eval_args['evaluation_set_path']))) + ":/evaldata"] + self.extra_volumes = [ + str(os.path.join(os.path.abspath(evaluation_set_path))) + ":/evaldata", + ] # define pipeline runner self.runner = DockerRunner() # list of dicts to store all params & results self.results = [] - + def run(self): - - runcount = 0 - - while True: + run_count = 0 - configs = self.create_configs(runcount) + while True: + configs = self.create_configs(run_count) if configs is None: break - + # create configs indexing_config, evaluation_config = configs # create pipeline objects - indexing_pipeline, evaluation_pipeline = self.create_pipelines(indexing_config, evaluation_config) - + indexing_pipeline, evaluation_pipeline = self.create_pipelines( + indexing_config, + evaluation_config, + ) + # run indexing pipeline - self.run_indexing_pipeline(runcount, indexing_config, indexing_pipeline) - + self.run_indexing_pipeline(run_count, indexing_config, indexing_pipeline) + # run evaluation pipeline - self.run_evaluation_pipeline(runcount, evaluation_config, evaluation_pipeline) + self.run_evaluation_pipeline( + run_count, + evaluation_config, + evaluation_pipeline, + ) # read metrics from pipeline output - metrics={} + metrics = {} metrics = get_metrics_latest_run(self.base_path) metadata = { - 'run_number' : runcount, - 'date_time' : datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "run_number": run_count, + "date_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } - + # collect 
results - self.results.append({**metadata, **indexing_config, **evaluation_config, **metrics}) + self.results.append( + {**metadata, **indexing_config, **evaluation_config, **metrics}, + ) - runcount += 1 + run_count += 1 return pd.DataFrame(self.results) - - def create_configs(self, runcount): - - if self.search_method == 'grid_search': + def create_configs(self, run_count: int): + if self.search_method == "grid_search": # all possible combinations of parameters all_combinations = list(cartesian_product(self.searchable_params)) # when all combinations have been tried, stop searching - if runcount > len(all_combinations) - 1: + if run_count > len(all_combinations) - 1: return None # create base config for indexing pipeline - pipeline_config = all_combinations[runcount] - - elif self.search_method == 'progressive_search': + pipeline_config = all_combinations[run_count] + elif self.search_method == "progressive_search": # initialize pipeline config with middle values for each parameter pipeline_config = {} for key, value in self.searchable_params.items(): - middle_index = int((len(value) - 1)/2) - pipeline_config.update({key:value[middle_index]}) - + middle_index = int((len(value) - 1) / 2) + pipeline_config.update({key: value[middle_index]}) + # make a list of variations to try keys_to_try = [] values_to_try = [] step = 0 for key, values in self.searchable_params.items(): - if len(values) > 1: # only variations to try when more than one option + if len(values) > 1: # only variations to try when more than one option for option in values: - # for the first step we need to try all options, for subsequent steps we need not repeat the default starting options (pipeline_config) - if step == 0 or not (key in pipeline_config and option == pipeline_config[key]): + # for the first step we need to try all options, for subsequent + # steps we need not repeat the default starting options (pipeline_config) + if step == 0 or not ( + key in pipeline_config and option == pipeline_config[key] + ): keys_to_try.append(key) values_to_try.append(option) step += 1 - variations_to_try = [{keys_to_try[i]:values_to_try[i]} for i in range(len(keys_to_try))] + variations_to_try = [ + {keys_to_try[i]: values_to_try[i]} for i in range(len(keys_to_try)) + ] # if there are no variations to try, just schedule one run if len(variations_to_try) == 0: - variations_to_try = [{list(pipeline_config.keys())[0]: list(pipeline_config.values())[0]}] + variations_to_try = [ + { + list(pipeline_config.keys())[0]: list(pipeline_config.values())[ + 0 + ], + }, + ] # when all variations have been tried, stop searching - if runcount > len(variations_to_try) - 1: + if run_count > len(variations_to_try) - 1: return None - + # update with best performing params if len(self.results): - pipeline_config = pd.DataFrame(self.results).sort_values(self.target_metric, ascending=False).iloc[0].to_dict() - pipeline_config.update({'embed_model' : (pipeline_config['embed_model_provider'], pipeline_config['embed_model'])}) # TOD0 cleanify - - logging.info(f'Trying: {variations_to_try[runcount]}') - pipeline_config.update(variations_to_try[runcount]) + pipeline_config = ( + pd.DataFrame(self.results) + .sort_values(self.target_metric, ascending=False) + .iloc[0] + .to_dict() + ) + pipeline_config.update( + { + "embed_model": ( + pipeline_config["embed_model_provider"], + pipeline_config["embed_model"], + ), + }, + ) # TOD0 cleanify + + logging.info(f"Trying: {variations_to_try[run_count]}") + pipeline_config.update(variations_to_try[run_count]) else: - 
raise ValueError('Please provide a valid search method') + msg = "Please provide a valid search method" + raise ValueError(msg) # filter out indexing & evaluation parameters - indexing_config = {key: pipeline_config[key] for key in {**self.searchable_index_params, **self.searchable_shared_params}} - evaluation_config = {key: pipeline_config[key] for key in {**self.searchable_eval_params, **self.searchable_shared_params}} + indexing_config = { + key: pipeline_config[key] + for key in {**self.searchable_index_params, **self.searchable_shared_params} + } + evaluation_config = { + key: pipeline_config[key] + for key in {**self.searchable_eval_params, **self.searchable_shared_params} + } # More shared parameters - indexing_config['weaviate_class'] = evaluation_config['weaviate_class'] = f'Run{runcount}' - indexing_config['embed_model_provider'] = evaluation_config['embed_model_provider'] = indexing_config['embed_model'][0] - indexing_config['embed_model'] = evaluation_config['embed_model'] = indexing_config['embed_model'][1] - + indexing_config["weaviate_class"] = evaluation_config[ + "weaviate_class" + ] = f"Run{run_count}" + indexing_config["embed_model_provider"] = evaluation_config[ + "embed_model_provider" + ] = indexing_config["embed_model"][0] + indexing_config["embed_model"] = evaluation_config[ + "embed_model" + ] = indexing_config["embed_model"][1] + return indexing_config, evaluation_config def create_pipelines(self, indexing_config, evaluation_config): @@ -237,31 +302,42 @@ def create_pipelines(self, indexing_config, evaluation_config): indexing_pipeline = pipeline_index.create_pipeline( **self.shared_args, **self.index_args, - **indexing_config + **indexing_config, + **self.resource_args, ) # create evaluation pipeline evaluation_pipeline = pipeline_eval.create_pipeline( **self.shared_args, **self.eval_args, - **evaluation_config + **evaluation_config, + **self.resource_args, ) if self.debug: - print('\nIntermediary results:') - print(pd.DataFrame(self.results)) - print(f'RUN {indexing_config["weaviate_class"]}') - print('\nIndexing pipeline parameters:') - print({**self.shared_args, **self.index_args, **indexing_config}) - print('\nEvaluation pipeline parameters:') - print({**self.shared_args, **self.eval_args, **evaluation_config}) - + logger.info("\nIntermediary results:") + logger.info(pd.DataFrame(self.results)) + logger.info(f'RUN {indexing_config["weaviate_class"]}') + logger.info("\nIndexing pipeline parameters:") + logger.info({**self.shared_args, **self.index_args, **indexing_config}) + logger.info("\nEvaluation pipeline parameters:") + logger.info({**self.shared_args, **self.eval_args, **evaluation_config}) + return indexing_pipeline, evaluation_pipeline - def run_indexing_pipeline(self, runcount, indexing_config, indexing_pipeline): - logging.info(f'Starting indexing pipeline of run #{runcount} with {indexing_config}') + def run_indexing_pipeline(self, run_count, indexing_config, indexing_pipeline): + logger.info( + f"Starting indexing pipeline of run #{run_count} with {indexing_config}", + ) self.runner.run(indexing_pipeline) - def run_evaluation_pipeline(self, runcount, evaluation_config, evaluation_pipeline): - logging.info(f'Starting evaluation pipeline of run #{runcount} with {evaluation_config}') + def run_evaluation_pipeline( + self, + run_count, + evaluation_config, + evaluation_pipeline, + ): + logger.info( + f"Starting evaluation pipeline of run #{run_count} with {evaluation_config}", + ) self.runner.run(input=evaluation_pipeline, 
extra_volumes=self.extra_volumes)
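
Reviewer note (not part of the patch): a minimal sketch of how the pieces added
above are intended to fit together, with the nvidia-smi GPU check feeding the new
number_of_accelerators / accelerator_name arguments of
pipeline_index.create_pipeline(), which forwards them to Fondant's Resources().
The weaviate_url and base_path values below are illustrative placeholders, not
values mandated by the patch.

    import logging
    import subprocess

    import pipeline_index
    from fondant.pipeline.runner import DockerRunner

    # Detect an NVIDIA GPU the same way the notebook cells added in this patch do,
    # and fall back to CPU when nvidia-smi is not available.
    try:
        subprocess.check_output("nvidia-smi")
        number_of_accelerators, accelerator_name = 1, "GPU"
    except Exception:
        logging.warning("No GPU found, running on CPU instead")
        number_of_accelerators, accelerator_name = None, None

    # Build the indexing pipeline with the new resource arguments
    # (weaviate_url and base_path are illustrative).
    indexing_pipeline = pipeline_index.create_pipeline(
        weaviate_url="http://host.docker.internal:8081",
        base_path="./data",
        number_of_accelerators=number_of_accelerators,
        accelerator_name=accelerator_name,
    )

    # Run locally with Docker, as utils.ParameterSearch does.
    DockerRunner().run(indexing_pipeline)

When no GPU is found, both arguments stay None, so create_pipeline() keeps
cluster_type="default" and the Resources object carries no accelerator.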