support for Deeplake v4 api #16952

Merged · 14 commits · Nov 18, 2024
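In short: the integration previously imported `VectorStore` from `deeplake.core.vectorstore.deeplake_vectorstore`, which only exists in Deeplake 3.x. This PR keeps that import path for 3.x and adds a small wrapper class that implements the same interface on top of the 4.x dataset API. A minimal sketch of the version gate (names match the diff below):

```python
import deeplake

# Deeplake 3.x ships a managed VectorStore class; 4.x instead exposes a
# dataset-centric API (deeplake.open / deeplake.create plus TQL queries).
if deeplake.__version__.startswith("3."):
    DEEPLAKE_V4 = False
    from deeplake.core.vectorstore import VectorStore
else:
    DEEPLAKE_V4 = True  # use the compatibility wrapper defined below
```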
@@ -5,8 +5,9 @@
"""

import logging
from typing import Any, List, Optional, cast
from typing import Any, List, Optional, cast, Dict, Union, Iterable, Callable

import uuid
from llama_index.core.bridge.pydantic import PrivateAttr
from llama_index.core.schema import BaseNode, MetadataMode, TextNode
from llama_index.core.vector_stores.types import (
@@ -22,7 +23,235 @@
node_to_metadata_dict,
)

from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore
try:
import deeplake

if deeplake.__version__.startswith("3."):
DEEPLAKE_V4 = False
from deeplake.core.vectorstore import VectorStore
else:
DEEPLAKE_V4 = True

class VectorStore:
def __init__(
self,
path: str,
read_only: bool = False,
token: Optional[str] = None,
exec_option: Optional[str] = None,
verbose: bool = False,
runtime: Optional[Dict] = None,
index_params: Optional[Dict[str, Union[int, str]]] = None,
**kwargs: Any,
):
if DEEPLAKE_INSTALLED is False:
raise ImportError(
"Could not import deeplake python package. "
"Please install it with `pip install deeplake[enterprise]`."
)
self.path = path
self.read_only = read_only
self.token = token
self.exec_options = exec_option
self.verbose = verbose
self.runtime = runtime
self.index_params = index_params
                self.kwargs = kwargs
                # search() and __create_dataset() read self.embedding_function,
                # so capture it here when a caller passes one in.
                self.embedding_function = kwargs.get("embedding_function")
if read_only:
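                    # Try a plain read-only open first; if that fails, fall back
                    # to opening the path through a TQL query before re-raising
                    # the original error.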
try:
self.ds = deeplake.open_read_only(self.path, self.token)
except Exception as e:
try:
self.ds = deeplake.query(
f"select * from {self.path}", token=self.token
)
except Exception:
raise e
else:
try:
self.ds = deeplake.open(self.path, self.token)
except deeplake.LogNotexistsError:
self.ds = None

def tensors(self) -> list[str]:
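                # v4 exposes "columns"; report their names so callers written
                # against the v3 tensors() API keep working.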
return [c.name for c in self.ds.schema.columns]

def add(
self,
text: List[str],
metadata: Optional[List[dict]],
                embedding_data: Union[List[str], List[List[float]]],
embedding_tensor: str,
embedding_function: Optional[Callable] = None,
return_ids: bool = False,
**tensors: Any,
) -> Optional[list[str]]:
if embedding_function is not None:
embedding_data = embedding_function(text)
if embedding_tensor is None:
embedding_tensor = "embedding"
_id = (
tensors["id"]
if "id" in tensors
else [str(uuid.uuid1()) for _ in range(len(text))]
)
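                # Create the dataset lazily on the first add, sized to the
                # incoming embeddings.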
if self.ds is None:
emb_size = len(embedding_data[0])
self.__create_dataset(emb_size)

self.ds.append(
{
"text": text,
"metadata": metadata,
embedding_tensor: embedding_data,
"id": _id,
}
)
self.ds.commit()
if return_ids:
return _id
else:
return None

def search_tql(
self, query: str, exec_options: Optional[str]
) -> Dict[str, Any]:
view = self.ds.query(query)
return self.__view_to_docs(view)

def search(
self,
embedding: Union[None, str, List[float]] = None,
k: Optional[int] = None,
distance_metric: Optional[str] = None,
filter: Optional[Dict[str, Any]] = None,
exec_option: Optional[str] = None,
deep_memory: Optional[bool] = None,
return_tensors: Optional[List[str]] = None,
query: Optional[str] = None,
) -> Dict[str, Any]:
                if query is None and embedding is None and filter is None:
                    raise ValueError(
                        "At least one of `embedding`, `filter`, or `query` "
                        "must be specified."
                    )
if query is not None:
return self.search_tql(query, exec_option)

if isinstance(embedding, str):
if self.embedding_function is None:
raise ValueError(
"embedding_function is required when embedding is a string"
)
embedding = self.embedding_function.embed_documents([embedding])[0]
emb_str = (
None
if embedding is None
else ", ".join([str(e) for e in embedding])
)

column_list = " * " if not return_tensors else ", ".join(return_tensors)

                # Resolve the metric once: cosine similarity sorts descending
                # (higher is closer), l2 distance sorts ascending.
                mf = self.__metric_to_function(distance_metric)
                order_by = " DESC " if mf == "cosine_similarity" else " ASC "
                dp = f"(embedding, ARRAY[{emb_str}])"
                if emb_str is not None:
                    column_list += f", {mf}{dp} as score"

                order_by_clause = (
                    "" if emb_str is None else f"ORDER BY {mf}{dp} {order_by}"
                )
where_clause = self.__generate_where_clause(filter)
limit_clause = "" if k is None else f"LIMIT {k}"
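                # For illustration (hypothetical values), a call like
                #   search(embedding=[0.1, 0.2], k=4, filter={"category": "news"})
                # builds roughly:
                #   SELECT * , cosine_similarity(embedding, ARRAY[0.1, 0.2]) as score
                #   WHERE category == 'news'
                #   ORDER BY cosine_similarity(embedding, ARRAY[0.1, 0.2]) DESC LIMIT 4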

query = f"SELECT {column_list} {where_clause} {order_by_clause} {limit_clause}"
print(">>>>>>>>>>>>>", query)
view = self.ds.query(query)
return self.__view_to_docs(view)

            def delete(
                self,
                ids: List[str],
                filter: Optional[Dict[str, Any]] = None,
                delete_all: Optional[bool] = None,
            ) -> None:
                # Note: only id-based deletion is implemented here; `filter`
                # and `delete_all` are accepted for interface compatibility.
                if ids is not None:
                    # Resolve each id to its row number, then delete from the
                    # highest row down so earlier offsets stay valid.
                    view = self.ds.query(
                        f"SELECT * from (select *,ROW_NUMBER() as r_id) "
                        f"where id IN ({str(ids)[1:-1]})"
                    )
                    dlist = view["r_id"][:].tolist()
                    dlist.reverse()
                    for _id in dlist:
                        self.ds.delete(int(_id))

def dataset(self) -> Any:
return self.ds

            def __view_to_docs(self, view: Any) -> Dict[str, Any]:
                docs = {}
                tensors = [(c.name, str(c.dtype)) for c in view.schema.columns]
                for name, dtype in tensors:
                    if dtype == "dict":
                        docs[name] = [i.to_dict() for i in view[name][:]]
                    else:
                        try:
                            docs[name] = view[name][:].tolist()
                        except AttributeError:
                            docs[name] = view[name][:]
                return docs

            def __metric_to_function(self, metric: Optional[str]) -> str:
                if metric is None or metric in ("cosine", "cosine_similarity"):
                    return "cosine_similarity"
                elif metric in ("l2", "l2_norm"):
                    return "l2_norm"
                else:
                    raise ValueError(
                        f"Unknown metric: {metric}, should be one of "
                        "['cosine', 'cosine_similarity', 'l2', 'l2_norm']"
                    )

            def __generate_where_clause(self, filter: Optional[Dict[str, Any]]) -> str:
                # Example (hypothetical filter): {"id": ["a", "b"], "year": 2024}
                # -> "WHERE id IN ('a', 'b') AND year == 2024"
                if filter is None:
                    return ""
                conditions = []
                for key, value in filter.items():
                    if isinstance(value, list):
                        conditions.append(f"{key} IN ({str(value)[1:-1]})")
                    elif isinstance(value, str):
                        # Quote string scalars so the comparison is valid TQL.
                        conditions.append(f"{key} == '{value}'")
                    else:
                        conditions.append(f"{key} == {value}")
                return "WHERE " + " AND ".join(conditions)

def __create_dataset(self, emb_size=None) -> None:
if emb_size is None:
if self.embedding_function is None:
raise ValueError(
"embedding_function is required to create a new dataset"
)
emb_size = len(self.embedding_function.embed_documents(["test"])[0])
self.ds = deeplake.create(self.path, self.token)
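                # Mirror the v3 layout: text, metadata, embedding, and id columns.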
self.ds.add_column("text", deeplake.types.Text("inverted"))
self.ds.add_column("metadata", deeplake.types.Dict())
self.ds.add_column("embedding", deeplake.types.Embedding(size=emb_size))
self.ds.add_column("id", deeplake.types.Text)
self.ds.commit()

DEEPLAKE_INSTALLED = True
except ImportError:
DEEPLAKE_INSTALLED = False

logger = logging.getLogger(__name__)

@@ -55,12 +284,12 @@ class DeepLakeVectorStore(BasePydanticVectorStore):
token: Optional[str]
read_only: Optional[bool]
dataset_path: str
vectorstore: Any = "VectorStore"

_embedding_dimension: int = PrivateAttr()
_ttl_seconds: Optional[int] = PrivateAttr()
_deeplake_db: Any = PrivateAttr()
_deeplake_db_collection: Any = PrivateAttr()
_vectorstore: "VectorStore" = PrivateAttr()
_id_tensor_name: str = PrivateAttr()

def __init__(
@@ -107,7 +336,7 @@ def __init__(
num_workers=ingestion_num_workers,
)

self._vectorstore = VectorStore(
self.vectorstore = VectorStore(
path=dataset_path,
ingestion_batch_size=ingestion_batch_size,
num_workers=ingestion_num_workers,
@@ -118,7 +347,12 @@ def __init__(
verbose=verbose,
**kwargs,
)
self._id_tensor_name = "ids" if "ids" in self._vectorstore.tensors() else "id"
try:
self._id_tensor_name = (
"ids" if "ids" in self.vectorstore.tensors() else "id"
)
except AttributeError:
self._id_tensor_name = "id"

@property
def client(self) -> Any:
@@ -127,10 +361,10 @@ def client(self) -> Any:
Returns:
Any: DeepLake vectorstore dataset.
"""
return self._vectorstore.dataset
return self.vectorstore.dataset

def summary(self):
self._vectorstore.summary()
self.vectorstore.summary()

def get_nodes(
self,
@@ -139,9 +373,9 @@ def get_nodes(
) -> List[BaseNode]:
"""Get nodes from vector store."""
if node_ids:
data = self._vectorstore.search(filter={"id": node_ids})
data = self.vectorstore.search(filter={"id": node_ids})
else:
data = self._vectorstore.search(filter={})
data = self.vectorstore.search(filter={})

nodes = []
for metadata in data["metadata"]:
@@ -197,18 +431,22 @@ def delete_nodes(
**delete_kwargs: Any,
) -> None:
if filters:
self._vectorstore.delete(
self.vectorstore.delete(
ids=[
x.node_id
for x in self.get_nodes(node_ids=node_ids, filters=filters)
]
)
else:
self._vectorstore.delete(ids=node_ids)
self.vectorstore.delete(ids=node_ids)

def clear(self) -> None:
"""Clear the vector store."""
self._vectorstore.delete(filter=lambda x: True)
if DEEPLAKE_V4:
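            # The v4 wrapper has no filter-based delete, so drop rows from the
            # end of the dataset backwards to keep row indices valid.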
for i in range(len(self.vectorstore.ds) - 1, -1, -1):
self.vectorstore.ds.delete(i)
else:
self.vectorstore.delete(filter=lambda x: True)

def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
"""Add the embeddings and their nodes into DeepLake.
@@ -235,14 +473,13 @@ def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
id_.append(node.node_id)
text.append(node.get_content(metadata_mode=MetadataMode.NONE))

kwargs = {
"embedding": embedding,
"metadata": metadata,
self._id_tensor_name: id_,
"text": text,
}
kwargs = {self._id_tensor_name: id_}

return self._vectorstore.add(
return self.vectorstore.add(
embedding_data=embedding,
metadata=metadata,
embedding_tensor="embedding",
text=text,
return_ids=True,
**kwargs,
)
@@ -255,7 +492,7 @@ def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
ref_doc_id (str): The doc_id of the document to delete.

"""
self._vectorstore.delete(filter={"metadata": {"doc_id": ref_doc_id}})
self.vectorstore.delete(filter={"metadata": {"doc_id": ref_doc_id}})

def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
"""Query index for top k most similar nodes.
@@ -273,11 +510,13 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
query_embedding = cast(List[float], query.query_embedding)
exec_option = kwargs.get("exec_option")
deep_memory = kwargs.get("deep_memory")
data = self._vectorstore.search(
data = self.vectorstore.search(
embedding=query_embedding,
exec_option=exec_option,
k=query.similarity_top_k,
distance_metric="cosine_similarity",
filter=query.filters,
return_tensors=None,
deep_memory=deep_memory,
)
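For reference, a minimal end-to-end sketch of the query path (the dataset path and embedding values are hypothetical; `VectorStoreQuery` is the standard llama-index query object):

```python
from llama_index.core.vector_stores.types import VectorStoreQuery
from llama_index.vector_stores.deeplake import DeepLakeVectorStore

# Works against either Deeplake 3.x or 4.x thanks to the wrapper above.
store = DeepLakeVectorStore(dataset_path="./my_deeplake_dataset")
result = store.query(
    VectorStoreQuery(query_embedding=[0.1, 0.2, 0.3], similarity_top_k=4)
)
print(result.ids, result.similarities)
```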

@@ -27,11 +27,11 @@ exclude = ["**/BUILD"]
license = "MIT"
name = "llama-index-vector-stores-deeplake"
readme = "README.md"
version = "0.3.0"
version = "0.3.1"

[tool.poetry.dependencies]
python = ">=3.9,<4.0"
deeplake = ">=3.9.12,<4.0"
deeplake = ">=3.9.12"
llama-index-core = "^0.12.0"
pyjwt = "*"
