From 276635c9bb054685ce7a4663cc6a6484bd87960f Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 12:56:29 -0600 Subject: [PATCH 01/16] Add and integrate FuturesExecutorMixin --- CHANGELOG.md | 1 + .../base_event_listener_driver.py | 14 ++---- .../vector/base_vector_store_driver.py | 48 ++++++++----------- .../engines/rag/modules/base_rag_module.py | 9 ++-- griptape/engines/rag/stages/base_rag_stage.py | 11 ++--- .../engines/rag/stages/response_rag_stage.py | 5 +- .../engines/rag/stages/retrieval_rag_stage.py | 5 +- griptape/loaders/base_loader.py | 22 ++++----- griptape/mixins/__init__.py | 2 + griptape/mixins/futures_executor_mixin.py | 29 +++++++++++ griptape/structures/workflow.py | 12 ++--- griptape/tasks/actions_subtask.py | 5 +- griptape/tasks/base_task.py | 10 ++-- tests/mocks/mock_futures_executor.py | 4 ++ .../test_base_event_listener_driver.py | 2 - .../mixins/test_futures_executor_mixin.py | 10 ++++ 16 files changed, 104 insertions(+), 85 deletions(-) create mode 100644 griptape/mixins/futures_executor_mixin.py create mode 100644 tests/mocks/mock_futures_executor.py create mode 100644 tests/unit/mixins/test_futures_executor_mixin.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 967738f31..a7d93dcaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support for bitshift composition in `BaseTask` for adding parent/child tasks. - `JsonArtifact` for handling de/seralization of values. - `Chat.logger_level` for setting what the `Chat` utility sets the logger level to. +- `FuturesExecutorMixin` to DRY up and optimize concurrent code across multiple classes. ### Changed - **BREAKING**: Removed all uses of `EventPublisherMixin` in favor of `event_bus`. diff --git a/griptape/drivers/event_listener/base_event_listener_driver.py b/griptape/drivers/event_listener/base_event_listener_driver.py index 9f7cb79fb..0af57f0f3 100644 --- a/griptape/drivers/event_listener/base_event_listener_driver.py +++ b/griptape/drivers/event_listener/base_event_listener_driver.py @@ -2,11 +2,12 @@ import logging from abc import ABC, abstractmethod -from concurrent import futures -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING from attrs import Factory, define, field +from griptape.mixins import FuturesExecutorMixin + if TYPE_CHECKING: from griptape.events import BaseEvent @@ -14,11 +15,7 @@ @define -class BaseEventListenerDriver(ABC): - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - kw_only=True, - ) +class BaseEventListenerDriver(FuturesExecutorMixin, ABC): batched: bool = field(default=True, kw_only=True) batch_size: int = field(default=10, kw_only=True) @@ -29,8 +26,7 @@ def batch(self) -> list[dict]: return self._batch def publish_event(self, event: BaseEvent | dict, *, flush: bool = False) -> None: - with self.futures_executor_fn() as executor: - executor.submit(self._safe_try_publish_event, event, flush=flush) + self.futures_executor.submit(self._safe_try_publish_event, event, flush=flush) @abstractmethod def try_publish_event_payload(self, event_payload: dict) -> None: ... diff --git a/griptape/drivers/vector/base_vector_store_driver.py b/griptape/drivers/vector/base_vector_store_driver.py index ed1f2d589..7ebccdcad 100644 --- a/griptape/drivers/vector/base_vector_store_driver.py +++ b/griptape/drivers/vector/base_vector_store_driver.py @@ -2,22 +2,21 @@ import uuid from abc import ABC, abstractmethod -from concurrent import futures from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Optional -from attrs import Factory, define, field +from attrs import define, field from griptape import utils from griptape.artifacts import BaseArtifact, ListArtifact, TextArtifact -from griptape.mixins import SerializableMixin +from griptape.mixins import FuturesExecutorMixin, SerializableMixin if TYPE_CHECKING: from griptape.drivers import BaseEmbeddingDriver @define -class BaseVectorStoreDriver(SerializableMixin, ABC): +class BaseVectorStoreDriver(SerializableMixin, FuturesExecutorMixin, ABC): DEFAULT_QUERY_COUNT = 5 @dataclass @@ -36,10 +35,6 @@ def to_artifact(self) -> BaseArtifact: return BaseArtifact.from_json(self.meta["artifact"]) # pyright: ignore[reportOptionalSubscript] embedding_driver: BaseEmbeddingDriver = field(kw_only=True, metadata={"serializable": True}) - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - kw_only=True, - ) def upsert_text_artifacts( self, @@ -48,24 +43,23 @@ def upsert_text_artifacts( meta: Optional[dict] = None, **kwargs, ) -> None: - with self.futures_executor_fn() as executor: - if isinstance(artifacts, list): - utils.execute_futures_list( - [ - executor.submit(self.upsert_text_artifact, a, namespace=None, meta=meta, **kwargs) - for a in artifacts - ], - ) - else: - utils.execute_futures_dict( - { - namespace: executor.submit( - self.upsert_text_artifact, a, namespace=namespace, meta=meta, **kwargs - ) - for namespace, artifact_list in artifacts.items() - for a in artifact_list - }, - ) + if isinstance(artifacts, list): + utils.execute_futures_list( + [ + self.futures_executor.submit(self.upsert_text_artifact, a, namespace=None, meta=meta, **kwargs) + for a in artifacts + ], + ) + else: + utils.execute_futures_dict( + { + namespace: self.futures_executor.submit( + self.upsert_text_artifact, a, namespace=namespace, meta=meta, **kwargs + ) + for namespace, artifact_list in artifacts.items() + for a in artifact_list + }, + ) def upsert_text_artifact( self, diff --git a/griptape/engines/rag/modules/base_rag_module.py b/griptape/engines/rag/modules/base_rag_module.py index 01d6d1b1d..668b3aced 100644 --- a/griptape/engines/rag/modules/base_rag_module.py +++ b/griptape/engines/rag/modules/base_rag_module.py @@ -2,25 +2,22 @@ import uuid from abc import ABC -from concurrent import futures -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Optional from attrs import Factory, define, field from griptape.common import Message, PromptStack +from griptape.mixins import FuturesExecutorMixin if TYPE_CHECKING: from griptape.engines.rag import RagContext @define(kw_only=True) -class BaseRagModule(ABC): +class BaseRagModule(FuturesExecutorMixin, ABC): name: str = field( default=Factory(lambda self: f"{self.__class__.__name__}-{uuid.uuid4().hex}", takes_self=True), kw_only=True ) - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - ) def generate_prompt_stack(self, system_prompt: Optional[str], query: str) -> PromptStack: messages = [] diff --git a/griptape/engines/rag/stages/base_rag_stage.py b/griptape/engines/rag/stages/base_rag_stage.py index 4f5a9bcd1..6a28551b4 100644 --- a/griptape/engines/rag/stages/base_rag_stage.py +++ b/griptape/engines/rag/stages/base_rag_stage.py @@ -1,20 +1,15 @@ from abc import ABC, abstractmethod from collections.abc import Sequence -from concurrent import futures -from typing import Callable -from attrs import Factory, define, field +from attrs import define from griptape.engines.rag import RagContext from griptape.engines.rag.modules import BaseRagModule +from griptape.mixins import FuturesExecutorMixin @define(kw_only=True) -class BaseRagStage(ABC): - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - ) - +class BaseRagStage(FuturesExecutorMixin, ABC): @abstractmethod def run(self, context: RagContext) -> RagContext: ... diff --git a/griptape/engines/rag/stages/response_rag_stage.py b/griptape/engines/rag/stages/response_rag_stage.py index 4bc0b2be5..de286317c 100644 --- a/griptape/engines/rag/stages/response_rag_stage.py +++ b/griptape/engines/rag/stages/response_rag_stage.py @@ -31,8 +31,9 @@ def modules(self) -> list[BaseRagModule]: def run(self, context: RagContext) -> RagContext: logging.info("ResponseRagStage: running %s retrieval modules in parallel", len(self.response_modules)) - with self.futures_executor_fn() as executor: - results = utils.execute_futures_list([executor.submit(r.run, context) for r in self.response_modules]) + results = utils.execute_futures_list( + [self.futures_executor.submit(r.run, context) for r in self.response_modules] + ) context.outputs = results diff --git a/griptape/engines/rag/stages/retrieval_rag_stage.py b/griptape/engines/rag/stages/retrieval_rag_stage.py index fa618a7ff..6ce9fb19f 100644 --- a/griptape/engines/rag/stages/retrieval_rag_stage.py +++ b/griptape/engines/rag/stages/retrieval_rag_stage.py @@ -35,8 +35,9 @@ def modules(self) -> list[BaseRagModule]: def run(self, context: RagContext) -> RagContext: logging.info("RetrievalRagStage: running %s retrieval modules in parallel", len(self.retrieval_modules)) - with self.futures_executor_fn() as executor: - results = utils.execute_futures_list([executor.submit(r.run, context) for r in self.retrieval_modules]) + results = utils.execute_futures_list( + [self.futures_executor.submit(r.run, context) for r in self.retrieval_modules] + ) # flatten the list of lists results = list(itertools.chain.from_iterable(results)) diff --git a/griptape/loaders/base_loader.py b/griptape/loaders/base_loader.py index 09551d9ab..525b4df0a 100644 --- a/griptape/loaders/base_loader.py +++ b/griptape/loaders/base_loader.py @@ -1,11 +1,11 @@ from __future__ import annotations from abc import ABC, abstractmethod -from concurrent import futures -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Optional -from attrs import Factory, define, field +from attrs import define, field +from griptape.mixins import FuturesExecutorMixin from griptape.utils.futures import execute_futures_dict from griptape.utils.hash import bytes_to_hash, str_to_hash @@ -16,11 +16,7 @@ @define -class BaseLoader(ABC): - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - kw_only=True, - ) +class BaseLoader(FuturesExecutorMixin, ABC): encoding: Optional[str] = field(default=None, kw_only=True) @abstractmethod @@ -36,10 +32,12 @@ def load_collection( # to avoid duplicate work. sources_by_key = {self.to_key(source): source for source in sources} - with self.futures_executor_fn() as executor: - return execute_futures_dict( - {key: executor.submit(self.load, source, *args, **kwargs) for key, source in sources_by_key.items()}, - ) + return execute_futures_dict( + { + key: self.futures_executor.submit(self.load, source, *args, **kwargs) + for key, source in sources_by_key.items() + }, + ) def to_key(self, source: Any, *args, **kwargs) -> str: if isinstance(source, bytes): diff --git a/griptape/mixins/__init__.py b/griptape/mixins/__init__.py index d9eea53c2..1bfa95c9a 100644 --- a/griptape/mixins/__init__.py +++ b/griptape/mixins/__init__.py @@ -4,6 +4,7 @@ from .rule_mixin import RuleMixin from .serializable_mixin import SerializableMixin from .media_artifact_file_output_mixin import BlobArtifactFileOutputMixin +from .futures_executor_mixin import FuturesExecutorMixin __all__ = [ "ActivityMixin", @@ -12,4 +13,5 @@ "RuleMixin", "BlobArtifactFileOutputMixin", "SerializableMixin", + "FuturesExecutorMixin", ] diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py new file mode 100644 index 000000000..2120de784 --- /dev/null +++ b/griptape/mixins/futures_executor_mixin.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from abc import ABC +from concurrent import futures +from typing import Callable, Optional + +from attrs import Factory, define, field + + +@define(slots=False) +class FuturesExecutorMixin(ABC): + futures_executor_fn: Callable[[], futures.Executor] = field( + default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), + kw_only=True, + ) + + _futures_executor: Optional[futures.Executor] = field(init=False, default=None) + + @property + def futures_executor(self) -> futures.Executor: + if self._futures_executor is None: + self._futures_executor = self.futures_executor_fn() + + return self._futures_executor + + def __exit__(self, *args) -> None: + if self._futures_executor: + self._futures_executor.shutdown() + self._futures_executor = None diff --git a/griptape/structures/workflow.py b/griptape/structures/workflow.py index 53e59e751..15f665735 100644 --- a/griptape/structures/workflow.py +++ b/griptape/structures/workflow.py @@ -1,14 +1,15 @@ from __future__ import annotations import concurrent.futures as futures -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Optional -from attrs import Factory, define, field +from attrs import define from graphlib import TopologicalSorter from griptape.artifacts import ErrorArtifact from griptape.common import observable from griptape.memory.structure import Run +from griptape.mixins import FuturesExecutorMixin from griptape.structures import Structure if TYPE_CHECKING: @@ -16,12 +17,7 @@ @define -class Workflow(Structure): - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - kw_only=True, - ) - +class Workflow(Structure, FuturesExecutorMixin): @property def input_task(self) -> Optional[BaseTask]: return self.order_tasks()[0] if self.tasks else None diff --git a/griptape/tasks/actions_subtask.py b/griptape/tasks/actions_subtask.py index 7d2fb5efd..befad59e0 100644 --- a/griptape/tasks/actions_subtask.py +++ b/griptape/tasks/actions_subtask.py @@ -139,8 +139,9 @@ def run(self) -> BaseArtifact: return ErrorArtifact("no tool output") def execute_actions(self, actions: list[ToolAction]) -> list[tuple[str, BaseArtifact]]: - with self.futures_executor_fn() as executor: - results = utils.execute_futures_dict({a.tag: executor.submit(self.execute_action, a) for a in actions}) + results = utils.execute_futures_dict( + {a.tag: self.futures_executor.submit(self.execute_action, a) for a in actions} + ) return list(results.values()) diff --git a/griptape/tasks/base_task.py b/griptape/tasks/base_task.py index 1899eccd6..f5a772e48 100644 --- a/griptape/tasks/base_task.py +++ b/griptape/tasks/base_task.py @@ -3,15 +3,15 @@ import logging import uuid from abc import ABC, abstractmethod -from concurrent import futures from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Optional from attrs import Factory, define, field from griptape.artifacts import ErrorArtifact from griptape.config import config from griptape.events import FinishTaskEvent, StartTaskEvent, event_bus +from griptape.mixins import FuturesExecutorMixin if TYPE_CHECKING: from griptape.artifacts import BaseArtifact @@ -22,7 +22,7 @@ @define -class BaseTask(ABC): +class BaseTask(FuturesExecutorMixin, ABC): class State(Enum): PENDING = 1 EXECUTING = 2 @@ -37,10 +37,6 @@ class State(Enum): output: Optional[BaseArtifact] = field(default=None, init=False) context: dict[str, Any] = field(factory=dict, kw_only=True) - futures_executor_fn: Callable[[], futures.Executor] = field( - default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - kw_only=True, - ) def __rshift__(self, other: BaseTask) -> BaseTask: self.add_child(other) diff --git a/tests/mocks/mock_futures_executor.py b/tests/mocks/mock_futures_executor.py new file mode 100644 index 000000000..cbbf84560 --- /dev/null +++ b/tests/mocks/mock_futures_executor.py @@ -0,0 +1,4 @@ +from griptape.mixins import FuturesExecutorMixin + + +class MockFuturesExecutor(FuturesExecutorMixin): ... diff --git a/tests/unit/drivers/event_listener/test_base_event_listener_driver.py b/tests/unit/drivers/event_listener/test_base_event_listener_driver.py index 04cfef34b..114778f72 100644 --- a/tests/unit/drivers/event_listener/test_base_event_listener_driver.py +++ b/tests/unit/drivers/event_listener/test_base_event_listener_driver.py @@ -12,9 +12,7 @@ def test_publish_event(self): driver.publish_event(MockEvent().to_dict()) - executor.__enter__.assert_called_once() executor.submit.assert_called_once() - executor.__exit__.assert_called_once() def test__safe_try_publish_event(self): driver = MockEventListenerDriver(batched=False) diff --git a/tests/unit/mixins/test_futures_executor_mixin.py b/tests/unit/mixins/test_futures_executor_mixin.py new file mode 100644 index 000000000..3be336687 --- /dev/null +++ b/tests/unit/mixins/test_futures_executor_mixin.py @@ -0,0 +1,10 @@ +from concurrent import futures + +from tests.mocks.mock_futures_executor import MockFuturesExecutor + + +class TestFuturesExecutorMixin: + def test_futures_executor(self): + executor = futures.ThreadPoolExecutor() + + assert MockFuturesExecutor(futures_executor_fn=lambda: executor).futures_executor == executor From 06ca114109d88285f8460be42740361c5b82b0ac Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 13:45:56 -0600 Subject: [PATCH 02/16] Move executor cleanup to __del__ and fix Workflow --- griptape/mixins/futures_executor_mixin.py | 2 +- griptape/structures/workflow.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 2120de784..783b0292a 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -23,7 +23,7 @@ def futures_executor(self) -> futures.Executor: return self._futures_executor - def __exit__(self, *args) -> None: + def __del__(self) -> None: if self._futures_executor: self._futures_executor.shutdown() self._futures_executor = None diff --git a/griptape/structures/workflow.py b/griptape/structures/workflow.py index 15f665735..f1e1ec86b 100644 --- a/griptape/structures/workflow.py +++ b/griptape/structures/workflow.py @@ -96,7 +96,7 @@ def try_run(self, *args) -> Workflow: for task in ordered_tasks: if task.can_execute(): - future = self.futures_executor_fn().submit(task.execute) + future = self.futures_executor.submit(task.execute) futures_list[future] = task # Wait for all tasks to complete From 68f030309b9d939da2baf5cdef164be05b0e790a Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 14:05:10 -0600 Subject: [PATCH 03/16] Add an executor lock to FuturesExecutorMixin --- griptape/mixins/futures_executor_mixin.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 783b0292a..ba8f133a7 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -2,6 +2,7 @@ from abc import ABC from concurrent import futures +from threading import Lock from typing import Callable, Optional from attrs import Factory, define, field @@ -15,15 +16,25 @@ class FuturesExecutorMixin(ABC): ) _futures_executor: Optional[futures.Executor] = field(init=False, default=None) + _executor_lock: Lock = field(init=False, factory=Lock) @property def futures_executor(self) -> futures.Executor: if self._futures_executor is None: - self._futures_executor = self.futures_executor_fn() + with self._executor_lock: + if self._futures_executor is None: + try: + self._futures_executor = self.futures_executor_fn() + except Exception as e: + raise RuntimeError(f"Failed to initialize futures executor: {e}") return self._futures_executor + def __shutdown_executor(self, wait: bool = True) -> None: + with self._executor_lock: + if self._futures_executor: + self._futures_executor.shutdown(wait=wait) + self._futures_executor = None + def __del__(self) -> None: - if self._futures_executor: - self._futures_executor.shutdown() - self._futures_executor = None + self.__shutdown_executor(wait=False) From 22a30403423ba96dad783860dfe793d086a0155f Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 14:17:07 -0600 Subject: [PATCH 04/16] Cleanup FuturesExecutorMixin --- griptape/mixins/futures_executor_mixin.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index ba8f133a7..710c0026c 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -23,18 +23,12 @@ def futures_executor(self) -> futures.Executor: if self._futures_executor is None: with self._executor_lock: if self._futures_executor is None: - try: - self._futures_executor = self.futures_executor_fn() - except Exception as e: - raise RuntimeError(f"Failed to initialize futures executor: {e}") + self._futures_executor = self.futures_executor_fn() return self._futures_executor - def __shutdown_executor(self, wait: bool = True) -> None: + def __del__(self) -> None: with self._executor_lock: if self._futures_executor: - self._futures_executor.shutdown(wait=wait) + self._futures_executor.shutdown(True) self._futures_executor = None - - def __del__(self) -> None: - self.__shutdown_executor(wait=False) From 7741938b13e0c2ab25eb2f540ed6fe418d075aab Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 14:17:37 -0600 Subject: [PATCH 05/16] Fix shutdown call --- griptape/mixins/futures_executor_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 710c0026c..82d8984e6 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -30,5 +30,5 @@ def futures_executor(self) -> futures.Executor: def __del__(self) -> None: with self._executor_lock: if self._futures_executor: - self._futures_executor.shutdown(True) + self._futures_executor.shutdown(wait=True) self._futures_executor = None From 1d4e4a32456d4c94e478eee271cf06613b2eb4ef Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 14:53:30 -0600 Subject: [PATCH 06/16] Executor lock fix --- griptape/mixins/futures_executor_mixin.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 82d8984e6..e8fbca00b 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -20,10 +20,9 @@ class FuturesExecutorMixin(ABC): @property def futures_executor(self) -> futures.Executor: - if self._futures_executor is None: - with self._executor_lock: - if self._futures_executor is None: - self._futures_executor = self.futures_executor_fn() + with self._executor_lock: + if self._futures_executor is None: + self._futures_executor = self.futures_executor_fn() return self._futures_executor From ca4b8fb3296a0cea44d5b1ade8898563a13ad23e Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 15:12:04 -0600 Subject: [PATCH 07/16] Manually init thread lock in FuturesExecutorMixin --- griptape/mixins/futures_executor_mixin.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index e8fbca00b..59e455abf 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -1,8 +1,8 @@ from __future__ import annotations +import threading from abc import ABC from concurrent import futures -from threading import Lock from typing import Callable, Optional from attrs import Factory, define, field @@ -16,18 +16,18 @@ class FuturesExecutorMixin(ABC): ) _futures_executor: Optional[futures.Executor] = field(init=False, default=None) - _executor_lock: Lock = field(init=False, factory=Lock) + thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock())) @property def futures_executor(self) -> futures.Executor: - with self._executor_lock: + with self.thread_lock: if self._futures_executor is None: self._futures_executor = self.futures_executor_fn() return self._futures_executor def __del__(self) -> None: - with self._executor_lock: + with self.thread_lock: if self._futures_executor: self._futures_executor.shutdown(wait=True) self._futures_executor = None From 056fbaeef3feaa5cc6ace872b149338526c603d7 Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 15:26:40 -0600 Subject: [PATCH 08/16] Fix attrs --- griptape/mixins/futures_executor_mixin.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 59e455abf..6c09aee32 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -8,15 +8,14 @@ from attrs import Factory, define, field -@define(slots=False) +@define(slots=False, kw_only=True) class FuturesExecutorMixin(ABC): futures_executor_fn: Callable[[], futures.Executor] = field( default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), - kw_only=True, ) + thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock())) _futures_executor: Optional[futures.Executor] = field(init=False, default=None) - thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock())) @property def futures_executor(self) -> futures.Executor: From b8c5c6548f2af2d71615a21656604a2a25f1b9b6 Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Fri, 16 Aug 2024 15:56:53 -0600 Subject: [PATCH 09/16] Trigger empty build From 84a1f2c9e1f296b4c2b2754a03e00ca6721e98b0 Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Sat, 17 Aug 2024 13:44:32 -0600 Subject: [PATCH 10/16] Add utils.execute_futures_list_dict and fix BaseVectorStoreDriver.upsert_text_artifacts --- .../vector/base_vector_store_driver.py | 25 +++++++++----- .../vector/local_vector_store_driver.py | 34 ++++++++++--------- griptape/mixins/futures_executor_mixin.py | 25 ++++++-------- griptape/tasks/actions_subtask.py | 6 ++-- griptape/utils/__init__.py | 4 +-- griptape/utils/futures.py | 6 ++++ .../vector/test_local_vector_store_driver.py | 13 +++++++ tests/unit/tools/test_vector_store_tool.py | 3 -- tests/unit/utils/test_futures.py | 15 ++++++-- 9 files changed, 81 insertions(+), 50 deletions(-) diff --git a/griptape/drivers/vector/base_vector_store_driver.py b/griptape/drivers/vector/base_vector_store_driver.py index 7ebccdcad..5c1ed157c 100644 --- a/griptape/drivers/vector/base_vector_store_driver.py +++ b/griptape/drivers/vector/base_vector_store_driver.py @@ -46,20 +46,27 @@ def upsert_text_artifacts( if isinstance(artifacts, list): utils.execute_futures_list( [ - self.futures_executor.submit(self.upsert_text_artifact, a, namespace=None, meta=meta, **kwargs) + self.futures_executor.submit( + self.upsert_text_artifact, a, namespace=None, meta=meta, **kwargs + ) for a in artifacts ], ) else: - utils.execute_futures_dict( - { - namespace: self.futures_executor.submit( - self.upsert_text_artifact, a, namespace=namespace, meta=meta, **kwargs + futures_dict = {} + + for namespace, artifact_list in artifacts.items(): + for a in artifact_list: + if not futures_dict.get(namespace): + futures_dict[namespace] = [] + + futures_dict[namespace].append( + self.futures_executor.submit( + self.upsert_text_artifact, a, namespace=namespace, meta=meta, **kwargs + ) ) - for namespace, artifact_list in artifacts.items() - for a in artifact_list - }, - ) + + utils.execute_futures_list_dict(futures_dict) def upsert_text_artifact( self, diff --git a/griptape/drivers/vector/local_vector_store_driver.py b/griptape/drivers/vector/local_vector_store_driver.py index 2b42b19f5..d8001db12 100644 --- a/griptape/drivers/vector/local_vector_store_driver.py +++ b/griptape/drivers/vector/local_vector_store_driver.py @@ -7,7 +7,7 @@ from dataclasses import asdict from typing import Callable, NoReturn, Optional, TextIO -from attrs import Factory, define, field +from attrs import define, field, Factory from numpy import dot from numpy.linalg import norm @@ -31,24 +31,19 @@ def __attrs_post_init__(self) -> None: if not os.path.isfile(self.persist_file): with open(self.persist_file, "w") as file: - self.save_entries_to_file(file) + self.__save_entries_to_file(file) with open(self.persist_file, "r+") as file: if os.path.getsize(self.persist_file) > 0: self.entries = self.load_entries_from_file(file) else: - self.save_entries_to_file(file) - - def save_entries_to_file(self, json_file: TextIO) -> None: - with self.thread_lock: - serialized_data = {k: asdict(v) for k, v in self.entries.items()} - - json.dump(serialized_data, json_file) + self.__save_entries_to_file(file) def load_entries_from_file(self, json_file: TextIO) -> dict[str, BaseVectorStoreDriver.Entry]: - data = json.load(json_file) + with self.thread_lock: + data = json.load(json_file) - return {k: BaseVectorStoreDriver.Entry.from_dict(v) for k, v in data.items()} + return {k: BaseVectorStoreDriver.Entry.from_dict(v) for k, v in data.items()} def upsert_vector( self, @@ -62,7 +57,7 @@ def upsert_vector( vector_id = vector_id or utils.str_to_hash(str(vector)) with self.thread_lock: - self.entries[self._namespaced_vector_id(vector_id, namespace=namespace)] = self.Entry( + self.entries[self.__namespaced_vector_id(vector_id, namespace=namespace)] = self.Entry( id=vector_id, vector=vector, meta=meta, @@ -73,12 +68,12 @@ def upsert_vector( # TODO: optimize later since it reserializes all entries from memory and stores them in the JSON file # every time a new vector is inserted with open(self.persist_file, "w") as file: - self.save_entries_to_file(file) + self.__save_entries_to_file(file) return vector_id def load_entry(self, vector_id: str, *, namespace: Optional[str] = None) -> Optional[BaseVectorStoreDriver.Entry]: - return self.entries.get(self._namespaced_vector_id(vector_id, namespace=namespace), None) + return self.entries.get(self.__namespaced_vector_id(vector_id, namespace=namespace), None) def load_entries(self, *, namespace: Optional[str] = None) -> list[BaseVectorStoreDriver.Entry]: return [entry for key, entry in self.entries.items() if namespace is None or entry.namespace == namespace] @@ -100,8 +95,9 @@ def query( entries = self.entries entries_and_relatednesses = [ - (entry, self.relatedness_fn(query_embedding, entry.vector)) for entry in entries.values() + (entry, self.relatedness_fn(query_embedding, entry.vector)) for entry in list(entries.values()) ] + entries_and_relatednesses.sort(key=operator.itemgetter(1), reverse=True) result = [ @@ -120,5 +116,11 @@ def query( def delete_vector(self, vector_id: str) -> NoReturn: raise NotImplementedError(f"{self.__class__.__name__} does not support deletion.") - def _namespaced_vector_id(self, vector_id: str, *, namespace: Optional[str]) -> str: + def __save_entries_to_file(self, json_file: TextIO) -> None: + with self.thread_lock: + serialized_data = {k: asdict(v) for k, v in self.entries.items()} + + json.dump(serialized_data, json_file) + + def __namespaced_vector_id(self, vector_id: str, *, namespace: Optional[str]) -> str: return vector_id if namespace is None else f"{namespace}-{vector_id}" diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 6c09aee32..737e07d23 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -1,6 +1,5 @@ from __future__ import annotations -import threading from abc import ABC from concurrent import futures from typing import Callable, Optional @@ -13,20 +12,18 @@ class FuturesExecutorMixin(ABC): futures_executor_fn: Callable[[], futures.Executor] = field( default=Factory(lambda: lambda: futures.ThreadPoolExecutor()), ) - thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock())) - _futures_executor: Optional[futures.Executor] = field(init=False, default=None) + futures_executor: Optional[futures.Executor] = field( + default=Factory( + lambda self: self.futures_executor_fn(), + takes_self=True + ) + ) - @property - def futures_executor(self) -> futures.Executor: - with self.thread_lock: - if self._futures_executor is None: - self._futures_executor = self.futures_executor_fn() + def __del__(self) -> None: + executor = self.futures_executor - return self._futures_executor + if executor: + self.futures_executor = None - def __del__(self) -> None: - with self.thread_lock: - if self._futures_executor: - self._futures_executor.shutdown(wait=True) - self._futures_executor = None + executor.shutdown(wait=True) diff --git a/griptape/tasks/actions_subtask.py b/griptape/tasks/actions_subtask.py index befad59e0..6d405db93 100644 --- a/griptape/tasks/actions_subtask.py +++ b/griptape/tasks/actions_subtask.py @@ -139,12 +139,10 @@ def run(self) -> BaseArtifact: return ErrorArtifact("no tool output") def execute_actions(self, actions: list[ToolAction]) -> list[tuple[str, BaseArtifact]]: - results = utils.execute_futures_dict( - {a.tag: self.futures_executor.submit(self.execute_action, a) for a in actions} + return utils.execute_futures_list( + [self.futures_executor.submit(self.execute_action, a) for a in actions] ) - return list(results.values()) - def execute_action(self, action: ToolAction) -> tuple[str, BaseArtifact]: if action.tool is not None: if action.path is not None: diff --git a/griptape/utils/__init__.py b/griptape/utils/__init__.py index 19730ac3b..03725f59d 100644 --- a/griptape/utils/__init__.py +++ b/griptape/utils/__init__.py @@ -5,8 +5,7 @@ from .python_runner import PythonRunner from .command_runner import CommandRunner from .chat import Chat -from .futures import execute_futures_dict -from .futures import execute_futures_list +from .futures import execute_futures_dict, execute_futures_list, execute_futures_list_dict from .token_counter import TokenCounter from .dict_utils import remove_null_values_in_dict_recursively, dict_merge, remove_key_in_dict_recursively from .file_utils import load_file, load_files @@ -37,6 +36,7 @@ def minify_json(value: str) -> str: "is_dependency_installed", "execute_futures_dict", "execute_futures_list", + "execute_futures_list_dict", "TokenCounter", "remove_null_values_in_dict_recursively", "dict_merge", diff --git a/griptape/utils/futures.py b/griptape/utils/futures.py index ea22e4c56..e8eb74395 100644 --- a/griptape/utils/futures.py +++ b/griptape/utils/futures.py @@ -16,3 +16,9 @@ def execute_futures_list(fs_list: list[futures.Future[T]]) -> list[T]: futures.wait(fs_list, timeout=None, return_when=futures.ALL_COMPLETED) return [future.result() for future in fs_list] + + +def execute_futures_list_dict(fs_dict: dict[str, list[futures.Future[T]]]) -> dict[str, T]: + execute_futures_list([item for sublist in fs_dict.values() for item in sublist]) + + return {key: [f.result() for f in fs] for key, fs in fs_dict.items()} diff --git a/tests/unit/drivers/vector/test_local_vector_store_driver.py b/tests/unit/drivers/vector/test_local_vector_store_driver.py index 6f022793c..cfe049ff3 100644 --- a/tests/unit/drivers/vector/test_local_vector_store_driver.py +++ b/tests/unit/drivers/vector/test_local_vector_store_driver.py @@ -22,3 +22,16 @@ def test_upsert_text_artifacts_list(self, driver): assert len(driver.load_artifacts(namespace="foo")) == 0 assert len(driver.load_artifacts()) == 2 + + def test_upsert_text_artifacts_stress_test(self, driver): + driver.upsert_text_artifacts( + { + "test1": [TextArtifact(f"foo-{i}") for i in range(0, 1000)], + "test2": [TextArtifact(f"foo-{i}") for i in range(0, 1000)], + "test3": [TextArtifact(f"foo-{i}") for i in range(0, 1000)] + } + ) + + assert len(driver.query("foo", namespace="test1")) == 1000 + assert len(driver.query("foo", namespace="test2")) == 1000 + assert len(driver.query("foo", namespace="test3")) == 1000 diff --git a/tests/unit/tools/test_vector_store_tool.py b/tests/unit/tools/test_vector_store_tool.py index ea52a13ea..abffcfd30 100644 --- a/tests/unit/tools/test_vector_store_tool.py +++ b/tests/unit/tools/test_vector_store_tool.py @@ -8,9 +8,6 @@ class TestVectorStoreTool: @pytest.fixture(autouse=True) - def _mock_try_run(self, mocker): - mocker.patch("griptape.drivers.OpenAiEmbeddingDriver.try_embed_chunk", return_value=[0, 1]) - def test_search(self): driver = LocalVectorStoreDriver(embedding_driver=MockEmbeddingDriver()) tool = VectorStoreTool(description="Test", vector_store_driver=driver) diff --git a/tests/unit/utils/test_futures.py b/tests/unit/utils/test_futures.py index 04ddb9877..983fd45cc 100644 --- a/tests/unit/utils/test_futures.py +++ b/tests/unit/utils/test_futures.py @@ -19,8 +19,19 @@ def test_execute_futures_list(self): [executor.submit(self.foobar, "foo"), executor.submit(self.foobar, "baz")] ) - assert result[0] == "foo-bar" - assert result[1] == "baz-bar" + assert set(result) == {"foo-bar", "baz-bar"} + + def test_execute_futures_list_dict(self): + with futures.ThreadPoolExecutor() as executor: + result = utils.execute_futures_list_dict({ + "test1": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], + "test2": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], + "test3": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)] + }) + + assert len(result["test1"]) == 1000 + assert len(result["test2"]) == 1000 + assert len(result["test3"]) == 1000 def foobar(self, foo): return f"{foo}-bar" From da63f5886b76d088c8485d99447a496b8b1d8078 Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Sat, 17 Aug 2024 13:47:12 -0600 Subject: [PATCH 11/16] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b62a3bb0b..70e8b6173 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `JsonArtifact` for handling de/seralization of values. - `Chat.logger_level` for setting what the `Chat` utility sets the logger level to. - `FuturesExecutorMixin` to DRY up and optimize concurrent code across multiple classes. +- `utils.execute_futures_list_dict` for executing a dict of lists of futures. ### Changed - **BREAKING**: Removed all uses of `EventPublisherMixin` in favor of `event_bus`. @@ -58,6 +59,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - `JsonExtractionEngine` failing to parse json when the LLM outputs more than just the json. - Exception when adding `ErrorArtifact`'s to the Prompt Stack. +- Concurrency bug in `BaseVectorStoreDriver.upsert_text_artifacts` ## [0.29.2] - 2024-08-16 From 7bfb8a12af01a47883c2ac3b39fbdb02b2227e4b Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Sat, 17 Aug 2024 13:50:51 -0600 Subject: [PATCH 12/16] Fix formatting and type checks --- griptape/drivers/vector/base_vector_store_driver.py | 4 +--- griptape/drivers/vector/local_vector_store_driver.py | 2 +- griptape/mixins/futures_executor_mixin.py | 5 +---- griptape/tasks/actions_subtask.py | 4 +--- griptape/utils/futures.py | 2 +- .../drivers/vector/test_local_vector_store_driver.py | 2 +- tests/unit/tools/test_vector_store_tool.py | 3 --- tests/unit/utils/test_futures.py | 12 +++++++----- 8 files changed, 13 insertions(+), 21 deletions(-) diff --git a/griptape/drivers/vector/base_vector_store_driver.py b/griptape/drivers/vector/base_vector_store_driver.py index 5c1ed157c..2abb29c3f 100644 --- a/griptape/drivers/vector/base_vector_store_driver.py +++ b/griptape/drivers/vector/base_vector_store_driver.py @@ -46,9 +46,7 @@ def upsert_text_artifacts( if isinstance(artifacts, list): utils.execute_futures_list( [ - self.futures_executor.submit( - self.upsert_text_artifact, a, namespace=None, meta=meta, **kwargs - ) + self.futures_executor.submit(self.upsert_text_artifact, a, namespace=None, meta=meta, **kwargs) for a in artifacts ], ) diff --git a/griptape/drivers/vector/local_vector_store_driver.py b/griptape/drivers/vector/local_vector_store_driver.py index d8001db12..36203d540 100644 --- a/griptape/drivers/vector/local_vector_store_driver.py +++ b/griptape/drivers/vector/local_vector_store_driver.py @@ -7,7 +7,7 @@ from dataclasses import asdict from typing import Callable, NoReturn, Optional, TextIO -from attrs import define, field, Factory +from attrs import Factory, define, field from numpy import dot from numpy.linalg import norm diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 737e07d23..88adc57b8 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -14,10 +14,7 @@ class FuturesExecutorMixin(ABC): ) futures_executor: Optional[futures.Executor] = field( - default=Factory( - lambda self: self.futures_executor_fn(), - takes_self=True - ) + default=Factory(lambda self: self.futures_executor_fn(), takes_self=True) ) def __del__(self) -> None: diff --git a/griptape/tasks/actions_subtask.py b/griptape/tasks/actions_subtask.py index 6d405db93..83ffc2081 100644 --- a/griptape/tasks/actions_subtask.py +++ b/griptape/tasks/actions_subtask.py @@ -139,9 +139,7 @@ def run(self) -> BaseArtifact: return ErrorArtifact("no tool output") def execute_actions(self, actions: list[ToolAction]) -> list[tuple[str, BaseArtifact]]: - return utils.execute_futures_list( - [self.futures_executor.submit(self.execute_action, a) for a in actions] - ) + return utils.execute_futures_list([self.futures_executor.submit(self.execute_action, a) for a in actions]) def execute_action(self, action: ToolAction) -> tuple[str, BaseArtifact]: if action.tool is not None: diff --git a/griptape/utils/futures.py b/griptape/utils/futures.py index e8eb74395..b91bb3918 100644 --- a/griptape/utils/futures.py +++ b/griptape/utils/futures.py @@ -18,7 +18,7 @@ def execute_futures_list(fs_list: list[futures.Future[T]]) -> list[T]: return [future.result() for future in fs_list] -def execute_futures_list_dict(fs_dict: dict[str, list[futures.Future[T]]]) -> dict[str, T]: +def execute_futures_list_dict(fs_dict: dict[str, list[futures.Future[T]]]) -> dict[str, list[T]]: execute_futures_list([item for sublist in fs_dict.values() for item in sublist]) return {key: [f.result() for f in fs] for key, fs in fs_dict.items()} diff --git a/tests/unit/drivers/vector/test_local_vector_store_driver.py b/tests/unit/drivers/vector/test_local_vector_store_driver.py index cfe049ff3..2504b2486 100644 --- a/tests/unit/drivers/vector/test_local_vector_store_driver.py +++ b/tests/unit/drivers/vector/test_local_vector_store_driver.py @@ -28,7 +28,7 @@ def test_upsert_text_artifacts_stress_test(self, driver): { "test1": [TextArtifact(f"foo-{i}") for i in range(0, 1000)], "test2": [TextArtifact(f"foo-{i}") for i in range(0, 1000)], - "test3": [TextArtifact(f"foo-{i}") for i in range(0, 1000)] + "test3": [TextArtifact(f"foo-{i}") for i in range(0, 1000)], } ) diff --git a/tests/unit/tools/test_vector_store_tool.py b/tests/unit/tools/test_vector_store_tool.py index abffcfd30..30596f09f 100644 --- a/tests/unit/tools/test_vector_store_tool.py +++ b/tests/unit/tools/test_vector_store_tool.py @@ -1,5 +1,3 @@ -import pytest - from griptape.artifacts import ListArtifact, TextArtifact from griptape.drivers import LocalVectorStoreDriver from griptape.tools import VectorStoreTool @@ -7,7 +5,6 @@ class TestVectorStoreTool: - @pytest.fixture(autouse=True) def test_search(self): driver = LocalVectorStoreDriver(embedding_driver=MockEmbeddingDriver()) tool = VectorStoreTool(description="Test", vector_store_driver=driver) diff --git a/tests/unit/utils/test_futures.py b/tests/unit/utils/test_futures.py index 983fd45cc..c34124c76 100644 --- a/tests/unit/utils/test_futures.py +++ b/tests/unit/utils/test_futures.py @@ -23,11 +23,13 @@ def test_execute_futures_list(self): def test_execute_futures_list_dict(self): with futures.ThreadPoolExecutor() as executor: - result = utils.execute_futures_list_dict({ - "test1": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], - "test2": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], - "test3": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)] - }) + result = utils.execute_futures_list_dict( + { + "test1": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], + "test2": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], + "test3": [executor.submit(self.foobar, f"foo-{i}") for i in range(0, 1000)], + } + ) assert len(result["test1"]) == 1000 assert len(result["test2"]) == 1000 From 3da772eae20001086b278d90cbee611378e2743a Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Sat, 17 Aug 2024 13:53:56 -0600 Subject: [PATCH 13/16] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70e8b6173..38d5784cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,7 +59,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - `JsonExtractionEngine` failing to parse json when the LLM outputs more than just the json. - Exception when adding `ErrorArtifact`'s to the Prompt Stack. -- Concurrency bug in `BaseVectorStoreDriver.upsert_text_artifacts` +- Concurrency bug in `BaseVectorStoreDriver.upsert_text_artifacts`. ## [0.29.2] - 2024-08-16 From d962d1d88b4689fae662aebf8c835f9c9e9a6975 Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Mon, 19 Aug 2024 09:11:10 -0600 Subject: [PATCH 14/16] None check --- griptape/mixins/futures_executor_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 88adc57b8..84a3b6f25 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -20,7 +20,7 @@ class FuturesExecutorMixin(ABC): def __del__(self) -> None: executor = self.futures_executor - if executor: + if executor is not None: self.futures_executor = None executor.shutdown(wait=True) From 24287ee3e90a8c798c6f61141f8e09d40e3752c7 Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Mon, 19 Aug 2024 09:34:17 -0600 Subject: [PATCH 15/16] Cleanup --- griptape/mixins/futures_executor_mixin.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index 84a3b6f25..c71e69006 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -18,9 +18,7 @@ class FuturesExecutorMixin(ABC): ) def __del__(self) -> None: - executor = self.futures_executor - - if executor is not None: + if executor := self.futures_executor is not None: self.futures_executor = None executor.shutdown(wait=True) From 19dc148ce4aac008251ee8f86286ff6609592a6d Mon Sep 17 00:00:00 2001 From: Vasily Vasinov Date: Mon, 19 Aug 2024 09:43:44 -0600 Subject: [PATCH 16/16] Fix types --- griptape/mixins/futures_executor_mixin.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/griptape/mixins/futures_executor_mixin.py b/griptape/mixins/futures_executor_mixin.py index c71e69006..84a3b6f25 100644 --- a/griptape/mixins/futures_executor_mixin.py +++ b/griptape/mixins/futures_executor_mixin.py @@ -18,7 +18,9 @@ class FuturesExecutorMixin(ABC): ) def __del__(self) -> None: - if executor := self.futures_executor is not None: + executor = self.futures_executor + + if executor is not None: self.futures_executor = None executor.shutdown(wait=True)