diff --git a/neumai-tools/neumai_tools/PipelineCollection/PipelineCollection.py b/neumai-tools/neumai_tools/PipelineCollection/PipelineCollection.py
index ca858c9..0253731 100644
--- a/neumai-tools/neumai_tools/PipelineCollection/PipelineCollection.py
+++ b/neumai-tools/neumai_tools/PipelineCollection/PipelineCollection.py
@@ -1,4 +1,6 @@
 from typing import List
+from numpy import dot
+from numpy.linalg import norm
 from pydantic import BaseModel, Field
 from neumai.Pipelines.Pipeline import Pipeline
 from neumai.Shared.NeumSearch import NeumSearchResult
@@ -53,4 +55,27 @@ def search_separate(self, query:str, number_of_results:int)-> List:
     def search_routed(self, query:str, number_of_results:int)-> List:
         """Routed search based on the contents available in a pipeline"""
         # Need to add descriptions to the pipeline and generate a basic index on top of them
-        raise NotImplementedError("In the works. Contact founders@tryneum.com for information")
\ No newline at end of file
+
+        pipe_to_similarity = {}
+        for pipe in self.pipelines:
+            pipe_representative = pipe.sink.get_representative_vector()
+            query_vector = pipe.embed.embed_query(query=query)
+
+            # cosine similarity between the query and this pipeline's representative vector
+            pipe_to_similarity[pipe.id] = dot(pipe_representative, query_vector)/(norm(pipe_representative)*norm(query_vector))
+
+        # We want to sort in decreasing order of similarity score:
+        # the more similar the query is to a given representative vector,
+        # the higher that pipeline ranks in the search.
+        # Currently, we only select the single pipeline whose
+        # representative is most similar to the query.
+        pipe_to_similarity = dict(sorted(pipe_to_similarity.items(), key=lambda x: x[1], reverse=True)[:1])
+
+        search_results = []
+        for pipe_id,similarity_score in pipe_to_similarity.items():
+            for pipe in self.pipelines:
+                if pipe.id==pipe_id:
+                    results = pipe.search(query=query, number_of_results=number_of_results)
+                    break
+            search_results.append(results)
+        return search_results
\ No newline at end of file
diff --git a/neumai/neumai/SinkConnectors/LanceDBSink.py b/neumai/neumai/SinkConnectors/LanceDBSink.py
index 4ea99f2..2ba9505 100644
--- a/neumai/neumai/SinkConnectors/LanceDBSink.py
+++ b/neumai/neumai/SinkConnectors/LanceDBSink.py
@@ -160,6 +160,12 @@ def search(self, vector: List[float], number_of_results: int, filters: List[Filt
             )
         return matches
 
+
+    def get_representative_vector(self) -> list:
+        db = self._get_db_connection()
+        tbl = db.open_table(self.table_name)
+        return list(tbl.to_pandas()['vector'].mean())
+
 
     def info(self) -> NeumSinkInfo:
         try:
diff --git a/neumai/neumai/SinkConnectors/MarqoSink.py b/neumai/neumai/SinkConnectors/MarqoSink.py
index 479cd26..eb41a9c 100644
--- a/neumai/neumai/SinkConnectors/MarqoSink.py
+++ b/neumai/neumai/SinkConnectors/MarqoSink.py
@@ -64,7 +64,7 @@ def _create_index(
     marqo_client,
     embedding_dim,
     similarity: str = 'cosinesimil',
-    recreate_index: bool = False,
+    recreate_index: bool = True,
 ):
     '''
     Create a new index
@@ -185,7 +185,7 @@ def _get_filter_string_from_filter_condition(self, filter_conditions:List[Filter
             operator = condition.operator
 
             _filter_string+=self._get_marqo_filter(
-                column=field, value=condition.value, operator=operator)
+                column=field, value=condition.value, operator=operator.value)
 
         if _filter_string.endswith(" AND "):
             _filter_string = _filter_string.rstrip(" AND ")
@@ -223,6 +223,53 @@ def search(self, vector: List[float], number_of_results: int, filters: List[Filt
                 )
             )
         return matches
+
+
+    def _get_embeddings_from_ids(self, ids):
+        marqo_client = marqo.Client(
+            url=self.url,
+            api_key=self.api_key,
+        )
+        embeddings = []
+        for i in ids:
+            doc = marqo_client.index(self.index_name).get_document(
+                document_id=i,
+                expose_facets=True)
+            tensor = doc['_tensor_facets'][0]['_embedding']
+            embeddings.append(tensor)
+        return embeddings
+
+    def get_representative_vector(self) -> list:
+        """
+        Calculates the representative vector for a particular index
+        (a collection of vectors). Currently, this is simply the mean
+        of all the vectors in the index.
+
+        Returns:
+            list: The representative vector as a list of floats
+        """
+        import numpy as np
+
+        marqo_client = marqo.Client(
+            url=self.url,
+            api_key=self.api_key,
+        )
+
+        # In Neum, each Marqo document holds a single vector, so the maximum
+        # number of vectors equals the number of documents
+        max_results = marqo_client.index(self.index_name).get_stats()['numberOfDocuments']
+
+        vector_dimension = marqo_client.index(
+            self.index_name
+        ).get_settings()['index_defaults']['model_properties']['dimensions']
+
+        dummy_vector = [1.0 for _ in range(vector_dimension)]
+        ids = [i.id for i in self.search(
+            vector=dummy_vector, number_of_results=max_results)]
+        embeddings = self._get_embeddings_from_ids(ids)
+
+        return list(np.mean(embeddings, 0))
+
     def info(self) -> NeumSinkInfo:
         url = self.url
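For context, the routing added in `search_routed` boils down to: embed the query, score it against each sink's representative (mean) vector with cosine similarity, and keep only the top-scoring pipeline. Below is a minimal standalone sketch of that scoring step; `route_query` and the toy pipeline ids are illustrative stand-ins, not names from the Neum codebase.

```python
from typing import Dict, List

import numpy as np


def route_query(query_vector: List[float],
                representatives: Dict[str, List[float]]) -> str:
    """Return the id of the pipeline whose representative vector is most
    similar (by cosine similarity) to the query vector."""
    q = np.asarray(query_vector, dtype=float)
    best_id, best_score = None, -np.inf
    for pipe_id, rep in representatives.items():
        r = np.asarray(rep, dtype=float)
        # cosine similarity: dot(r, q) / (|r| * |q|)
        score = float(np.dot(r, q) / (np.linalg.norm(r) * np.linalg.norm(q)))
        if score > best_score:
            best_id, best_score = pipe_id, score
    return best_id


if __name__ == "__main__":
    # Two toy "pipelines": one whose stored vectors average near [1, 0],
    # one whose stored vectors average near [0, 1].
    reps = {"docs-pipeline": [0.9, 0.1], "code-pipeline": [0.1, 0.95]}
    print(route_query([0.2, 0.8], reps))  # -> "code-pipeline"
```

Cosine scores fall in [-1, 1]; ties and empty sinks (a zero-norm representative vector would divide by zero) may need explicit handling in the real implementation.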