From ffeaec28e493fb9573f6b7ed7952088a65b741e9 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 8 Feb 2024 10:22:44 +0100 Subject: [PATCH] remove custom lightweight components --- src/components/__init__.py | 0 src/components/aggregrate_eval_results.py | 25 --- src/components/chunking_component.py | 68 ------ src/components/evaluate_ragas.py | 97 -------- src/components/retrieve_from_weaviate.py | 73 ------- src/evaluation.ipynb | 255 +++++++++++++--------- 6 files changed, 151 insertions(+), 367 deletions(-) delete mode 100644 src/components/__init__.py delete mode 100644 src/components/aggregrate_eval_results.py delete mode 100644 src/components/chunking_component.py delete mode 100644 src/components/evaluate_ragas.py delete mode 100644 src/components/retrieve_from_weaviate.py diff --git a/src/components/__init__.py b/src/components/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/components/aggregrate_eval_results.py b/src/components/aggregrate_eval_results.py deleted file mode 100644 index 6cf52ba..0000000 --- a/src/components/aggregrate_eval_results.py +++ /dev/null @@ -1,25 +0,0 @@ -import dask.dataframe as dd -import pyarrow as pa -from fondant.component import DaskTransformComponent -from fondant.pipeline import lightweight_component - - -@lightweight_component( - consumes={ - "context_precision": pa.float32(), - "context_relevancy": pa.float32(), - }, - produces={ - "metric": pa.string(), - "score": pa.float32(), - }, -) -class AggregateResults(DaskTransformComponent): - def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame: - metrics = list(self.consumes.keys()) - agg = dataframe[metrics].mean() - agg_df = agg.to_frame(name="score") - agg_df["metric"] = agg.index - agg_df.index = agg_df.index.astype(str) - - return agg_df diff --git a/src/components/chunking_component.py b/src/components/chunking_component.py deleted file mode 100644 index 8d5a041..0000000 --- a/src/components/chunking_component.py +++ /dev/null @@ -1,68 +0,0 @@ -import typing as t - -import pandas as pd -import pyarrow as pa -from fondant.component import PandasTransformComponent -from fondant.pipeline import lightweight_component - - -@lightweight_component( - consumes={"text": pa.string()}, - produces={"text": pa.string(), "original_document_id": pa.string()}, - extra_requires=["langchain==0.0.329"], -) -class ChunkTextComponent(PandasTransformComponent): - """Component that chunks text into smaller segments. - More information about the different chunking strategies can be here: - - https://python.langchain.com/docs/modules/data_connection/document_transformers/ - - https://www.pinecone.io/learn/chunking-strategies/. - """ - - def __init__( - self, - *, - chunk_size: int, - chunk_overlap: int, - ): - """ - Args: - chunk_size: the chunk size - chunk_overlap: the overlap between chunks. - """ - from langchain.text_splitter import RecursiveCharacterTextSplitter - - self.chunker = RecursiveCharacterTextSplitter( - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - ) - - def chunk_text(self, row) -> t.List[t.Tuple]: - # Multi-index df has id under the name attribute - doc_id = row.name - text_data = row["text"] - docs = self.chunker.create_documents([text_data]) - - return [ - (doc_id, f"{doc_id}_{chunk_id}", chunk.page_content) - for chunk_id, chunk in enumerate(docs) - ] - - def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: - import itertools - - results = dataframe.apply( - self.chunk_text, - axis=1, - ).to_list() - - # Flatten results - results = list(itertools.chain.from_iterable(results)) - - # Turn into dataframes - results_df = pd.DataFrame( - results, - columns=["original_document_id", "id", "text"], - ) - results_df = results_df.set_index("id") - - return results_df diff --git a/src/components/evaluate_ragas.py b/src/components/evaluate_ragas.py deleted file mode 100644 index 92c99dd..0000000 --- a/src/components/evaluate_ragas.py +++ /dev/null @@ -1,97 +0,0 @@ -import pandas as pd -import pyarrow as pa -from fondant.component import PandasTransformComponent -from fondant.pipeline import lightweight_component - - -@lightweight_component( - consumes={ - "question": pa.string(), - "retrieved_chunks": pa.list_(pa.string()), - }, - produces={ - "context_precision": pa.float32(), - "context_relevancy": pa.float32(), - }, - extra_requires=["ragas==0.0.21"], -) -class RagasEvaluator(PandasTransformComponent): - def __init__( - self, - *, - llm_module_name: str, - llm_class_name: str, - llm_kwargs: dict, - ) -> None: - """ - Args: - llm_module_name: Module from which the LLM is imported. Defaults to - langchain.chat_models - llm_class_name: Name of the selected llm. Defaults to ChatOpenAI - llm_kwargs: Arguments of the selected llm. - """ - self.llm = self.extract_llm( - llm_module_name=llm_module_name, - llm_class_name=llm_class_name, - llm_kwargs=llm_kwargs, - ) - - from ragas.llms import LangchainLLM - - self.gpt_wrapper = LangchainLLM(llm=self.llm) - self.metric_functions = self.extract_metric_functions( - metrics=["context_precision", "context_relevancy"], - ) - self.set_llm(self.metric_functions) - - # import the metric functions selected - @staticmethod - def import_from(module_name: str, element_name: str): - module = __import__(module_name, fromlist=[element_name]) - return getattr(module, element_name) - - def extract_llm(self, llm_module_name: str, llm_class_name: str, llm_kwargs: dict): - module = self.import_from( - module_name=llm_module_name, - element_name=llm_class_name, - ) - return module(**llm_kwargs) - - def extract_metric_functions(self, metrics: list): - functions = [] - for metric in metrics: - functions.append(self.import_from("ragas.metrics", metric)) - return functions - - def set_llm(self, metric_functions: list): - for metric_function in metric_functions: - metric_function.llm = self.gpt_wrapper - - # evaluate the retriever - @staticmethod - def create_hf_ds(dataframe: pd.DataFrame): - dataframe = dataframe.rename( - columns={"retrieved_chunks": "contexts"}, - ) - - from datasets import Dataset - - return Dataset.from_pandas(dataframe) - - def ragas_eval(self, dataset): - from ragas import evaluate - - return evaluate(dataset=dataset, metrics=self.metric_functions) - - def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: - hf_dataset = self.create_hf_ds( - dataframe=dataframe[["question", "retrieved_chunks"]], - ) - if "id" in hf_dataset.column_names: - hf_dataset = hf_dataset.remove_columns("id") - - result = self.ragas_eval(dataset=hf_dataset) - results_df = result.to_pandas() - results_df = results_df.set_index(dataframe.index) - - return results_df diff --git a/src/components/retrieve_from_weaviate.py b/src/components/retrieve_from_weaviate.py deleted file mode 100644 index 608d4c3..0000000 --- a/src/components/retrieve_from_weaviate.py +++ /dev/null @@ -1,73 +0,0 @@ -import pandas as pd -import pyarrow as pa -from fondant.component import PandasTransformComponent -from fondant.pipeline import lightweight_component - - -@lightweight_component( - produces={"retrieved_chunks": pa.list_(pa.string())}, - extra_requires=["weaviate-client==3.24.1"], -) -class RetrieveFromWeaviateComponent(PandasTransformComponent): - def __init__( - self, - *, - weaviate_url: str, - class_name: str, - top_k: int, - ) -> None: - """ - Args: - weaviate_url: An argument passed to the component. - class_name: Name of class to query - top_k: Amount of context to return. - additional_config: Additional configuration passed to the weaviate client. - additional_headers: Additional headers passed to the weaviate client. - hybrid_query: The hybrid query to be used for retrieval. Optional parameter. - hybrid_alpha: Argument to change how much each search affects the results. An alpha - of 1 is a pure vector search. An alpha of 0 is a pure keyword search. - rerank: Whether to rerank the results based on the hybrid query. Defaults to False. - Check this notebook for more information on reranking: - https://github.com/weaviate/recipes/blob/main/ranking/cohere-ranking/cohere-ranking.ipynb - https://weaviate.io/developers/weaviate/search/rerank. - """ - import weaviate - - # Initialize your component here based on the arguments - self.client = weaviate.Client( - url=weaviate_url, - additional_config=None, - additional_headers=None, - ) - self.class_name = class_name - self.k = top_k - - def teardown(self) -> None: - del self.client - - def retrieve_chunks_from_embeddings(self, vector_query: list): - """Get results from weaviate database.""" - query = ( - self.client.query.get(self.class_name, ["passage"]) - .with_near_vector({"vector": vector_query}) - .with_limit(self.k) - .with_additional(["distance"]) - ) - - result = query.do() - result_dict = result["data"]["Get"][self.class_name] - return [retrieved_chunk["passage"] for retrieved_chunk in result_dict] - - def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: - if "embedding" in dataframe.columns: - dataframe["retrieved_chunks"] = dataframe["embedding"].apply( - self.retrieve_chunks_from_embeddings, - ) - - else: - msg = "Dataframe must contain an 'embedding' column" - raise ValueError( - msg, - ) - - return dataframe diff --git a/src/evaluation.ipynb b/src/evaluation.ipynb index db56b5b..d8ea798 100644 --- a/src/evaluation.ipynb +++ b/src/evaluation.ipynb @@ -99,33 +99,6 @@ "!pip install -q -r ../requirements.txt --disable-pip-version-check && echo \"Success\"" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**Check if GPU is available**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import logging\n", - "import subprocess\n", - "\n", - "try:\n", - " subprocess.check_output('nvidia-smi')\n", - " logging.info(\"Found GPU, using it!\")\n", - " number_of_accelerators = 1\n", - " accelerator_name = \"GPU\"\n", - "except Exception:\n", - " logging.warning(\"We recommend to run this pipeline on a GPU, but none could be found, using CPU instead\")\n", - " number_of_accelerators = None\n", - " accelerator_name = None" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -239,54 +212,12 @@ "metadata": {}, "outputs": [], "source": [ - "import pipeline_index\n", "import utils\n", "\n", - "# Path where data and artifacts will be stored\n", "BASE_PATH = \"./data\"\n", "utils.create_directory_if_not_exists(BASE_PATH)\n", - "\n", - "# Parameters shared between indexing and evaluation pipeline\n", - "shared_args = {\n", - " \"base_path\": BASE_PATH,\n", - " \"embed_model_provider\": \"huggingface\",\n", - " \"embed_model\": \"all-MiniLM-L6-v2\",\n", - " \"weaviate_url\": f\"http://{utils.get_host_ip()}:8081\",\n", - " \"weaviate_class\": \"Pipeline1\", # Capitalized, avoid special characters (_, =, -, etc.)\n", - "}\n", - "\n", - "# Parameters for the indexing pipeline\n", - "indexing_args = {\n", - " \"n_rows_to_load\": 1000,\n", - " \"chunk_args\": {\"chunk_size\": 512, \"chunk_overlap\": 32}\n", - "}\n", - "\n", - "# Parameters for the GPU resources\n", - "resources_args = {\n", - " \"number_of_accelerators\": number_of_accelerators,\n", - " \"accelerator_name\": accelerator_name,\n", - "}\n", - "\n", - "indexing_pipeline = pipeline_index.create_pipeline(**shared_args, **indexing_args, **resources_args)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Run the indexing pipeline" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from fondant.pipeline.runner import DockerRunner\n", - "\n", - "runner = DockerRunner()\n", - "runner.run(indexing_pipeline)" + "weaviate_url = f\"http://{utils.get_host_ip()}:8081\"\n", + "weaviate_class = \"Pipeline1\"" ] }, { @@ -364,8 +295,8 @@ "from fondant.pipeline import Pipeline\n", "evaluation_pipeline = Pipeline(\n", " name=\"evaluation-pipeline\",\n", - " description=\"Pipeline to evaluate a RAG solution\",\n", - " base_path=shared_args[\"base_path\"],\n", + " description=\"Pipeline to evaluate a RAG system\",\n", + " base_path=BASE_PATH,\n", ")" ] }, @@ -411,21 +342,15 @@ "metadata": {}, "outputs": [], "source": [ - "from fondant.pipeline import Resources\n", "embed_text_op = load_from_csv.apply(\n", " \"embed_text\",\n", " arguments={\n", - " \"model_provider\": shared_args[\"embed_model_provider\"],\n", - " \"model\": shared_args[\"embed_model\"]\n", + " \"model_provider\": \"huggingface\",\n", + " \"model\": \"all-MiniLM-L6-v2\"\n", " },\n", " consumes={\n", " \"text\": \"question\",\n", - " },\n", - " resources=Resources(\n", - " accelerator_number=number_of_accelerators,\n", - " accelerator_name=accelerator_name,\n", - " ),\n", - " cluster_type=\"local\" if number_of_accelerators is not None else \"default\",\n", + " }\n", ")" ] }, @@ -440,32 +365,157 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from components.retrieve_from_weaviate import RetrieveFromWeaviateComponent\n", - "from components.evaluate_ragas import RagasEvaluator\n", - "from components.aggregrate_eval_results import AggregateResults\n", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:fondant.pipeline.lightweight_component:No consumes defined. Consumes will be inferred from the dataset. All field will be consumed which may lead to additional computation, Consider defining consumes in the component.\n", + " Consumes: {'question': {'type': 'string'}, 'embedding': {'type': 'array', 'items': {'type': 'float32'}}}\n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "import pyarrow as pa\n", + "from fondant.component import PandasTransformComponent\n", + "from fondant.pipeline import lightweight_component\n", "\n", + "\n", + "@lightweight_component(\n", + " produces={\"retrieved_chunks\": pa.list_(pa.string())},\n", + " extra_requires=[\"weaviate-client==3.24.1\"],\n", + ")\n", + "class RetrieveFromWeaviateComponent(PandasTransformComponent):\n", + " def __init__(self, *, weaviate_url: str, class_name: str, top_k: int) -> None:\n", + " import weaviate\n", + "\n", + " self.client = weaviate.Client(\n", + " url=weaviate_url,\n", + " additional_config=None,\n", + " additional_headers=None,\n", + " )\n", + " self.class_name = class_name\n", + " self.k = top_k\n", + "\n", + " def teardown(self) -> None:\n", + " del self.client\n", + "\n", + " def retrieve_chunks_from_embeddings(self, vector_query: list):\n", + " \"\"\"Get results from weaviate database.\"\"\"\n", + " query = (\n", + " self.client.query.get(self.class_name, [\"passage\"])\n", + " .with_near_vector({\"vector\": vector_query})\n", + " .with_limit(self.k)\n", + " .with_additional([\"distance\"])\n", + " )\n", + "\n", + " result = query.do()\n", + " result_dict = result[\"data\"][\"Get\"][self.class_name]\n", + " return [retrieved_chunk[\"passage\"] for retrieved_chunk in result_dict]\n", + "\n", + " def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:\n", + " dataframe[\"retrieved_chunks\"] = dataframe[\"embedding\"].apply(self.retrieve_chunks_from_embeddings)\n", + " return dataframe\n", + "\n", + "# Add component to pipeline\n", "retrieve_chunks = embed_text_op.apply(\n", " RetrieveFromWeaviateComponent,\n", " arguments={\n", - " \"weaviate_url\": shared_args[\"weaviate_url\"],\n", - " \"class_name\": shared_args[\"weaviate_class\"],\n", - " \"top_k\": 5\n", + " \"weaviate_url\": weaviate_url,\n", + " \"class_name\": weaviate_class,\n", + " \"top_k\": 2\n", " },\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "@lightweight_component(\n", + " consumes={\n", + " \"question\": pa.string(),\n", + " \"retrieved_chunks\": pa.list_(pa.string()),\n", + " },\n", + " produces={\n", + " \"context_precision\": pa.float32(),\n", + " \"context_relevancy\": pa.float32(),\n", + " },\n", + " extra_requires=[\"ragas==0.0.21\"],\n", ")\n", + "class RagasEvaluator(PandasTransformComponent):\n", + "\n", + " def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:\n", + " from datasets import Dataset\n", + " from ragas import evaluate\n", + " from ragas.metrics import context_precision, context_relevancy\n", + " from langchain_openai.chat_models import ChatOpenAI\n", + "\n", + " gpt_evaluator = ChatOpenAI(model_name=\"gpt-3.5-turbo\")\n", + "\n", + " dataframe = dataframe.rename(\n", + " columns={\"retrieved_chunks\": \"contexts\"},\n", + " )\n", + " \n", + " dataset = Dataset.from_pandas(dataframe)\n", "\n", + " \n", + " #if \"id\" in hf_dataset.column_names:\n", + " # hf_dataset = hf_dataset.remove_columns(\"id\")\n", + "\n", + " result = evaluate(\n", + " dataset, \n", + " metrics=[context_precision, context_relevancy],\n", + " llm=gpt_evaluator,\n", + " )\n", + "\n", + " results_df = result.to_pandas()\n", + " results_df = results_df.set_index(dataframe.index)\n", + "\n", + " return results_df\n", + " \n", + "# Add component to pipeline\n", "retriever_eval = retrieve_chunks.apply(\n", " RagasEvaluator,\n", - " arguments={\n", - " \"llm_module_name\": evaluation_args[\"llm_module_name\"],\n", - " \"llm_class_name\": evaluation_args[\"llm_class_name\"],\n", - " \"llm_kwargs\": evaluation_args[\"llm_kwargs\"],\n", - " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from fondant.component import DaskTransformComponent\n", + "import dask.dataframe as dd\n", + "\n", + "\n", + "@lightweight_component(\n", + " consumes={\n", + " \"context_precision\": pa.float32(),\n", + " \"context_relevancy\": pa.float32(),\n", + " },\n", + " produces={\n", + " \"metric\": pa.string(),\n", + " \"score\": pa.float32(),\n", + " },\n", ")\n", + "class AggregateResults(DaskTransformComponent):\n", + " def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:\n", + " metrics = list(self.consumes.keys())\n", + " agg = dataframe[metrics].mean()\n", + " agg_df = agg.to_frame(name=\"score\")\n", + " agg_df[\"metric\"] = agg.index\n", + " agg_df.index = agg_df.index.astype(str)\n", "\n", + " return agg_df\n", + "\n", + "# Add component to pipeline\n", "retriever_eval.apply(\n", " AggregateResults, \n", " consumes={\n", @@ -489,13 +539,10 @@ "outputs": [], "source": [ "import os\n", - "if utils.check_weaviate_class_exists(\n", - " local_weaviate_client,\n", - " shared_args[\"weaviate_class\"]\n", - "): \n", - " runner = DockerRunner()\n", - " extra_volumes = [str(os.path.join(os.path.abspath('.'), \"evaluation_datasets\")) + \":/evaldata\"]\n", - " runner.run(evaluation_pipeline, extra_volumes=extra_volumes)" + "from fondant.pipeline.runner import DockerRunner\n", + "runner = DockerRunner() \n", + "extra_volumes = [str(os.path.join(os.path.abspath('.'), \"evaluation_datasets\")) + \":/evaldata\"]\n", + "runner.run(evaluation_pipeline, extra_volumes=extra_volumes)" ] }, {