apply pre-commit
mrchtr committed Feb 6, 2024
1 parent 649eef0 commit d3696ed
Showing 6 changed files with 61 additions and 61 deletions.
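
All six files change in the same mechanical way: these are the autofixes a typical pre-commit setup applies, i.e. third-party imports sorted alphabetically, spaces added after colons, trailing commas added to multi-line literals, and blank lines plus trailing whitespace and end-of-file newlines normalized. The repository's hook configuration is not shown in this diff, so the before/after sketch below is illustrative only (the "before" lines are taken from the old version of chunking_component.py):

# Before the autofix:
#   consumes={"text":pa.string()},
#   produces={"text":pa.string(), "original_document_id":pa.string()},
#   extra_requires=["langchain==0.0.329"]
# After: spaces after colons, trailing comma on the last element
# of the multi-line call.
import pyarrow as pa

kwargs = dict(
    consumes={"text": pa.string()},
    produces={"text": pa.string(), "original_document_id": pa.string()},
    extra_requires=["langchain==0.0.329"],
)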
9 changes: 4 additions & 5 deletions src/components/aggregrate_eval_results.py
@@ -6,16 +6,15 @@

 @lightweight_component(
     consumes={
-        "context_precision": pa.float32(),
-        "context_relevancy": pa.float32(),
+        "context_precision": pa.float32(),
+        "context_relevancy": pa.float32(),
     },
     produces={
         "metric": pa.string(),
-        "score": pa.float32()
-    }
+        "score": pa.float32(),
+    },
 )
 class AggregateResults(DaskTransformComponent):
-
     def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         metrics = list(self.consumes.keys())
         agg = dataframe[metrics].mean()
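The rest of the transform is collapsed above; from the visible lines and the consumes/produces schemas, it averages each consumed metric column and emits one (metric, score) row per metric. A minimal standalone sketch of that aggregation pattern (the reshaping step is inferred from the produces schema, not shown in the diff):

import dask.dataframe as dd
import pandas as pd

# Toy frame with the two consumed metric columns.
pdf = pd.DataFrame({
    "context_precision": [0.8, 0.6, 1.0],
    "context_relevancy": [0.5, 0.7, 0.9],
})
dataframe = dd.from_pandas(pdf, npartitions=1)

metrics = ["context_precision", "context_relevancy"]
agg = dataframe[metrics].mean().compute()  # Series: metric name -> mean score

# Reshape into the produced schema: one (metric, score) row per metric.
result = pd.DataFrame({"metric": agg.index, "score": agg.values})
print(result)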
20 changes: 12 additions & 8 deletions src/components/chunking_component.py
@@ -1,20 +1,22 @@
-import typing as t
+import typing as t
 
 import pandas as pd
 import pyarrow as pa
 from fondant.component import PandasTransformComponent
 from fondant.pipeline import lightweight_component
 
+
 @lightweight_component(
-    consumes={"text":pa.string()},
-    produces={"text":pa.string(), "original_document_id":pa.string()},
-    extra_requires=["langchain==0.0.329"]
+    consumes={"text": pa.string()},
+    produces={"text": pa.string(), "original_document_id": pa.string()},
+    extra_requires=["langchain==0.0.329"],
 )
 class ChunkTextComponent(PandasTransformComponent):
     """Component that chunks text into smaller segments.
+
     More information about the different chunking strategies can be here:
     - https://python.langchain.com/docs/modules/data_connection/document_transformers/
     - https://www.pinecone.io/learn/chunking-strategies/.
-    """
+    """
 
     def __init__(
         self,
@@ -24,13 +26,14 @@ def __init__(
     ):
         """
         Args:
-            chunk_size: the chunk size
-            chunk_overlap: the overlap between chunks
+            chunk_size: the chunk size
+            chunk_overlap: the overlap between chunks.
         """
         from langchain.text_splitter import RecursiveCharacterTextSplitter
+
         self.chunker = RecursiveCharacterTextSplitter(
             chunk_size=chunk_size,
-            chunk_overlap=chunk_overlap
+            chunk_overlap=chunk_overlap,
         )
 
     def chunk_text(self, row) -> t.List[t.Tuple]:
@@ -46,6 +49,7 @@ def chunk_text(self, row) -> t.List[t.Tuple]:

     def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         import itertools
+
         results = dataframe.apply(
             self.chunk_text,
             axis=1,
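For reference, the splitter configured in __init__ above is used roughly as sketched below; split_text is standard langchain 0.0.x API, while the (original_document_id, text) pairing is inferred from the produces schema rather than shown in the diff:

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=32)

document_id = "doc_0"  # placeholder id
text = "Fondant is a data framework. " * 100
chunks = splitter.split_text(text)

# One (original_document_id, text) pair per chunk, as in chunk_text above.
rows = [(f"{document_id}_{i}", chunk) for i, chunk in enumerate(chunks)]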
29 changes: 15 additions & 14 deletions src/components/evaluate_ragas.py
@@ -1,44 +1,43 @@
 import typing as t
-import pyarrow as pa
 import pandas as pd
+import pyarrow as pa
 from fondant.component import PandasTransformComponent
 from fondant.pipeline import lightweight_component
 
 
-
 @lightweight_component(
     consumes={
-        "question": pa.string(),
-        "retrieved_chunks": pa.list_(pa.string())
-    },
+        "question": pa.string(),
+        "retrieved_chunks": pa.list_(pa.string()),
+    },
     produces={
-        "context_precision": pa.float32(),
-        "context_relevancy": pa.float32()
-    },
-    extra_requires=["ragas==0.0.21"]
+        "context_precision": pa.float32(),
+        "context_relevancy": pa.float32(),
+    },
+    extra_requires=["ragas==0.0.21"],
 )
 class RagasEvaluator(PandasTransformComponent):
     def __init__(
         self,
         *,
         llm_module_name: str,
         llm_class_name: str,
-        llm_kwargs: dict
+        llm_kwargs: dict,
     ) -> None:
         """
         Args:
             llm_module_name: Module from which the LLM is imported. Defaults to
                 langchain.chat_models
             llm_class_name: Name of the selected llm. Defaults to ChatOpenAI
-            llm_kwargs: Arguments of the selected llm
+            llm_kwargs: Arguments of the selected llm.
         """
         self.llm = self.extract_llm(
             llm_module_name=llm_module_name,
             llm_class_name=llm_class_name,
             llm_kwargs=llm_kwargs,
         )
 
         from ragas.llms import LangchainLLM
+
         self.gpt_wrapper = LangchainLLM(llm=self.llm)
         self.metric_functions = self.extract_metric_functions(
             metrics=["context_precision", "context_relevancy"],
@@ -76,10 +75,12 @@ def create_hf_ds(dataframe: pd.DataFrame):
         )
 
         from datasets import Dataset
+
         return Dataset.from_pandas(dataframe)
 
     def ragas_eval(self, dataset):
         from ragas import evaluate
+
         return evaluate(dataset=dataset, metrics=self.metric_functions)
 
     def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
@@ -93,4 +94,4 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         results_df = result.to_pandas()
         results_df = results_df.set_index(dataframe.index)
 
-        return results_df
+        return results_df
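The surrounding methods show the overall flow: build a Hugging Face Dataset from the consumed columns, run ragas evaluate over it, and re-index the per-row scores onto the input frame. A rough standalone sketch of that flow under ragas 0.0.21 (the metric import path and required column names are assumptions based on that release; the default metrics call an OpenAI model, so an API key must be configured):

from datasets import Dataset
from ragas import evaluate
from ragas.metrics import context_precision, context_relevancy  # assumed path

ds = Dataset.from_dict({
    "question": ["What is Fondant?"],
    "contexts": [["Fondant is a data framework for foundation models."]],
})

result = evaluate(dataset=ds, metrics=[context_precision, context_relevancy])
print(result.to_pandas())  # one score column per metric, as used in transform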
8 changes: 3 additions & 5 deletions src/components/retrieve_from_weaviate.py
@@ -1,13 +1,12 @@
 import typing as t
-import dask.dataframe as dd
 import pandas as pd
 import pyarrow as pa
 from fondant.component import PandasTransformComponent
 from fondant.pipeline import lightweight_component
 
 
 @lightweight_component(
     produces={"retrieved_chunks": pa.list_(pa.string())},
-    extra_requires=["weaviate-client==3.24.1"]
+    extra_requires=["weaviate-client==3.24.1"],
 )
 class RetrieveFromWeaviateComponent(PandasTransformComponent):
     def __init__(
@@ -60,7 +59,6 @@ def retrieve_chunks_from_embeddings(self, vector_query: list):
         return [retrieved_chunk["passage"] for retrieved_chunk in result_dict]
 
     def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
-
         if "embedding" in dataframe.columns:
             dataframe["retrieved_chunks"] = dataframe["embedding"].apply(
                 self.retrieve_chunks_from_embeddings,
@@ -72,4 +70,4 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
                 msg,
             )
 
-        return dataframe
+        return dataframe
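Most of retrieve_chunks_from_embeddings is collapsed above; with weaviate-client 3.24.x, a near-vector lookup returning the stored passages typically looks like the sketch below (the URL, class name, and vector are placeholders; the "passage" property matches the list comprehension above):

import weaviate

client = weaviate.Client("http://localhost:8080")  # placeholder URL

vector_query = [0.0] * 768  # placeholder embedding
result = (
    client.query
    .get("RAGChunks", ["passage"])  # placeholder class name
    .with_near_vector({"vector": vector_query})
    .with_limit(3)                  # top_k
    .do()
)
result_dict = result["data"]["Get"]["RAGChunks"]
chunks = [retrieved_chunk["passage"] for retrieved_chunk in result_dict]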
28 changes: 14 additions & 14 deletions src/pipeline_eval.py
@@ -1,10 +1,10 @@
"""Fondant pipeline to evaluate a RAG pipeline."""

import pyarrow as pa
from fondant.pipeline import Pipeline, Resources
from components.retrieve_from_weaviate import RetrieveFromWeaviateComponent
from components.evaluate_ragas import RagasEvaluator
from components.aggregrate_eval_results import AggregateResults
from components.evaluate_ragas import RagasEvaluator
from components.retrieve_from_weaviate import RetrieveFromWeaviateComponent
from fondant.pipeline import Pipeline, Resources


def create_pipeline(
@@ -61,29 +61,29 @@ def create_pipeline(
     )
 
     retrieve_chunks = embed_text_op.apply(
-        RetrieveFromWeaviateComponent,
-        arguments={
-            "weaviate_url": weaviate_url,
-            "class_name": weaviate_class,
-            "top_k": retrieval_top_k
-        },
+        RetrieveFromWeaviateComponent,
+        arguments={
+            "weaviate_url": weaviate_url,
+            "class_name": weaviate_class,
+            "top_k": retrieval_top_k,
+        },
     )
 
     retriever_eval = retrieve_chunks.apply(
         RagasEvaluator,
         arguments={
             "llm_module_name": llm_module_name,
             "llm_class_name": llm_class_name,
-            "llm_kwargs": llm_kwargs
-        }
+            "llm_kwargs": llm_kwargs,
+        },
     )
 
     retriever_eval.apply(
-        AggregateResults,
+        AggregateResults,
         consumes={
             "context_precision": "context_precision",
-            "context_relevancy": "context_relevancy"
-        }
+            "context_relevancy": "context_relevancy",
+        },
     )
 
     return evaluation_pipeline
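Putting it together, the evaluation pipeline is built by calling create_pipeline. Its full signature is collapsed in this diff, so the sketch below only uses arguments visible at the call sites above, with placeholder values; the collapsed parts of the signature may require more (e.g. the base path and embedding settings):

from pipeline_eval import create_pipeline

evaluation_pipeline = create_pipeline(
    weaviate_url="http://host.docker.internal:8080",  # placeholder
    weaviate_class="index",                           # placeholder
    retrieval_top_k=3,
    llm_module_name="langchain.chat_models",  # default named in the docstring
    llm_class_name="ChatOpenAI",              # default named in the docstring
    llm_kwargs={"model_name": "gpt-3.5-turbo"},  # placeholder kwargs
)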
28 changes: 13 additions & 15 deletions src/pipeline_index.py
@@ -1,8 +1,10 @@
"""Fondant pipeline to index a RAG system."""
import pyarrow as pa
from fondant.pipeline import Pipeline, Resources
from pathlib import Path

import pyarrow as pa
from components.chunking_component import ChunkTextComponent
from fondant.pipeline import Pipeline, Resources


def create_pipeline(
*,
@@ -17,15 +19,13 @@
     accelerator_name=None,
 ):
     """Create a Fondant pipeline based on the provided arguments."""
-
-
     Path(base_path).mkdir(parents=True, exist_ok=True)
 
     pipeline = Pipeline(
         name="indexing-pipeline",
         description="Pipeline to prepare and process data for building a RAG solution",
-        base_path=base_path
-    )
+        base_path=base_path,
+    )
 
     text = pipeline.read(
         "load_from_hf_hub",
@@ -35,29 +35,27 @@
"n_rows_to_load": n_rows_to_load,
},
produces={
"text": pa.string()
}
"text": pa.string(),
},
)


chunks = text.apply(
ChunkTextComponent,
arguments=chunk_args
arguments=chunk_args,
)


embeddings = chunks.apply(
"embed_text",
arguments={
"model_provider": embed_model_provider,
"model": embed_model
"model": embed_model,
},
resources=Resources(
accelerator_number=number_of_accelerators,
accelerator_name=accelerator_name,
),
cluster_type="local" if number_of_accelerators is not None else "default",
cache=False
cache=False,
)

embeddings.write(
@@ -68,8 +66,8 @@ def create_pipeline(
         },
         consumes={
             "text": pa.string(),
-            "embedding": pa.list_(pa.float32()),
-        }
+            "embedding": pa.list_(pa.float32()),
+        },
     )
 
     return pipeline
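For completeness, a usage sketch of this module; every keyword below appears in the visible signature or call sites, but the values are placeholders and the collapsed parts of the signature (such as the source dataset arguments) are omitted:

from pipeline_index import create_pipeline

pipeline = create_pipeline(
    base_path="./data",  # placeholder working directory
    n_rows_to_load=100,
    chunk_args={"chunk_size": 512, "chunk_overlap": 32},
    embed_model_provider="huggingface",  # placeholder provider
    embed_model="all-MiniLM-L6-v2",      # placeholder model
    number_of_accelerators=None,
    accelerator_name=None,
)

# Typically executed with Fondant's local runner, e.g.:
#   fondant run local pipeline_index.py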
