From fae89d7493be6459aa8a9cc3fb6c798dce6a7ae7 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Wed, 17 Jan 2024 14:59:14 +0000 Subject: [PATCH 01/13] swap in grpclib for async grpc requests --- requirements-devel.txt | 1 + setup.cfg | 1 + weaviate/connect/base.py | 31 +++++----- weaviate/connect/v4.py | 10 ++-- weaviate/proto/v1/base_grpc.py | 3 + weaviate/proto/v1/batch_delete_grpc.py | 3 + weaviate/proto/v1/batch_grpc.py | 3 + weaviate/proto/v1/properties_grpc.py | 3 + weaviate/proto/v1/regen.sh | 4 +- weaviate/proto/v1/search_get_grpc.py | 3 + weaviate/proto/v1/weaviate_grpc.py | 83 ++++++++++++++++++++++++++ 11 files changed, 125 insertions(+), 20 deletions(-) create mode 100644 weaviate/proto/v1/base_grpc.py create mode 100644 weaviate/proto/v1/batch_delete_grpc.py create mode 100644 weaviate/proto/v1/batch_grpc.py create mode 100644 weaviate/proto/v1/properties_grpc.py create mode 100644 weaviate/proto/v1/search_get_grpc.py create mode 100644 weaviate/proto/v1/weaviate_grpc.py diff --git a/requirements-devel.txt b/requirements-devel.txt index 4785084ae..b59ee401c 100644 --- a/requirements-devel.txt +++ b/requirements-devel.txt @@ -5,6 +5,7 @@ authlib>=1.2.1,<2.0.0 grpcio>=1.57.0,<2.0.0 grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 +grpclib==0.4.7 pydantic>=2.5.0,<3.0.0 deprecated>=1.2.14,<2.0.0 diff --git a/setup.cfg b/setup.cfg index 50d8ac3fa..0a2de0858 100644 --- a/setup.cfg +++ b/setup.cfg @@ -46,6 +46,7 @@ install_requires = grpcio>=1.57.0,<2.0.0 grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 + grpclib==0.4.7 python_requires = >=3.8 diff --git a/weaviate/connect/base.py b/weaviate/connect/base.py index daed899e8..32e957a23 100644 --- a/weaviate/connect/base.py +++ b/weaviate/connect/base.py @@ -7,7 +7,7 @@ import grpc # type: ignore from grpc import Channel, ssl_channel_credentials -from grpc.aio import Channel as AsyncChannel # type: ignore +from grpclib.client import Channel as AsyncChannel from pydantic import BaseModel, field_validator, model_validator from weaviate.types import NUMBER @@ -132,21 +132,24 @@ def _grpc_channel(self, async_channel: bool) -> Union[Channel, AsyncChannel, Non return None if async_channel: - import_path = grpc.aio - else: - import_path = grpc - - if self.grpc.secure: - return import_path.secure_channel( - target=self._grpc_target, - credentials=ssl_channel_credentials(), - options=GRPC_OPTIONS, + return AsyncChannel( + host=self.grpc.host, + port=self.grpc.port, + ssl=self.grpc.secure, + # config=GRPC_OPTIONS, ) else: - return import_path.insecure_channel( - target=self._grpc_target, - options=GRPC_OPTIONS, - ) + if self.grpc.secure: + return grpc.secure_channel( + target=self._grpc_target, + credentials=ssl_channel_credentials(), + options=GRPC_OPTIONS, + ) + else: + return grpc.insecure_channel( + target=self._grpc_target, + options=GRPC_OPTIONS, + ) @property def _http_scheme(self) -> str: diff --git a/weaviate/connect/v4.py b/weaviate/connect/v4.py index 22dcc69b3..a78291cfa 100644 --- a/weaviate/connect/v4.py +++ b/weaviate/connect/v4.py @@ -58,7 +58,7 @@ ) from weaviate.warnings import _Warnings -from weaviate.proto.v1 import weaviate_pb2_grpc +from weaviate.proto.v1 import weaviate_pb2_grpc, weaviate_grpc Session = Union[Client, OAuth2Client] AsyncSession = Union[AsyncClient, AsyncOAuth2Client] @@ -90,7 +90,7 @@ def __init__( self._connection_params = connection_params self._grpc_available = False self._grpc_stub: Optional[weaviate_pb2_grpc.WeaviateStub] = None - self._grpc_stub_async: Optional[weaviate_pb2_grpc.WeaviateStub] = None + self._grpc_stub_async: Optional[weaviate_grpc.WeaviateStub] = None self.timeout_config = timeout_config self.__connection_config = connection_config self.__trust_env = trust_env @@ -348,7 +348,7 @@ async def aopen(self) -> None: if self._grpc_stub_async is None: self._grpc_channel_async = self._connection_params._grpc_channel(async_channel=True) assert self._grpc_channel_async is not None - self._grpc_stub_async = weaviate_pb2_grpc.WeaviateStub(self._grpc_channel_async) + self._grpc_stub_async = weaviate_grpc.WeaviateStub(self._grpc_channel_async) async def aclose(self) -> None: if self._aclient is not None: @@ -356,7 +356,7 @@ async def aclose(self) -> None: self._aclient = None if self._grpc_stub_async is not None: assert self._grpc_channel_async is not None - await self._grpc_channel_async.close() + self._grpc_channel_async.close() self._grpc_stub_async = None def close(self) -> None: @@ -625,7 +625,7 @@ def grpc_stub(self) -> Optional[weaviate_pb2_grpc.WeaviateStub]: return self._grpc_stub @property - def agrpc_stub(self) -> Optional[weaviate_pb2_grpc.WeaviateStub]: + def agrpc_stub(self) -> Optional[weaviate_grpc.WeaviateStub]: if not self._grpc_available: raise WeaviateGRPCUnavailableError( "Did you forget to call client.connect() before using the client?" diff --git a/weaviate/proto/v1/base_grpc.py b/weaviate/proto/v1/base_grpc.py new file mode 100644 index 000000000..256c265dc --- /dev/null +++ b/weaviate/proto/v1/base_grpc.py @@ -0,0 +1,3 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: v1/base.proto +# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/batch_delete_grpc.py b/weaviate/proto/v1/batch_delete_grpc.py new file mode 100644 index 000000000..eafd2b5c1 --- /dev/null +++ b/weaviate/proto/v1/batch_delete_grpc.py @@ -0,0 +1,3 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: v1/batch_delete.proto +# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/batch_grpc.py b/weaviate/proto/v1/batch_grpc.py new file mode 100644 index 000000000..8d740096e --- /dev/null +++ b/weaviate/proto/v1/batch_grpc.py @@ -0,0 +1,3 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: v1/batch.proto +# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/properties_grpc.py b/weaviate/proto/v1/properties_grpc.py new file mode 100644 index 000000000..c2f9ab587 --- /dev/null +++ b/weaviate/proto/v1/properties_grpc.py @@ -0,0 +1,3 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: v1/properties.proto +# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/regen.sh b/weaviate/proto/v1/regen.sh index da2e4f7ec..c6a3759ba 100755 --- a/weaviate/proto/v1/regen.sh +++ b/weaviate/proto/v1/regen.sh @@ -4,8 +4,10 @@ echo "this script assumes that you have checked out weaviate next to the client" cd "${0%/*}/.." -python3 -m grpc_tools.protoc -I ../../../weaviate/grpc/proto --python_out=./ --pyi_out=./ --grpc_python_out=./ ../../../weaviate/grpc/proto/v1/*.proto +python3 -m grpc_tools.protoc -I ../../../weaviate/grpc/proto --python_out=./ --pyi_out=./ --grpc_python_out=./ --grpclib_python_out=. ../../../weaviate/grpc/proto/v1/*.proto +sed -i '' 's/ v1./ weaviate.proto.v1./g' v1/weaviate_grpc.py +sed -i '' 's/\[v1./\[weaviate.proto.v1./g' v1/weaviate_grpc.py sed -i '' 's/from v1/from weaviate.proto.v1/g' v1/*.py sed -i '' 's/from v1/from weaviate.proto.v1/g' v1/*.pyi diff --git a/weaviate/proto/v1/search_get_grpc.py b/weaviate/proto/v1/search_get_grpc.py new file mode 100644 index 000000000..f67cfeb5e --- /dev/null +++ b/weaviate/proto/v1/search_get_grpc.py @@ -0,0 +1,3 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: v1/search_get.proto +# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/weaviate_grpc.py b/weaviate/proto/v1/weaviate_grpc.py new file mode 100644 index 000000000..a09745a1c --- /dev/null +++ b/weaviate/proto/v1/weaviate_grpc.py @@ -0,0 +1,83 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: weaviate.proto.v1.weaviate.proto +# plugin: grpclib.plugin.main +import abc +import typing + +import grpclib.const +import grpclib.client + +if typing.TYPE_CHECKING: + import grpclib.server + +import weaviate.proto.v1.batch_pb2 +import weaviate.proto.v1.batch_delete_pb2 +import weaviate.proto.v1.search_get_pb2 +import weaviate.proto.v1.weaviate_pb2 + + +class WeaviateBase(abc.ABC): + @abc.abstractmethod + async def Search( + self, + stream: "grpclib.server.Stream[weaviate.proto.v1.search_get_pb2.SearchRequest, weaviate.proto.v1.search_get_pb2.SearchReply]", + ) -> None: + pass + + @abc.abstractmethod + async def BatchObjects( + self, + stream: "grpclib.server.Stream[weaviate.proto.v1.batch_pb2.BatchObjectsRequest, weaviate.proto.v1.batch_pb2.BatchObjectsReply]", + ) -> None: + pass + + @abc.abstractmethod + async def BatchDelete( + self, + stream: "grpclib.server.Stream[weaviate.proto.v1.batch_delete_pb2.BatchDeleteRequest, weaviate.proto.v1.batch_delete_pb2.BatchDeleteReply]", + ) -> None: + pass + + def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: + return { + "/weaviate.v1.Weaviate/Search": grpclib.const.Handler( + self.Search, + grpclib.const.Cardinality.UNARY_UNARY, + weaviate.proto.v1.search_get_pb2.SearchRequest, + weaviate.proto.v1.search_get_pb2.SearchReply, + ), + "/weaviate.v1.Weaviate/BatchObjects": grpclib.const.Handler( + self.BatchObjects, + grpclib.const.Cardinality.UNARY_UNARY, + weaviate.proto.v1.batch_pb2.BatchObjectsRequest, + weaviate.proto.v1.batch_pb2.BatchObjectsReply, + ), + "/weaviate.v1.Weaviate/BatchDelete": grpclib.const.Handler( + self.BatchDelete, + grpclib.const.Cardinality.UNARY_UNARY, + weaviate.proto.v1.batch_delete_pb2.BatchDeleteRequest, + weaviate.proto.v1.batch_delete_pb2.BatchDeleteReply, + ), + } + + +class WeaviateStub: + def __init__(self, channel: grpclib.client.Channel) -> None: + self.Search = grpclib.client.UnaryUnaryMethod( + channel, + "/weaviate.v1.Weaviate/Search", + weaviate.proto.v1.search_get_pb2.SearchRequest, + weaviate.proto.v1.search_get_pb2.SearchReply, + ) + self.BatchObjects = grpclib.client.UnaryUnaryMethod( + channel, + "/weaviate.v1.Weaviate/BatchObjects", + weaviate.proto.v1.batch_pb2.BatchObjectsRequest, + weaviate.proto.v1.batch_pb2.BatchObjectsReply, + ) + self.BatchDelete = grpclib.client.UnaryUnaryMethod( + channel, + "/weaviate.v1.Weaviate/BatchDelete", + weaviate.proto.v1.batch_delete_pb2.BatchDeleteRequest, + weaviate.proto.v1.batch_delete_pb2.BatchDeleteReply, + ) From 8bc470a3715b0f44efbc6ab30ce52fe11636e651 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Wed, 17 Jan 2024 17:48:24 +0000 Subject: [PATCH 02/13] fix batching when in env with already running loop --- requirements-devel.txt | 1 + setup.cfg | 1 + weaviate/collections/batch/batch_wrapper.py | 30 +++++++-------------- weaviate/collections/batch/client.py | 2 +- weaviate/collections/batch/collection.py | 2 +- 5 files changed, 13 insertions(+), 23 deletions(-) diff --git a/requirements-devel.txt b/requirements-devel.txt index b59ee401c..e57cfff80 100644 --- a/requirements-devel.txt +++ b/requirements-devel.txt @@ -6,6 +6,7 @@ grpcio>=1.57.0,<2.0.0 grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 grpclib==0.4.7 +nest_asyncio>=1.5.9,<2.0.0 pydantic>=2.5.0,<3.0.0 deprecated>=1.2.14,<2.0.0 diff --git a/setup.cfg b/setup.cfg index 0a2de0858..f04895629 100644 --- a/setup.cfg +++ b/setup.cfg @@ -47,6 +47,7 @@ install_requires = grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 grpclib==0.4.7 + nest_asyncio>=1.5.9,<2.0.0 python_requires = >=3.8 diff --git a/weaviate/collections/batch/batch_wrapper.py b/weaviate/collections/batch/batch_wrapper.py index 1026c5bb1..431cfbca9 100644 --- a/weaviate/collections/batch/batch_wrapper.py +++ b/weaviate/collections/batch/batch_wrapper.py @@ -4,6 +4,7 @@ from copy import copy from typing import List, Optional, Any, cast +import nest_asyncio # type: ignore from requests.exceptions import ConnectionError as RequestsConnectionError from weaviate.collections.batch.base import _BatchBase, _BatchDataWrapper @@ -26,26 +27,19 @@ def __init__( self._concurrent_requests: int = 2 self._batch_data = _BatchDataWrapper() - self.__shut_background_thread_down: Optional[threading.Event] = None def __start_event_loop_thread(self, loop: asyncio.AbstractEventLoop) -> None: - while ( - self.__shut_background_thread_down is not None - and not self.__shut_background_thread_down.is_set() - ): - if loop.is_running(): - continue - else: - loop.run_forever() - - def _open_async_connection(self) -> asyncio.AbstractEventLoop: + loop.set_debug(True) # in case of errors, shows async errors in the thread to users + loop.run_forever() + + def _start_event_loop(self) -> asyncio.AbstractEventLoop: try: self._current_loop = asyncio.get_running_loop() + nest_asyncio.apply(self._current_loop) except RuntimeError: self._current_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._current_loop) - self.__shut_background_thread_down = threading.Event() event_loop = threading.Thread( target=self.__start_event_loop_thread, daemon=True, @@ -57,24 +51,18 @@ def _open_async_connection(self) -> asyncio.AbstractEventLoop: while not self._current_loop.is_running(): time.sleep(0.01) - future = asyncio.run_coroutine_threadsafe(self._connection.aopen(), self._current_loop) - future.result() # Wait for self._connection.aopen() to finish + asyncio.run_coroutine_threadsafe(self._connection.aopen(), self._current_loop) return self._current_loop # enter is in inherited classes def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - assert ( - self._current_batch is not None - and self._current_loop is not None - and self.__shut_background_thread_down is not None - ) + assert self._current_batch is not None and self._current_loop is not None self._current_batch._shutdown() future = asyncio.run_coroutine_threadsafe(self._connection.aclose(), self._current_loop) future.result() # Wait for self._connection.aclose() to finish - self.__shut_background_thread_down.set() - self._current_loop.stop() + self._current_loop.call_soon_threadsafe(self._current_loop.stop) self._current_loop = None self._current_batch = None diff --git a/weaviate/collections/batch/client.py b/weaviate/collections/batch/client.py index 1b56ceeaa..8e389f8ee 100644 --- a/weaviate/collections/batch/client.py +++ b/weaviate/collections/batch/client.py @@ -92,7 +92,7 @@ def add_reference( class _BatchClientWrapper(_BatchWrapper): def __enter__(self) -> _BatchClient: - loop = self._open_async_connection() + loop = self._start_event_loop() self._current_batch = _BatchClient( connection=self._connection, diff --git a/weaviate/collections/batch/collection.py b/weaviate/collections/batch/collection.py index 75053c69f..5cb07729e 100644 --- a/weaviate/collections/batch/collection.py +++ b/weaviate/collections/batch/collection.py @@ -112,7 +112,7 @@ def __init__( self.__tenant = tenant def __enter__(self) -> _BatchCollection[Properties]: - loop = self._open_async_connection() + loop = self._start_event_loop() self._current_batch = _BatchCollection[Properties]( connection=self._connection, From 1ece2d8618b81edda203453c0e9ed84f8b82839f Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 10:19:41 +0000 Subject: [PATCH 03/13] always create new event loop for batching logic --- requirements-devel.txt | 1 - setup.cfg | 1 - weaviate/collections/batch/batch_wrapper.py | 23 ++++++++++++--------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/requirements-devel.txt b/requirements-devel.txt index e57cfff80..b59ee401c 100644 --- a/requirements-devel.txt +++ b/requirements-devel.txt @@ -6,7 +6,6 @@ grpcio>=1.57.0,<2.0.0 grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 grpclib==0.4.7 -nest_asyncio>=1.5.9,<2.0.0 pydantic>=2.5.0,<3.0.0 deprecated>=1.2.14,<2.0.0 diff --git a/setup.cfg b/setup.cfg index f04895629..0a2de0858 100644 --- a/setup.cfg +++ b/setup.cfg @@ -47,7 +47,6 @@ install_requires = grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 grpclib==0.4.7 - nest_asyncio>=1.5.9,<2.0.0 python_requires = >=3.8 diff --git a/weaviate/collections/batch/batch_wrapper.py b/weaviate/collections/batch/batch_wrapper.py index 431cfbca9..1b6487600 100644 --- a/weaviate/collections/batch/batch_wrapper.py +++ b/weaviate/collections/batch/batch_wrapper.py @@ -4,7 +4,6 @@ from copy import copy from typing import List, Optional, Any, cast -import nest_asyncio # type: ignore from requests.exceptions import ConnectionError as RequestsConnectionError from weaviate.collections.batch.base import _BatchBase, _BatchDataWrapper @@ -30,15 +29,16 @@ def __init__( def __start_event_loop_thread(self, loop: asyncio.AbstractEventLoop) -> None: loop.set_debug(True) # in case of errors, shows async errors in the thread to users - loop.run_forever() + asyncio.set_event_loop(loop) + try: + loop.run_forever() + finally: + # This is entered when loop.stop is scheduled from the main thread + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() def _start_event_loop(self) -> asyncio.AbstractEventLoop: - try: - self._current_loop = asyncio.get_running_loop() - nest_asyncio.apply(self._current_loop) - except RuntimeError: - self._current_loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._current_loop) + self._current_loop = asyncio.new_event_loop() event_loop = threading.Thread( target=self.__start_event_loop_thread, @@ -51,7 +51,8 @@ def _start_event_loop(self) -> asyncio.AbstractEventLoop: while not self._current_loop.is_running(): time.sleep(0.01) - asyncio.run_coroutine_threadsafe(self._connection.aopen(), self._current_loop) + future = asyncio.run_coroutine_threadsafe(self._connection.aopen(), self._current_loop) + future.result() # Wait for self._connection.aopen() to finish return self._current_loop @@ -62,7 +63,9 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: future = asyncio.run_coroutine_threadsafe(self._connection.aclose(), self._current_loop) future.result() # Wait for self._connection.aclose() to finish - self._current_loop.call_soon_threadsafe(self._current_loop.stop) + self._current_loop.call_soon_threadsafe( + self._current_loop.stop + ) # stop the event loop triggering pulldown self._current_loop = None self._current_batch = None From eba88fb9c981be562fed73eb4e03538ef427e982 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 10:21:37 +0000 Subject: [PATCH 04/13] improve comment --- weaviate/collections/batch/batch_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/batch/batch_wrapper.py b/weaviate/collections/batch/batch_wrapper.py index 1b6487600..1e6db3e95 100644 --- a/weaviate/collections/batch/batch_wrapper.py +++ b/weaviate/collections/batch/batch_wrapper.py @@ -65,7 +65,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self._current_loop.call_soon_threadsafe( self._current_loop.stop - ) # stop the event loop triggering pulldown + ) # Stop the event loop in the background thread self._current_loop = None self._current_batch = None From 79771156fe5edbd8f689b7e8bf443077dc83369b Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 10:25:27 +0000 Subject: [PATCH 05/13] remove commented out code --- weaviate/connect/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/weaviate/connect/base.py b/weaviate/connect/base.py index 32e957a23..e7ca10551 100644 --- a/weaviate/connect/base.py +++ b/weaviate/connect/base.py @@ -136,7 +136,6 @@ def _grpc_channel(self, async_channel: bool) -> Union[Channel, AsyncChannel, Non host=self.grpc.host, port=self.grpc.port, ssl=self.grpc.secure, - # config=GRPC_OPTIONS, ) else: if self.grpc.secure: From da0c9d14053da05d968e9134ae70663f3e836747 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 10:40:14 +0000 Subject: [PATCH 06/13] move all event loop logic into base.py --- weaviate/collections/batch/base.py | 58 +++++++++++++++++---- weaviate/collections/batch/batch_wrapper.py | 41 +-------------- weaviate/collections/batch/client.py | 3 -- weaviate/collections/batch/collection.py | 6 --- 4 files changed, 48 insertions(+), 60 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 39c7a6a24..28d0206d2 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -148,7 +148,6 @@ def __init__( connection: ConnectionV4, consistency_level: Optional[ConsistencyLevel], results: _BatchDataWrapper, - event_loop: asyncio.AbstractEventLoop, fixed_batch_size: Optional[int] = None, # dynamic by default fixed_concurrent_requests: Optional[int] = None, # dynamic by default objects_: Optional[ObjectsBatchRequest] = None, @@ -186,12 +185,55 @@ def __init__( self.__last_scale_up: float = 0 self.__max_observed_rate: int = 0 - self.__loop = event_loop - + self.__loop: Optional[asyncio.AbstractEventLoop] = self.__start_new_event_loop() self.__start_bg_thread() + def __run_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: + loop.set_debug(True) # in case of errors, shows async errors in the thread to users + asyncio.set_event_loop(loop) + try: + loop.run_forever() + finally: + # This is entered when loop.stop is scheduled from the main thread + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + def __start_new_event_loop(self) -> asyncio.AbstractEventLoop: + self.__loop = asyncio.new_event_loop() + + event_loop = threading.Thread( + target=self.__run_event_loop, + daemon=True, + args=(self.__loop,), + name="eventLoop", + ) + event_loop.start() + + while not self.__loop.is_running(): + time.sleep(0.01) + + future = asyncio.run_coroutine_threadsafe(self.__connection.aopen(), self.__loop) + future.result() # Wait for self._connection.aopen() to finish + + return self.__loop + + def _shutdown(self) -> None: + """Shutdown the current batch and wait for all requests to be finished.""" + self.flush() + + # we are done, shut bg threads down and end the event loop + self.__shut_background_thread_down.set() + + assert self.__loop is not None + future = asyncio.run_coroutine_threadsafe(self.__connection.aclose(), self.__loop) + future.result() # Wait for self._connection.aclose() to finish + + self.__loop.call_soon_threadsafe( + self.__loop.stop + ) # Stop the event loop in the background thread + def __start_bg_thread(self) -> None: - """Create a background process that periodically checks how congested the batch queue is.""" + """Create a background thread that periodically checks how congested the batch queue is.""" self.__shut_background_thread_down = threading.Event() def periodic_check() -> None: @@ -281,6 +323,7 @@ def periodic_check() -> None: self.__active_requests += 1 self.__active_requests_lock.release() + assert self.__loop is not None # do not block the thread - the results are written to a central (locked) list and we want to have multiple concurrent batch-requests asyncio.run_coroutine_threadsafe( self.__send_batch_async( @@ -363,13 +406,6 @@ def flush(self) -> None: ): time.sleep(0.01) - def _shutdown(self) -> None: - """Shutdown the current batch and wait for all requests to be finished.""" - self.flush() - - # we are done, shut bg threads down and end the event loop - self.__shut_background_thread_down.set() - def _add_object( self, collection: str, diff --git a/weaviate/collections/batch/batch_wrapper.py b/weaviate/collections/batch/batch_wrapper.py index 1e6db3e95..ec0dd0321 100644 --- a/weaviate/collections/batch/batch_wrapper.py +++ b/weaviate/collections/batch/batch_wrapper.py @@ -1,5 +1,3 @@ -import asyncio -import threading import time from copy import copy from typing import List, Optional, Any, cast @@ -20,53 +18,16 @@ def __init__( self._connection = connection self._consistency_level = consistency_level self._current_batch: Optional[_BatchBase] = None - self._current_loop: Optional[asyncio.AbstractEventLoop] = None # config options self._batch_size: Optional[int] = None self._concurrent_requests: int = 2 self._batch_data = _BatchDataWrapper() - def __start_event_loop_thread(self, loop: asyncio.AbstractEventLoop) -> None: - loop.set_debug(True) # in case of errors, shows async errors in the thread to users - asyncio.set_event_loop(loop) - try: - loop.run_forever() - finally: - # This is entered when loop.stop is scheduled from the main thread - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() - - def _start_event_loop(self) -> asyncio.AbstractEventLoop: - self._current_loop = asyncio.new_event_loop() - - event_loop = threading.Thread( - target=self.__start_event_loop_thread, - daemon=True, - args=(self._current_loop,), - name="eventLoop", - ) - event_loop.start() - - while not self._current_loop.is_running(): - time.sleep(0.01) - - future = asyncio.run_coroutine_threadsafe(self._connection.aopen(), self._current_loop) - future.result() # Wait for self._connection.aopen() to finish - - return self._current_loop - # enter is in inherited classes def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - assert self._current_batch is not None and self._current_loop is not None + assert self._current_batch is not None self._current_batch._shutdown() - future = asyncio.run_coroutine_threadsafe(self._connection.aclose(), self._current_loop) - future.result() # Wait for self._connection.aclose() to finish - - self._current_loop.call_soon_threadsafe( - self._current_loop.stop - ) # Stop the event loop in the background thread - self._current_loop = None self._current_batch = None def wait_for_vector_indexing( diff --git a/weaviate/collections/batch/client.py b/weaviate/collections/batch/client.py index 8e389f8ee..d142216f5 100644 --- a/weaviate/collections/batch/client.py +++ b/weaviate/collections/batch/client.py @@ -92,15 +92,12 @@ def add_reference( class _BatchClientWrapper(_BatchWrapper): def __enter__(self) -> _BatchClient: - loop = self._start_event_loop() - self._current_batch = _BatchClient( connection=self._connection, consistency_level=self._consistency_level, results=self._batch_data, fixed_batch_size=self._batch_size, fixed_concurrent_requests=self._concurrent_requests, - event_loop=loop, ) return self._current_batch diff --git a/weaviate/collections/batch/collection.py b/weaviate/collections/batch/collection.py index 5cb07729e..4398deecf 100644 --- a/weaviate/collections/batch/collection.py +++ b/weaviate/collections/batch/collection.py @@ -1,4 +1,3 @@ -import asyncio from typing import Generic, List, Optional, Sequence, Union from weaviate.collections.batch.base import _BatchBase, _BatchDataWrapper @@ -16,7 +15,6 @@ def __init__( connection: ConnectionV4, consistency_level: Optional[ConsistencyLevel], results: _BatchDataWrapper, - event_loop: asyncio.AbstractEventLoop, fixed_batch_size: Optional[int], fixed_concurrent_requests: Optional[int], name: str, @@ -26,7 +24,6 @@ def __init__( connection=connection, consistency_level=consistency_level, results=results, - event_loop=event_loop, fixed_batch_size=fixed_batch_size, fixed_concurrent_requests=fixed_concurrent_requests, ) @@ -112,8 +109,6 @@ def __init__( self.__tenant = tenant def __enter__(self) -> _BatchCollection[Properties]: - loop = self._start_event_loop() - self._current_batch = _BatchCollection[Properties]( connection=self._connection, consistency_level=self._consistency_level, @@ -122,7 +117,6 @@ def __enter__(self) -> _BatchCollection[Properties]: fixed_concurrent_requests=self._concurrent_requests, name=self.__name, tenant=self.__tenant, - event_loop=loop, ) return self._current_batch From 87d749acdf07855126171c07f65ea3ca4617a9ef Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 10:55:48 +0000 Subject: [PATCH 07/13] move loop into bg thread --- weaviate/collections/batch/base.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 28d0206d2..57649fdb0 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -185,7 +185,6 @@ def __init__( self.__last_scale_up: float = 0 self.__max_observed_rate: int = 0 - self.__loop: Optional[asyncio.AbstractEventLoop] = self.__start_new_event_loop() self.__start_bg_thread() def __run_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: @@ -199,23 +198,23 @@ def __run_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: loop.close() def __start_new_event_loop(self) -> asyncio.AbstractEventLoop: - self.__loop = asyncio.new_event_loop() + loop = asyncio.new_event_loop() event_loop = threading.Thread( target=self.__run_event_loop, daemon=True, - args=(self.__loop,), + args=(loop,), name="eventLoop", ) event_loop.start() - while not self.__loop.is_running(): + while not loop.is_running(): time.sleep(0.01) - future = asyncio.run_coroutine_threadsafe(self.__connection.aopen(), self.__loop) + future = asyncio.run_coroutine_threadsafe(self.__connection.aopen(), loop) future.result() # Wait for self._connection.aopen() to finish - return self.__loop + return loop def _shutdown(self) -> None: """Shutdown the current batch and wait for all requests to be finished.""" @@ -224,19 +223,13 @@ def _shutdown(self) -> None: # we are done, shut bg threads down and end the event loop self.__shut_background_thread_down.set() - assert self.__loop is not None - future = asyncio.run_coroutine_threadsafe(self.__connection.aclose(), self.__loop) - future.result() # Wait for self._connection.aclose() to finish - - self.__loop.call_soon_threadsafe( - self.__loop.stop - ) # Stop the event loop in the background thread - def __start_bg_thread(self) -> None: """Create a background thread that periodically checks how congested the batch queue is.""" self.__shut_background_thread_down = threading.Event() def periodic_check() -> None: + loop = self.__start_new_event_loop() + while ( self.__shut_background_thread_down is not None and not self.__shut_background_thread_down.is_set() @@ -323,18 +316,21 @@ def periodic_check() -> None: self.__active_requests += 1 self.__active_requests_lock.release() - assert self.__loop is not None # do not block the thread - the results are written to a central (locked) list and we want to have multiple concurrent batch-requests asyncio.run_coroutine_threadsafe( self.__send_batch_async( self.__batch_objects.pop_items(self.__recommended_num_objects), self.__batch_references.pop_items(self.__recommended_num_refs), ), - self.__loop, + loop, ) time.sleep(refresh_time) + future = asyncio.run_coroutine_threadsafe(self.__connection.aclose(), loop) + future.result() # Wait for self._connection.aclose() to finish + loop.call_soon_threadsafe(loop.stop) + demon = threading.Thread( target=periodic_check, daemon=True, From 5ea49f449b9c16a315b7c3113d1d796b4e057094 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 10:56:46 +0000 Subject: [PATCH 08/13] open connection in same scope as closing connection --- weaviate/collections/batch/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 57649fdb0..9e55d05d2 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -211,9 +211,6 @@ def __start_new_event_loop(self) -> asyncio.AbstractEventLoop: while not loop.is_running(): time.sleep(0.01) - future = asyncio.run_coroutine_threadsafe(self.__connection.aopen(), loop) - future.result() # Wait for self._connection.aopen() to finish - return loop def _shutdown(self) -> None: @@ -230,6 +227,9 @@ def __start_bg_thread(self) -> None: def periodic_check() -> None: loop = self.__start_new_event_loop() + future = asyncio.run_coroutine_threadsafe(self.__connection.aopen(), loop) + future.result() # Wait for self._connection.aopen() to finish + while ( self.__shut_background_thread_down is not None and not self.__shut_background_thread_down.is_set() From 1af8c14f30646b3801b819effb6f8422880a2269 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 11:19:55 +0000 Subject: [PATCH 09/13] remove grpclib --- requirements-devel.txt | 1 - setup.cfg | 1 - weaviate/connect/base.py | 25 ++++++-- weaviate/connect/v4.py | 10 ++-- weaviate/proto/v1/base_grpc.py | 3 - weaviate/proto/v1/batch_delete_grpc.py | 3 - weaviate/proto/v1/batch_grpc.py | 3 - weaviate/proto/v1/properties_grpc.py | 3 - weaviate/proto/v1/regen.sh | 4 +- weaviate/proto/v1/search_get_grpc.py | 3 - weaviate/proto/v1/weaviate_grpc.py | 83 -------------------------- 11 files changed, 25 insertions(+), 114 deletions(-) delete mode 100644 weaviate/proto/v1/base_grpc.py delete mode 100644 weaviate/proto/v1/batch_delete_grpc.py delete mode 100644 weaviate/proto/v1/batch_grpc.py delete mode 100644 weaviate/proto/v1/properties_grpc.py delete mode 100644 weaviate/proto/v1/search_get_grpc.py delete mode 100644 weaviate/proto/v1/weaviate_grpc.py diff --git a/requirements-devel.txt b/requirements-devel.txt index b59ee401c..4785084ae 100644 --- a/requirements-devel.txt +++ b/requirements-devel.txt @@ -5,7 +5,6 @@ authlib>=1.2.1,<2.0.0 grpcio>=1.57.0,<2.0.0 grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 -grpclib==0.4.7 pydantic>=2.5.0,<3.0.0 deprecated>=1.2.14,<2.0.0 diff --git a/setup.cfg b/setup.cfg index 0a2de0858..50d8ac3fa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -46,7 +46,6 @@ install_requires = grpcio>=1.57.0,<2.0.0 grpcio-tools>=1.57.0,<2.0.0 grpcio-health-checking>=1.57.0,<2.0.0 - grpclib==0.4.7 python_requires = >=3.8 diff --git a/weaviate/connect/base.py b/weaviate/connect/base.py index e7ca10551..03ea9cf3f 100644 --- a/weaviate/connect/base.py +++ b/weaviate/connect/base.py @@ -7,7 +7,9 @@ import grpc # type: ignore from grpc import Channel, ssl_channel_credentials -from grpclib.client import Channel as AsyncChannel +from grpc.aio import Channel as AsyncChannel # type: ignore + +# from grpclib.client import Channel as AsyncChannel from pydantic import BaseModel, field_validator, model_validator from weaviate.types import NUMBER @@ -132,11 +134,22 @@ def _grpc_channel(self, async_channel: bool) -> Union[Channel, AsyncChannel, Non return None if async_channel: - return AsyncChannel( - host=self.grpc.host, - port=self.grpc.port, - ssl=self.grpc.secure, - ) + # return AsyncChannel( + # host=self.grpc.host, + # port=self.grpc.port, + # ssl=self.grpc.secure, + # ) + if self.grpc.secure: + return grpc.aio.secure_channel( + target=self._grpc_target, + credentials=ssl_channel_credentials(), + options=GRPC_OPTIONS, + ) + else: + return grpc.aio.insecure_channel( + target=self._grpc_target, + options=GRPC_OPTIONS, + ) else: if self.grpc.secure: return grpc.secure_channel( diff --git a/weaviate/connect/v4.py b/weaviate/connect/v4.py index a78291cfa..22dcc69b3 100644 --- a/weaviate/connect/v4.py +++ b/weaviate/connect/v4.py @@ -58,7 +58,7 @@ ) from weaviate.warnings import _Warnings -from weaviate.proto.v1 import weaviate_pb2_grpc, weaviate_grpc +from weaviate.proto.v1 import weaviate_pb2_grpc Session = Union[Client, OAuth2Client] AsyncSession = Union[AsyncClient, AsyncOAuth2Client] @@ -90,7 +90,7 @@ def __init__( self._connection_params = connection_params self._grpc_available = False self._grpc_stub: Optional[weaviate_pb2_grpc.WeaviateStub] = None - self._grpc_stub_async: Optional[weaviate_grpc.WeaviateStub] = None + self._grpc_stub_async: Optional[weaviate_pb2_grpc.WeaviateStub] = None self.timeout_config = timeout_config self.__connection_config = connection_config self.__trust_env = trust_env @@ -348,7 +348,7 @@ async def aopen(self) -> None: if self._grpc_stub_async is None: self._grpc_channel_async = self._connection_params._grpc_channel(async_channel=True) assert self._grpc_channel_async is not None - self._grpc_stub_async = weaviate_grpc.WeaviateStub(self._grpc_channel_async) + self._grpc_stub_async = weaviate_pb2_grpc.WeaviateStub(self._grpc_channel_async) async def aclose(self) -> None: if self._aclient is not None: @@ -356,7 +356,7 @@ async def aclose(self) -> None: self._aclient = None if self._grpc_stub_async is not None: assert self._grpc_channel_async is not None - self._grpc_channel_async.close() + await self._grpc_channel_async.close() self._grpc_stub_async = None def close(self) -> None: @@ -625,7 +625,7 @@ def grpc_stub(self) -> Optional[weaviate_pb2_grpc.WeaviateStub]: return self._grpc_stub @property - def agrpc_stub(self) -> Optional[weaviate_grpc.WeaviateStub]: + def agrpc_stub(self) -> Optional[weaviate_pb2_grpc.WeaviateStub]: if not self._grpc_available: raise WeaviateGRPCUnavailableError( "Did you forget to call client.connect() before using the client?" diff --git a/weaviate/proto/v1/base_grpc.py b/weaviate/proto/v1/base_grpc.py deleted file mode 100644 index 256c265dc..000000000 --- a/weaviate/proto/v1/base_grpc.py +++ /dev/null @@ -1,3 +0,0 @@ -# Generated by the Protocol Buffers compiler. DO NOT EDIT! -# source: v1/base.proto -# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/batch_delete_grpc.py b/weaviate/proto/v1/batch_delete_grpc.py deleted file mode 100644 index eafd2b5c1..000000000 --- a/weaviate/proto/v1/batch_delete_grpc.py +++ /dev/null @@ -1,3 +0,0 @@ -# Generated by the Protocol Buffers compiler. DO NOT EDIT! -# source: v1/batch_delete.proto -# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/batch_grpc.py b/weaviate/proto/v1/batch_grpc.py deleted file mode 100644 index 8d740096e..000000000 --- a/weaviate/proto/v1/batch_grpc.py +++ /dev/null @@ -1,3 +0,0 @@ -# Generated by the Protocol Buffers compiler. DO NOT EDIT! -# source: v1/batch.proto -# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/properties_grpc.py b/weaviate/proto/v1/properties_grpc.py deleted file mode 100644 index c2f9ab587..000000000 --- a/weaviate/proto/v1/properties_grpc.py +++ /dev/null @@ -1,3 +0,0 @@ -# Generated by the Protocol Buffers compiler. DO NOT EDIT! -# source: v1/properties.proto -# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/regen.sh b/weaviate/proto/v1/regen.sh index c6a3759ba..da2e4f7ec 100755 --- a/weaviate/proto/v1/regen.sh +++ b/weaviate/proto/v1/regen.sh @@ -4,10 +4,8 @@ echo "this script assumes that you have checked out weaviate next to the client" cd "${0%/*}/.." -python3 -m grpc_tools.protoc -I ../../../weaviate/grpc/proto --python_out=./ --pyi_out=./ --grpc_python_out=./ --grpclib_python_out=. ../../../weaviate/grpc/proto/v1/*.proto +python3 -m grpc_tools.protoc -I ../../../weaviate/grpc/proto --python_out=./ --pyi_out=./ --grpc_python_out=./ ../../../weaviate/grpc/proto/v1/*.proto -sed -i '' 's/ v1./ weaviate.proto.v1./g' v1/weaviate_grpc.py -sed -i '' 's/\[v1./\[weaviate.proto.v1./g' v1/weaviate_grpc.py sed -i '' 's/from v1/from weaviate.proto.v1/g' v1/*.py sed -i '' 's/from v1/from weaviate.proto.v1/g' v1/*.pyi diff --git a/weaviate/proto/v1/search_get_grpc.py b/weaviate/proto/v1/search_get_grpc.py deleted file mode 100644 index f67cfeb5e..000000000 --- a/weaviate/proto/v1/search_get_grpc.py +++ /dev/null @@ -1,3 +0,0 @@ -# Generated by the Protocol Buffers compiler. DO NOT EDIT! -# source: v1/search_get.proto -# plugin: grpclib.plugin.main diff --git a/weaviate/proto/v1/weaviate_grpc.py b/weaviate/proto/v1/weaviate_grpc.py deleted file mode 100644 index a09745a1c..000000000 --- a/weaviate/proto/v1/weaviate_grpc.py +++ /dev/null @@ -1,83 +0,0 @@ -# Generated by the Protocol Buffers compiler. DO NOT EDIT! -# source: weaviate.proto.v1.weaviate.proto -# plugin: grpclib.plugin.main -import abc -import typing - -import grpclib.const -import grpclib.client - -if typing.TYPE_CHECKING: - import grpclib.server - -import weaviate.proto.v1.batch_pb2 -import weaviate.proto.v1.batch_delete_pb2 -import weaviate.proto.v1.search_get_pb2 -import weaviate.proto.v1.weaviate_pb2 - - -class WeaviateBase(abc.ABC): - @abc.abstractmethod - async def Search( - self, - stream: "grpclib.server.Stream[weaviate.proto.v1.search_get_pb2.SearchRequest, weaviate.proto.v1.search_get_pb2.SearchReply]", - ) -> None: - pass - - @abc.abstractmethod - async def BatchObjects( - self, - stream: "grpclib.server.Stream[weaviate.proto.v1.batch_pb2.BatchObjectsRequest, weaviate.proto.v1.batch_pb2.BatchObjectsReply]", - ) -> None: - pass - - @abc.abstractmethod - async def BatchDelete( - self, - stream: "grpclib.server.Stream[weaviate.proto.v1.batch_delete_pb2.BatchDeleteRequest, weaviate.proto.v1.batch_delete_pb2.BatchDeleteReply]", - ) -> None: - pass - - def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: - return { - "/weaviate.v1.Weaviate/Search": grpclib.const.Handler( - self.Search, - grpclib.const.Cardinality.UNARY_UNARY, - weaviate.proto.v1.search_get_pb2.SearchRequest, - weaviate.proto.v1.search_get_pb2.SearchReply, - ), - "/weaviate.v1.Weaviate/BatchObjects": grpclib.const.Handler( - self.BatchObjects, - grpclib.const.Cardinality.UNARY_UNARY, - weaviate.proto.v1.batch_pb2.BatchObjectsRequest, - weaviate.proto.v1.batch_pb2.BatchObjectsReply, - ), - "/weaviate.v1.Weaviate/BatchDelete": grpclib.const.Handler( - self.BatchDelete, - grpclib.const.Cardinality.UNARY_UNARY, - weaviate.proto.v1.batch_delete_pb2.BatchDeleteRequest, - weaviate.proto.v1.batch_delete_pb2.BatchDeleteReply, - ), - } - - -class WeaviateStub: - def __init__(self, channel: grpclib.client.Channel) -> None: - self.Search = grpclib.client.UnaryUnaryMethod( - channel, - "/weaviate.v1.Weaviate/Search", - weaviate.proto.v1.search_get_pb2.SearchRequest, - weaviate.proto.v1.search_get_pb2.SearchReply, - ) - self.BatchObjects = grpclib.client.UnaryUnaryMethod( - channel, - "/weaviate.v1.Weaviate/BatchObjects", - weaviate.proto.v1.batch_pb2.BatchObjectsRequest, - weaviate.proto.v1.batch_pb2.BatchObjectsReply, - ) - self.BatchDelete = grpclib.client.UnaryUnaryMethod( - channel, - "/weaviate.v1.Weaviate/BatchDelete", - weaviate.proto.v1.batch_delete_pb2.BatchDeleteRequest, - weaviate.proto.v1.batch_delete_pb2.BatchDeleteReply, - ) From 5fbc01e65093db5e5592146ceffe81c2820a77fb Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 12:32:33 +0000 Subject: [PATCH 10/13] assert errors correctly --- integration/test_collection_batch.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/integration/test_collection_batch.py b/integration/test_collection_batch.py index f150c24f5..eb076c614 100644 --- a/integration/test_collection_batch.py +++ b/integration/test_collection_batch.py @@ -130,12 +130,13 @@ def test_add_object_batch_with_tenant(batch_collection: BatchCollection) -> None mt_collection.tenants.create([Tenant(name="tenant" + str(i)) for i in range(5)]) for i in range(5): - with mt_collection.with_tenant("tenant" + str(i % 5)).batch as batch: + col = mt_collection.with_tenant("tenant" + str(i % 5)) + with col.batch as batch: batch.add_object( properties={"name": "tenant" + str(i % 5)}, ) - assert len(mt_collection.batch.failed_objects()) == 0 - assert len(mt_collection.batch.failed_references()) == 0 + assert len(col.batch.failed_objects()) == 0 + assert len(col.batch.failed_references()) == 0 objs = mt_collection.with_tenant("tenant1").query.fetch_objects().objects assert len(objs) == 1 for obj in objs: From 760b322bd9ffa83c79baffdf2a0c8df52f201dd0 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 12:34:11 +0000 Subject: [PATCH 11/13] avoid races by waiting for bg_thread shutdown --- weaviate/collections/batch/base.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 9e55d05d2..937ff24b0 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -185,11 +185,10 @@ def __init__( self.__last_scale_up: float = 0 self.__max_observed_rate: int = 0 - self.__start_bg_thread() + self.__bg_thread = self.__start_bg_thread() def __run_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: - loop.set_debug(True) # in case of errors, shows async errors in the thread to users - asyncio.set_event_loop(loop) + loop.set_debug(True) # in case of errors, shows async errors in the terminal to users try: loop.run_forever() finally: @@ -219,14 +218,15 @@ def _shutdown(self) -> None: # we are done, shut bg threads down and end the event loop self.__shut_background_thread_down.set() + while self.__bg_thread.is_alive(): + time.sleep(0.01) - def __start_bg_thread(self) -> None: + def __start_bg_thread(self) -> threading.Thread: """Create a background thread that periodically checks how congested the batch queue is.""" self.__shut_background_thread_down = threading.Event() def periodic_check() -> None: loop = self.__start_new_event_loop() - future = asyncio.run_coroutine_threadsafe(self.__connection.aopen(), loop) future.result() # Wait for self._connection.aopen() to finish @@ -337,6 +337,7 @@ def periodic_check() -> None: name="BgBatchScheduler", ) demon.start() + return demon async def __send_batch_async( self, objs: List[_BatchObject], refs: List[_BatchReference] From 21e7b4b637b6245b62decdc1b79dac8a859d0ffd Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 12:36:07 +0000 Subject: [PATCH 12/13] add back old grpc lib code --- weaviate/connect/base.py | 41 ++++++++++++++-------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/weaviate/connect/base.py b/weaviate/connect/base.py index 03ea9cf3f..531ff6d49 100644 --- a/weaviate/connect/base.py +++ b/weaviate/connect/base.py @@ -134,34 +134,21 @@ def _grpc_channel(self, async_channel: bool) -> Union[Channel, AsyncChannel, Non return None if async_channel: - # return AsyncChannel( - # host=self.grpc.host, - # port=self.grpc.port, - # ssl=self.grpc.secure, - # ) - if self.grpc.secure: - return grpc.aio.secure_channel( - target=self._grpc_target, - credentials=ssl_channel_credentials(), - options=GRPC_OPTIONS, - ) - else: - return grpc.aio.insecure_channel( - target=self._grpc_target, - options=GRPC_OPTIONS, - ) + import_path = grpc.aio else: - if self.grpc.secure: - return grpc.secure_channel( - target=self._grpc_target, - credentials=ssl_channel_credentials(), - options=GRPC_OPTIONS, - ) - else: - return grpc.insecure_channel( - target=self._grpc_target, - options=GRPC_OPTIONS, - ) + import_path = grpc + + if self.grpc.secure: + return import_path.secure_channel( + target=self._grpc_target, + credentials=ssl_channel_credentials(), + options=GRPC_OPTIONS, + ) + else: + return import_path.insecure_channel( + target=self._grpc_target, + options=GRPC_OPTIONS, + ) @property def _http_scheme(self) -> str: From d2efd91afd1b4cbbe9934da86a59a8c04f4cfaaa Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 18 Jan 2024 12:45:19 +0000 Subject: [PATCH 13/13] remove commented out import --- weaviate/connect/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/weaviate/connect/base.py b/weaviate/connect/base.py index 531ff6d49..23182b241 100644 --- a/weaviate/connect/base.py +++ b/weaviate/connect/base.py @@ -9,7 +9,6 @@ from grpc import Channel, ssl_channel_credentials from grpc.aio import Channel as AsyncChannel # type: ignore -# from grpclib.client import Channel as AsyncChannel from pydantic import BaseModel, field_validator, model_validator from weaviate.types import NUMBER