
First version of smart search #61

Merged
merged 4 commits into from
Jan 15, 2024

Changes from 3 commits
@@ -1,4 +1,5 @@
from typing import List
from scipy.spatial.distance import cosine
ddematheu marked this conversation as resolved.
from pydantic import BaseModel, Field
from neumai.Pipelines.Pipeline import Pipeline
from neumai.Shared.NeumSearch import NeumSearchResult
@@ -53,4 +54,29 @@ def search_separate(self, query:str, number_of_results:int)-> List:
    def search_routed(self, query:str, number_of_results:int)-> List:
        """Routed search based on the contents available in a pipeline"""
        # Need to add descriptions to the pipeline and generate a basic index on top of them
        raise NotImplementedError("In the works. Contact [email protected] for information")

        pipe_to_similarity = {}
        for pipe in self.pipelines:
            pipe_representative = pipe.sink.get_representative_vector()
Contributor

This seems good. The representative vector could in the future be extended to also use the description as the representative vector, in cases where it is not possible to compute one from the sink or where you would want to use the description instead.
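The fallback suggested in this comment could be sketched roughly as below. This is only an illustration, not part of the Neum API: `representative_vector`, the `description` attribute, and the duck-typed `pipe` object are all hypothetical names layered on top of the `get_representative_vector` / `embed_query` calls used in the diff.

```python
from typing import List, Optional

def representative_vector(pipe) -> Optional[List[float]]:
    """Hypothetical helper: prefer the sink's representative vector,
    fall back to embedding the pipeline's textual description."""
    try:
        vec = pipe.sink.get_representative_vector()
        if vec:
            return vec
    except Exception:
        # Sink could not produce a representative (e.g. empty index).
        pass
    description = getattr(pipe, "description", None)
    if description:
        # Embed the description with the pipeline's own embed connector.
        return pipe.embed.embed_query(query=description)
    return None
```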

            query_vector = pipe.embed.embed_query(query=query)
            distance_from_representative = cosine(pipe_representative, query_vector)

            # Similarity score, hence subtracted distance from 1
            pipe_to_similarity[pipe.id] = 1 - distance_from_representative

        # We want to sort by decreasing order of similarity score.
        # The more similar the query is to a given representative vector,
        # the higher that pipeline ranks in the search.
        # Currently, we select only the pipeline whose representative
        # is most similar to the query.
        pipe_to_similarity = dict(sorted(pipe_to_similarity.items(), key=lambda x: x[1], reverse=True)[:1])
Contributor

Can probably make the number of pipelines to use configurable. Assume in most cases you might want to pick the highest. Also might want to consider making it configurable to use any pipeline with a similarity score above X.

Contributor

The more I think about it, making it configurable to similarities above a certain threshold seems best.

Contributor Author

@ddematheu Yep, sounds good. But deciding that threshold is difficult. You never know: even the most similar pipe can have a low similarity score, like 0.4. We need some way to decide this threshold based on the computed similarities.

Contributor

100% agree with this. Maybe keeping the top result to start is fine. My thinking is that we could provide different modalities: one where we pass results for the N highest-scoring pipelines (default N=1), and another, threshold-based, where we pass results for pipelines whose similarity is higher than N (default N=0.5 or whatever).

One additional modality might be to look at the distribution, where N is the max distance or percentage away from the highest-scoring pipeline.

Btw these are just thoughts; happy to commit to starting with the highest and adding other modalities based on feedback.
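The three modalities discussed in this comment could be sketched as a single selection function over the `pipe_to_similarity` mapping built in the diff. The function name, the `mode` strings, and the defaults below are illustrative assumptions, not part of the PR:

```python
from typing import Dict

def select_pipelines(
    similarities: Dict[str, float],
    mode: str = "top_n",
    n: int = 1,
    threshold: float = 0.5,
    max_gap: float = 0.1,
) -> Dict[str, float]:
    """Hypothetical sketch of the selection modalities discussed above:
    top-N, absolute similarity threshold, and distance from the best score."""
    ranked = sorted(similarities.items(), key=lambda kv: kv[1], reverse=True)
    if mode == "top_n":
        return dict(ranked[:n])
    if mode == "threshold":
        return {k: v for k, v in ranked if v >= threshold}
    if mode == "gap":
        # Keep every pipeline within max_gap of the highest score.
        best = ranked[0][1]
        return {k: v for k, v in ranked if best - v <= max_gap}
    raise ValueError(f"unknown mode: {mode}")
```

With `mode="top_n"` and `n=1` this reproduces the current behavior of the diff (a single best pipeline).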

Contributor Author

@sky-2002 Jan 8, 2024

@ddematheu These are interesting ideas, especially the distribution one. It sounds similar to (though not exactly like) p-sampling in language models; maybe we can use that idea, like considering all pipelines whose similarities add up to at least some value, taken in descending order of similarity. But yes, it's up to the user feedback that you get.
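The cumulative idea mentioned in this comment could look like the sketch below: take pipelines in descending similarity order until their scores sum to at least `p`, akin to top-p sampling. The function name and parameter are hypothetical:

```python
from typing import Dict

def select_cumulative(similarities: Dict[str, float], p: float) -> Dict[str, float]:
    """Hypothetical sketch: keep the highest-similarity pipelines until
    their similarity scores accumulate to at least p."""
    selected, total = {}, 0.0
    for pipe_id, score in sorted(similarities.items(),
                                 key=lambda kv: kv[1], reverse=True):
        selected[pipe_id] = score
        total += score
        if total >= p:
            break
    return selected
```

Note that, unlike probabilities in language models, these similarity scores are not normalized to sum to 1, so `p` would need tuning per deployment.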


        search_results = []
        for pipe_id, similarity_score in pipe_to_similarity.items():
            for pipe in self.pipelines:
                if pipe.id == pipe_id:
                    results = pipe.search(query=query, number_of_results=number_of_results)
                    break
            search_results.append(results)
        return search_results
        # raise NotImplementedError("In the works. Contact [email protected] for information")
6 changes: 6 additions & 0 deletions neumai/neumai/SinkConnectors/LanceDBSink.py
@@ -160,6 +160,12 @@ def search(self, vector: List[float], number_of_results: int, filters: List[Filt
        )
        return matches


    def get_representative_vector(self) -> list:
        db = self._get_db_connection()
        tbl = db.open_table(self.table_name)
        return list(tbl.to_pandas()['vector'].mean())
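The one-liner above relies on pandas computing an elementwise mean over a column of vectors. An equivalent, more explicit form stacks the per-row vectors into a 2-D array first, which makes the shape assumptions visible. This is only an illustration with plain numpy/pandas; the `'vector'` column name follows the snippet above, and `representative_from_frame` is a hypothetical helper:

```python
import numpy as np
import pandas as pd

def representative_from_frame(df: pd.DataFrame) -> list:
    """Mean of the 'vector' column, computed via an explicit (rows, dim) stack."""
    matrix = np.stack(df["vector"].to_numpy())  # shape: (num_rows, dim)
    return matrix.mean(axis=0).tolist()
```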


    def info(self) -> NeumSinkInfo:
        try:
51 changes: 49 additions & 2 deletions neumai/neumai/SinkConnectors/MarqoSink.py
@@ -64,7 +64,7 @@ def _create_index(
        marqo_client,
        embedding_dim,
        similarity: str = 'cosinesimil',
        recreate_index: bool = False,
        recreate_index: bool = True,
    ):
        '''
        Create a new index
@@ -185,7 +185,7 @@ def _get_filter_string_from_filter_condition(self, filter_conditions:List[Filter
            operator = condition.operator

            _filter_string += self._get_marqo_filter(
                column=field, value=condition.value, operator=operator)
                column=field, value=condition.value, operator=operator.value)

        if _filter_string.endswith(" AND "):
            _filter_string = _filter_string.rstrip(" AND ")
@@ -223,6 +223,53 @@ def search(self, vector: List[float], number_of_results: int, filters: List[Filt
            )
        )
        return matches


    def _get_embeddings_from_ids(self, ids):
        marqo_client = marqo.Client(
            url=self.url,
            api_key=self.api_key,
        )
        embeddings = []
        for i in ids:
            doc = marqo_client.index(self.index_name).get_document(
                document_id=i,
                expose_facets=True)
            tensor = doc['_tensor_facets'][0]['_embedding']
            embeddings.append(tensor)
        return embeddings

    def get_representative_vector(self) -> list:
        """
        This method calculates the representative vector for a
        particular index (collection of vectors). Currently, this
        is simply the mean of all the vectors in the index.

        Returns:
            list: Returns the representative vector as a list of floats
        """
        import numpy as np

        marqo_client = marqo.Client(
            url=self.url,
            api_key=self.api_key,
        )

        # In Neum, we have one vector per document for Marqo, so the max number
        # of vectors is the same as the number of documents
        max_results = marqo_client.index(self.index_name).get_stats()['numberOfDocuments']

        vector_dimension = marqo_client.index(
            self.index_name
        ).get_settings()['index_defaults']['model_properties']['dimensions']

        dummy_vector = [1.0 for _ in range(vector_dimension)]
        ids = [i.id for i in self.search(
            vector=dummy_vector, number_of_results=max_results)]
        embeddings = self._get_embeddings_from_ids(ids)

        return list(np.mean(embeddings, 0))


def info(self) -> NeumSinkInfo:
url = self.url