From 74521fbd95c7758a02792334cb703bf12482344a Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Mon, 24 Jul 2023 11:59:07 +0200 Subject: [PATCH 1/2] feat: add push endpoint --- requirements.txt | 1 + vectordb/client/client.py | 8 ++++++++ vectordb/db/base.py | 8 ++++++++ vectordb/db/executors/hnsw_indexer.py | 19 +++++++++++++++++++ .../db/executors/inmemory_exact_indexer.py | 10 ++++++++++ vectordb/db/executors/typed_executor.py | 2 +- 6 files changed, 47 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 73dd97d..81a7185 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ jina>=3.19.0 docarray[hnswlib]>=0.34.0 +more-itertools diff --git a/vectordb/client/client.py b/vectordb/client/client.py index 7a0eff8..9f2ab89 100644 --- a/vectordb/client/client.py +++ b/vectordb/client/client.py @@ -54,6 +54,14 @@ def __init__(self, address, reverse_order=False): def index(self, *args, **kwargs): return self._client.index(*args, **kwargs) + @unify_input_output + @pass_kwargs_as_params + def push(self, *args, **kwargs): + return self._client.post(on='/push', *args, **kwarg) + + def build(self, *args, **kwargs): + return self._client.post(on='/build', *args, **kwarg) + @sort_matches_by_scores @unify_input_output @pass_kwargs_as_params diff --git a/vectordb/db/base.py b/vectordb/db/base.py index 07b07e3..37b7749 100644 --- a/vectordb/db/base.py +++ b/vectordb/db/base.py @@ -233,6 +233,14 @@ async def _deploy(): def index(self, docs: 'DocList[TSchema]', parameters: Optional[Dict] = None, **kwargs): return self._executor.index(docs, parameters) + @pass_kwargs_as_params + @unify_input_output + def push(self, docs: 'DocList[TSchema]', parameters: Optional[Dict] = None, **kwargs): + return self._executor.push(docs, parameters) + + def build(self, **kwargs): + return self._executor.push(**kwargs) + @pass_kwargs_as_params @unify_input_output def update(self, docs: 'DocList[TSchema]', parameters: Optional[Dict] = None, **kwargs): diff --git a/vectordb/db/executors/hnsw_indexer.py b/vectordb/db/executors/hnsw_indexer.py index d4298c5..97d9571 100644 --- a/vectordb/db/executors/hnsw_indexer.py +++ b/vectordb/db/executors/hnsw_indexer.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING import numpy as np +from more_itertools import chunked from vectordb.db.executors.typed_executor import TypedExecutor from jina.serve.executors.decorators import requests, write @@ -35,6 +36,7 @@ def __init__(self, 'M': M, 'allow_replace_deleted': allow_replace_deleted, 'num_threads': num_threads}) + self._pushed_docs = DocList[self._input_schema]() db_conf.work_dir = self.work_dir self._indexer = HnswDocumentIndex[self._input_schema](db_config=db_conf) @@ -44,6 +46,10 @@ def _index(self, docs, *args, **kwargs): def index(self, docs, *args, **kwargs): self.logger.debug(f'Index {len(docs)}') + if len(self._pushed_docs) > 0: + self.logger.debug(f'{len(self._pushed_docs)} were waiting to be indexed. Indexing them') + self._index(self._pushed_docs) + self._pushed_docs.clear() return self._index(docs) @write @@ -88,6 +94,19 @@ def delete(self, docs, *args, **kwargs): self.logger.debug(f'Delete') return self._delete(docs, *args, **kwargs) + @write + @requests(on='/push') + def push(self, docs, *args, **kwargs): + self.logger.debug(f'Push {len(docs)}') + self._pushed_docs.extend(docs) + + @write + @requests(on='/build') + def build(self, *args, **kwargs): + self.logger.debug(f'Building index with {len(self._pushed_docs)} pushed docs') + self._index(self._pushed_docs, *args, **kwargs) + self._pushed_docs.clear() + @write @requests(on='/delete') async def async_delete(self, docs, *args, **kwargs): diff --git a/vectordb/db/executors/inmemory_exact_indexer.py b/vectordb/db/executors/inmemory_exact_indexer.py index 2c45f99..302e43a 100644 --- a/vectordb/db/executors/inmemory_exact_indexer.py +++ b/vectordb/db/executors/inmemory_exact_indexer.py @@ -26,6 +26,16 @@ def index(self, docs, *args, **kwargs): self.logger.debug(f'Index {len(docs)}') return self._index(docs, *args, **kwargs) + @write + @requests(on='/push') + def push(self, docs, *args, **kwargs): + self.logger.debug(f'Push {len(docs)}') + return self._index(docs, *args, **kwargs) + + @requests(on='/build') + def build(self, *args, **kwargs): + self.logger.debug(f'Build call has no effect in InMemoryExactNNIndexer.') + def _search(self, docs, parameters, *args, **kwargs): from docarray import DocList res = DocList[self._output_schema]() diff --git a/vectordb/db/executors/typed_executor.py b/vectordb/db/executors/typed_executor.py index 2b224d1..55d3f56 100644 --- a/vectordb/db/executors/typed_executor.py +++ b/vectordb/db/executors/typed_executor.py @@ -14,7 +14,7 @@ InputSchema = TypeVar('InputSchema', bound='BaseDoc') OutputSchema = TypeVar('OutputSchema', bound='BaseDoc') -methods = ['/index', '/update', '/delete', '/search'] +methods = ['/index', '/update', '/delete', '/search', '/push', '/build'] class TypedExecutor(Executor, Generic[InputSchema, OutputSchema]): From 72e09b5e188129a198b54cd5429583ed17febcab Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Mon, 24 Jul 2023 12:49:10 +0200 Subject: [PATCH 2/2] fix: fix passing arguments to instance served and deployed --- vectordb/db/base.py | 51 +++++++++++++-------------- vectordb/db/executors/hnsw_indexer.py | 1 + 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/vectordb/db/base.py b/vectordb/db/base.py index 37b7749..8852baf 100644 --- a/vectordb/db/base.py +++ b/vectordb/db/base.py @@ -52,7 +52,10 @@ def __init__(self, *args, **kwargs): self._workspace = kwargs['work_dir'] if 'workspace' in kwargs: self._workspace = kwargs['workspace'] - self._uses_with = kwargs + self._uses_with = {} + self._uses_with.update(**kwargs) + if 'workspace' in self._uses_with: + self._uses_with.pop('workspace') kwargs['requests'] = REQUESTS_MAP kwargs['runtime_args'] = {'workspace': self._workspace} self._executor = self._executor_cls(*args, **kwargs) @@ -72,14 +75,6 @@ def _get_jina_object(cls, obj_name: Optional[str] = None, **kwargs): from jina import Deployment, Flow - is_instance = False - uses_with = uses_with or {} - if isinstance(cls, VectorDB): - is_instance = True - uses_with = uses_with.update(**cls._uses_with) - - if is_instance: - workspace = workspace or cls._workspace replicas = replicas or 1 shards = shards or 1 protocol = protocol or 'grpc' @@ -150,38 +145,42 @@ def _get_jina_object(cls, polling=polling, **kwargs) else: jina_object = Flow(port=port, protocol=protocol, env=['JINA_LOG_LEVEL=DEBUG'], **kwargs).add(name='indexer', - uses=uses, - uses_with=uses_with, - shards=shards, - replicas=replicas, - stateful=stateful, - peer_ports=peer_ports, - polling=polling, - workspace=workspace) + uses=uses, + uses_with=uses_with, + shards=shards, + replicas=replicas, + stateful=stateful, + peer_ports=peer_ports, + polling=polling, + workspace=workspace) return jina_object - @classmethod - def serve(cls, + def serve(self, *, port: Optional[Union[str, List[str]]] = 8081, protocol: Optional[Union[str, List[str]]] = None, **kwargs): protocol = protocol or 'grpc' protocol_list = [p.lower() for p in protocol] if isinstance(protocol, list) else [protocol.lower()] - ctxt_manager = cls._get_jina_object(to_deploy=False, port=port, protocol=protocol, **kwargs) + uses_with = kwargs.get('uses_with', {}) + uses_with.update(self._uses_with) + workspace = kwargs.get('workspace', self._workspace) + ctxt_manager = self._get_jina_object(to_deploy=False, port=port, protocol=protocol, workspace=workspace, + uses_with=uses_with, **kwargs) port = port[0] if isinstance(port, list) else port - return Service(ctxt_manager, address=f'{protocol_list[0]}://0.0.0.0:{port}', schema=cls._input_schema, - reverse_order=cls.reverse_score_order) + return Service(ctxt_manager, address=f'{protocol_list[0]}://0.0.0.0:{port}', schema=self._input_schema, + reverse_order=self.reverse_score_order) - @classmethod - def deploy(cls, + def deploy(self, **kwargs): from tempfile import mkdtemp import os import yaml from yaml.loader import SafeLoader - jina_obj = cls._get_jina_object(to_deploy=True, **kwargs) + uses_with = kwargs.get('uses_with', {}) + uses_with.update(self._uses_with) + jina_obj = self._get_jina_object(to_deploy=True, uses_with=uses_with, **kwargs) tmpdir = mkdtemp() jina_obj.save_config(os.path.join(tmpdir, 'flow.yml')) @@ -238,7 +237,7 @@ def index(self, docs: 'DocList[TSchema]', parameters: Optional[Dict] = None, **k def push(self, docs: 'DocList[TSchema]', parameters: Optional[Dict] = None, **kwargs): return self._executor.push(docs, parameters) - def build(self, **kwargs): + def build(self, **kwargs): return self._executor.push(**kwargs) @pass_kwargs_as_params diff --git a/vectordb/db/executors/hnsw_indexer.py b/vectordb/db/executors/hnsw_indexer.py index 97d9571..a0de908 100644 --- a/vectordb/db/executors/hnsw_indexer.py +++ b/vectordb/db/executors/hnsw_indexer.py @@ -25,6 +25,7 @@ def __init__(self, num_threads=1, *args, **kwargs): from docarray.index import HnswDocumentIndex + from docarray import DocList super().__init__(*args, **kwargs) workspace = self.workspace.replace('[', '_').replace(']', '_') self.work_dir = f'{workspace}' if self.handle_persistence else f'{workspace}/{"".join(random.choice(string.ascii_lowercase) for _ in range(5))}'