Skip to content

Commit

Permalink
0.0.38 (#42)
Browse files Browse the repository at this point in the history
Filter conditions for sinks
Support across Marqo, Weaviate, Pinecone, Qdrant, Supabase and SingleStore
Fixes to issues in LanceDB
Fix to Search results to make score optional
  • Loading branch information
ddematheu authored Dec 29, 2023
1 parent 773822f commit ee96901
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 46 deletions.
5 changes: 3 additions & 2 deletions neumai/neumai/Pipelines/Pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from neumai.SinkConnectors.filter_utils import FilterCondition
from .PipelineRun import PipelineRun
from .TriggerSchedule import TriggerSchedule
from neumai.SinkConnectors.SinkConnector import SinkConnector
Expand Down Expand Up @@ -152,9 +153,9 @@ def run(self) -> int:
except Exception as e:
raise e

def search(self, query:str, number_of_results:int, filter:dict={}) -> List[NeumSearchResult]:
def search(self, query:str, number_of_results:int, filters:List[FilterCondition]={}) -> List[NeumSearchResult]:
vector_for_query = self.embed.embed_query(query=query)
matches = self.sink.search(vector=vector_for_query, number_of_results=number_of_results, filter=filter)
matches = self.sink.search(vector=vector_for_query, number_of_results=number_of_results, filters=filters)
return matches

# Todo standardize the model serialization as we are mixing FE and BE concepts into the SDK
Expand Down
2 changes: 1 addition & 1 deletion neumai/neumai/Shared/NeumSearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ class NeumSearchResult(BaseModel):

id:str = Field(..., description="Search result vector ID")
metadata:dict = Field(...,description="Search result vector metadata")
score:float = Field(..., description="Search result similarity score")
score: Optional[float] = Field(None, description="Search result similarity score")
vector: Optional[List[float]] = Field(None, description="Search result vector")
3 changes: 2 additions & 1 deletion neumai/neumai/SinkConnectors/LanceDBSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)
from neumai.SinkConnectors.SinkConnector import SinkConnector
from typing import List, Optional
from neumai.SinkConnectors.filter_utils import FilterCondition
from pydantic import Field

import lancedb
Expand Down Expand Up @@ -100,7 +101,7 @@ def store(self, vectors_to_store: List[NeumVector]) -> int:


def search(self, vector: List[float],
number_of_results: int, filter: dict = {}) -> List[NeumSearchResult]:
number_of_results: int, filter: List[FilterCondition] = []) -> List[NeumSearchResult]:

db = self._get_db_connection()
tbl = db.open_table(self.table_name)
Expand Down
6 changes: 2 additions & 4 deletions neumai/neumai/SinkConnectors/MarqoSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def _get_marqo_filter(self, column: str, value: Any, operator: FilterOperator) -
raise Exception(f"Operator {operator} is currently not supported")


def _get_filter_string_from_filter_condition(self, filter_conditions):
def _get_filter_string_from_filter_condition(self, filter_conditions:List[FilterCondition]):

_filter_string = ""
for condition in filter_conditions:
Expand All @@ -192,12 +192,10 @@ def _get_filter_string_from_filter_condition(self, filter_conditions):
return _filter_string


def search(self, vector: List[float], number_of_results: int, filter: List[dict] = [{}]) -> List:
def search(self, vector: List[float], number_of_results: int, filters: List[FilterCondition] = []) -> List:
url = self.url
api_key = self.api_key
index_name = self.index_name

filter = dict_to_filter_condition(filter)
filter_string = self._get_filter_string_from_filter_condition(filter_conditions=filter)

try:
Expand Down
30 changes: 28 additions & 2 deletions neumai/neumai/SinkConnectors/PineconeSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
PineconeIndexInfoException,
PineconeQueryException,
)
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
from pydantic import Field
import pinecone

Expand Down Expand Up @@ -102,21 +103,46 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
raise PineconeInsertionException(f"Failed to store in Pinecone. Exception - {e}")
return int(vectors_stored)

def search(self, vector: List[float], number_of_results:int, filter:dict = {}) -> List[NeumSearchResult]:
def translate_to_pinecone(filter_conditions:List[FilterCondition]):
query_parts = []

for condition in filter_conditions:
mongo_operator = {
FilterOperator.EQUAL: '$eq',
FilterOperator.NOT_EQUAL: '$ne',
FilterOperator.GREATER_THAN: '$gt',
FilterOperator.GREATER_THAN_OR_EQUAL: '$gte',
FilterOperator.LESS_THAN: '$lt',
FilterOperator.LESS_THAN_OR_EQUAL: '$lte',
FilterOperator.IN: '$in',
}.get(condition.operator, None)

if mongo_operator:
query_parts.append({condition.field: {mongo_operator: condition.value}})
else:
# Handle complex cases like IN, NOT IN, etc.
pass

return {"$and": query_parts} # Combine using $and, can be changed to $or if needed

def search(self, vector: List[float], number_of_results:int, filter:List[FilterCondition] = []) -> List[NeumSearchResult]:
import pinecone
api_key = self.api_key
environment = self.environment
index = self.index
namespace = self.namespace
if environment == "gcp-starter": namespace = None # short-term fix given gcp-starter limitation

filters = self.translate_to_pinecone(filter)

try:
pinecone.init(
api_key=api_key,
environment=environment)
index = pinecone.Index(index)
results = index.query(
vector=vector,
filter=filter,
filter=filters,
top_k=number_of_results,
namespace=namespace,
include_values=False,
Expand Down
52 changes: 51 additions & 1 deletion neumai/neumai/SinkConnectors/QdrantSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
)
from neumai.SinkConnectors.SinkConnector import SinkConnector
from typing import List, Optional
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
from qdrant_client.http.models import Distance, VectorParams
from qdrant_client.http.models import PointStruct
from qdrant_client.http.models import UpdateStatus
from qdrant_client import QdrantClient
from qdrant_client.http.models import Filter
from pydantic import Field

class QdrantSink(SinkConnector):
Expand Down Expand Up @@ -85,10 +87,57 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
return len(points)
raise QdrantInsertionException("Qdrant storing failed. Try again later.")

def search(self, vector: List[float], number_of_results: int, filter:dict = {}) -> List:
def filter_conditions_to_qdrant_filter(filters: List[FilterCondition]) -> dict:
if len(filters) > 1:
weaviate_filter = {
"operator":"And",
"operands" : []
}
for filter in filters:
weaviate_filter = {
"path":[filter.field],
"operator": filter.operator,
"valueText": filter.value
}
weaviate_filter["operands"].append(weaviate_filter)
else:
neum_filter = filters[0]
weaviate_filter = {
"path":[neum_filter.field],
"operator": neum_filter.operator,
"valueText": neum_filter.value
}
return weaviate_filter

def translate_to_qdrant(filter_conditions:List[FilterCondition]):
qdrant_filter = {"must": []}

for condition in filter_conditions:
if condition.operator == FilterOperator.EQUAL:
qdrant_filter["must"].append({"key": condition.field, "match": {"value": condition.value}})
elif condition.operator == FilterOperator.NOT_EQUAL:
# Qdrant doesn't have a direct "not equal" filter, so it's handled with must_not
qdrant_filter.setdefault("must_not", []).append({"key": condition.field, "match": {"value": condition.value}})
elif condition.operator in [FilterOperator.LESS_THAN, FilterOperator.LESS_THAN_OR_EQUAL,
FilterOperator.GREATER_THAN, FilterOperator.GREATER_THAN_OR_EQUAL]:
range_filter = {"key": condition.field, "range": {}}
if condition.operator == FilterOperator.LESS_THAN:
range_filter["range"]["lt"] = condition.value
elif condition.operator == FilterOperator.LESS_THAN_OR_EQUAL:
range_filter["range"]["lte"] = condition.value
elif condition.operator == FilterOperator.GREATER_THAN:
range_filter["range"]["gt"] = condition.value
elif condition.operator == FilterOperator.GREATER_THAN_OR_EQUAL:
range_filter["range"]["gte"] = condition.value
qdrant_filter["must"].append(range_filter)

return qdrant_filter

def search(self, vector: List[float], number_of_results: int, filter:List[FilterCondition]=[]) -> List:
url = self.url
api_key = self.api_key
collection_name = self.collection_name
filters = self.translate_to_qdrant(filter)

try:
qdrant_client = QdrantClient(
Expand All @@ -100,6 +149,7 @@ def search(self, vector: List[float], number_of_results: int, filter:dict = {})
query_vector=vector,
with_payload= True,
limit=number_of_results,
query_filter=Filter(**filters)
)
except Exception as e:
raise QdrantQueryException(f"Failed to query Qdrant. Exception - {e}")
Expand Down
35 changes: 30 additions & 5 deletions neumai/neumai/SinkConnectors/SingleStoreSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)
from neumai.SinkConnectors.SinkConnector import SinkConnector
from typing import List, Optional
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
from pydantic import Field
import singlestoredb as s2

Expand Down Expand Up @@ -115,15 +116,39 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:

return len(vectors_to_store), None

def search(self, vector: List[float], number_of_results: int, filter:dict={}) -> List[NeumSearchResult]:
def translate_to_sql(filter_conditions:List[FilterCondition]):
query_parts = []
for condition in filter_conditions:
sql_operator = condition.operator.value
# Handle special formatting for IN, NOT IN, BETWEEN, etc.
if condition.operator in [FilterOperator.IN, FilterOperator.NOT_IN]:
values = '(' + ', '.join(map(str, condition.value.split(','))) + ')'
else:
values = condition.value

query_parts.append(f"{condition.field} {sql_operator} {values}")

conditions_str = " AND ".join(query_parts)
return conditions_str

def search(self, vector: List[float], number_of_results: int, filter:List[FilterCondition]=[]) -> List[NeumSearchResult]:
url = self.url
table = self.table

query = f"""SELECT id, text, dot_product(vector, json_array_pack('{vector}')) AS score
FROM {table}
ORDER BY score DESC
LIMIT {number_of_results}"""
if len(filter)>0:
list_of_fields = ",".join([f.field for f in filter])
query = f"""SELECT id, text, dot_product(vector, json_array_pack('{vector}')) AS score, {list_of_fields}
FROM {table}
WHERE {self.translate_to_sql(filter)}
ORDER BY score DESC
LIMIT {number_of_results}"""

else:
query = f"""SELECT id, text, dot_product(vector, json_array_pack('{vector}')) AS score
FROM {table}
ORDER BY score DESC
LIMIT {number_of_results}"""

try:
with s2.connect(url, results_type="dict") as conn:
with conn.cursor() as cur:
Expand Down
3 changes: 2 additions & 1 deletion neumai/neumai/SinkConnectors/SinkConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel
from neumai.SinkConnectors.filter_utils import FilterCondition
import json

class SinkConnector(ABC, BaseModel):
Expand Down Expand Up @@ -32,7 +33,7 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
"""Store vectors with a given service"""

@abstractmethod
def search(self, vector:List[float], number_of_results:int, filter:dict={}) -> List[NeumSearchResult]:
def search(self, vector:List[float], number_of_results:int, filters:List[FilterCondition]={}) -> List[NeumSearchResult]:
"""Search vectors for a given service"""

@abstractmethod
Expand Down
30 changes: 28 additions & 2 deletions neumai/neumai/SinkConnectors/SupabaseSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
SupabaseIndexInfoException,
SupabaseQueryException
)
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
from pydantic import Field
import vecs

Expand Down Expand Up @@ -83,10 +84,35 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
vx.disconnect()
return len(vectors_to_store)

def search(self, vector: List[float], number_of_results:int, filter:dict={}) -> List:
def translate_to_supabase(filter_conditions:List[FilterCondition]):
query_parts = []

for condition in filter_conditions:
mongo_operator = {
FilterOperator.EQUAL: '$eq',
FilterOperator.NOT_EQUAL: '$ne',
FilterOperator.GREATER_THAN: '$gt',
FilterOperator.GREATER_THAN_OR_EQUAL: '$gte',
FilterOperator.LESS_THAN: '$lt',
FilterOperator.LESS_THAN_OR_EQUAL: '$lte',
FilterOperator.IN: '$in',
}.get(condition.operator, None)

if mongo_operator:
query_parts.append({condition.field: {mongo_operator: condition.value}})
else:
# Handle complex cases like IN, NOT IN, etc.
pass

return {"$and": query_parts} # Combine using $and, can be changed to $or if needed

def search(self, vector: List[float], number_of_results:int, filter:List[FilterCondition]=[]) -> List:
database_connection = self.database_connection
vx = vecs.create_client(database_connection)
collection_name = self.collection_name

filters = self.translate_to_supabase(filter)

try:
db = vx.get_collection(name=collection_name)
except:
Expand All @@ -99,7 +125,7 @@ def search(self, vector: List[float], number_of_results:int, filter:dict={}) ->
include_metadata=True,
include_value=True,
limit=number_of_results,
filters=filter
filters=filters
)
except Exception as e:
raise SupabaseQueryException(f"Error querying vectors from Supabase. Exception: {e}")
Expand Down
28 changes: 26 additions & 2 deletions neumai/neumai/SinkConnectors/WeaviateSink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
WeaviateIndexInfoException,
WeaviateQueryException
)
from neumai.SinkConnectors.filter_utils import FilterCondition
from pydantic import Field
from weaviate.util import generate_uuid5, _capitalize_first_letter
import weaviate
Expand Down Expand Up @@ -171,7 +172,29 @@ def store(self, vectors_to_store:List[NeumVector]) -> Tuple[List, dict]:

return len(vectors_to_store)

def search(self, vector: List[float], number_of_results: int, filter:dict={}) -> List[NeumSearchResult]:
def filter_conditions_to_weaviate_filter(filters: List[FilterCondition]) -> dict:
if len(filters) > 1:
weaviate_filter = {
"operator":"And",
"operands" : []
}
for filter in filters:
weaviate_filter = {
"path":[filter.field],
"operator": filter.operator,
"valueText": filter.value
}
weaviate_filter["operands"].append(weaviate_filter)
else:
neum_filter = filters[0]
weaviate_filter = {
"path":[neum_filter.field],
"operator": neum_filter.operator,
"valueText": neum_filter.value
}
return weaviate_filter

def search(self, vector: List[float], number_of_results: int, filter:List[FilterCondition]=[]) -> List[NeumSearchResult]:
api_key = self.api_key
url = self.url
# Weaviate requires first letter to be capitalized
Expand Down Expand Up @@ -200,7 +223,8 @@ def search(self, vector: List[float], number_of_results: int, filter:dict={}) ->

# Add .with_where(filter) only if filter is not empty
if filter:
client_query = client_query.with_where(filter)
weaviate_filter = self.filter_conditions_to_weaviate_filter(filter)
client_query = client_query.with_where(weaviate_filter)

# Final execution of the query
search_result = client_query.do()
Expand Down
Loading

0 comments on commit ee96901

Please sign in to comment.