From a9922497782047607bd9f17685dd4615079ea74b Mon Sep 17 00:00:00 2001 From: Collin Dutter Date: Tue, 3 Sep 2024 10:41:47 -0700 Subject: [PATCH] Merge main into dev --- CHANGELOG.md | 6 +++++ .../base_event_listener_driver.py | 11 +++++--- pyproject.toml | 2 +- tests/unit/tools/test_file_manager.py | 25 ++++++++++++++++++- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17fe7f136a..8542bfbb5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 **Note**: This release includes breaking changes. Please refer to the [Migration Guide](./MIGRATION.md#030x-to-031x) for details. +## [0.30.2] - 2024-08-26 + +### Fixed +- Ensure thread safety when publishing events by adding a thread lock to batch operations in `BaseEventListenerDriver`. +- `FileManagerTool` failing to save Artifacts created by `ExtractionTool` with a `CsvExtractionEngine`. + ## [0.30.1] - 2024-08-21 ### Fixed diff --git a/griptape/drivers/event_listener/base_event_listener_driver.py b/griptape/drivers/event_listener/base_event_listener_driver.py index 0af57f0f38..75bdc9f75d 100644 --- a/griptape/drivers/event_listener/base_event_listener_driver.py +++ b/griptape/drivers/event_listener/base_event_listener_driver.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading from abc import ABC, abstractmethod from typing import TYPE_CHECKING @@ -18,6 +19,7 @@ class BaseEventListenerDriver(FuturesExecutorMixin, ABC): batched: bool = field(default=True, kw_only=True) batch_size: int = field(default=10, kw_only=True) + thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock())) _batch: list[dict] = field(default=Factory(list), kw_only=True) @@ -39,10 +41,11 @@ def _safe_try_publish_event(self, event: BaseEvent | dict, *, flush: bool) -> No event_payload = event if isinstance(event, dict) else event.to_dict() if self.batched: - self._batch.append(event_payload) - if len(self.batch) >= self.batch_size or flush: - self.try_publish_event_payload_batch(self.batch) - self._batch = [] + with self.thread_lock: + self._batch.append(event_payload) + if len(self.batch) >= self.batch_size or flush: + self.try_publish_event_payload_batch(self.batch) + self._batch = [] return else: self.try_publish_event_payload(event_payload) diff --git a/pyproject.toml b/pyproject.toml index 2afdc5910f..7bff51e16a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "griptape" -version = "0.30.1" +version = "0.30.2" description = "Modular Python framework for LLM workflows, tools, memory, and data." authors = ["Griptape "] license = "Apache 2.0" diff --git a/tests/unit/tools/test_file_manager.py b/tests/unit/tools/test_file_manager.py index 469918a025..e97bcbe17b 100644 --- a/tests/unit/tools/test_file_manager.py +++ b/tests/unit/tools/test_file_manager.py @@ -5,7 +5,7 @@ import pytest -from griptape.artifacts import ListArtifact, TextArtifact +from griptape.artifacts import CsvRowArtifact, ListArtifact, TextArtifact from griptape.drivers.file_manager.local_file_manager_driver import LocalFileManagerDriver from griptape.loaders.text_loader import TextLoader from griptape.tools import FileManagerTool @@ -106,6 +106,29 @@ def test_save_memory_artifacts_to_disk_for_multiple_artifacts(self, temp_dir): assert Path(os.path.join(temp_dir, "test", f"{artifacts[1].name}-{file_name}")).read_text() == "baz" assert result.value == "Successfully saved memory artifacts to disk" + def test_save_memory_artifacts_to_disk_for_non_string_artifact(self, temp_dir): + memory = defaults.text_task_memory("Memory1") + artifact = CsvRowArtifact({"foo": "bar"}) + + memory.store_artifact("foobar", artifact) + + file_manager = FileManagerTool( + input_memory=[memory], file_manager_driver=LocalFileManagerDriver(workdir=temp_dir) + ) + result = file_manager.save_memory_artifacts_to_disk( + { + "values": { + "dir_name": "test", + "file_name": "foobar.txt", + "memory_name": memory.name, + "artifact_namespace": "foobar", + } + } + ) + + assert Path(os.path.join(temp_dir, "test", "foobar.txt")).read_text() == "bar" + assert result.value == "Successfully saved memory artifacts to disk" + def test_save_content_to_file(self, temp_dir): file_manager = FileManagerTool(file_manager_driver=LocalFileManagerDriver(workdir=temp_dir)) result = file_manager.save_content_to_file(