Skip to content

Commit

Permalink
Hotfix/event lock (#1104)
Browse files Browse the repository at this point in the history
  • Loading branch information
collindutter authored Aug 26, 2024
1 parent d576619 commit ad500c1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## [0.30.2] - 2024-08-26

### Fixed
- Ensure thread safety when publishing events by adding a thread lock to batch operations in `BaseEventListenerDriver`.

## [0.30.1] - 2024-08-21

### Fixed
Expand Down
11 changes: 7 additions & 4 deletions griptape/drivers/event_listener/base_event_listener_driver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import threading
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

Expand All @@ -18,6 +19,7 @@
class BaseEventListenerDriver(FuturesExecutorMixin, ABC):
batched: bool = field(default=True, kw_only=True)
batch_size: int = field(default=10, kw_only=True)
thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock()))

_batch: list[dict] = field(default=Factory(list), kw_only=True)

Expand All @@ -39,10 +41,11 @@ def _safe_try_publish_event(self, event: BaseEvent | dict, *, flush: bool) -> No
event_payload = event if isinstance(event, dict) else event.to_dict()

if self.batched:
self._batch.append(event_payload)
if len(self.batch) >= self.batch_size or flush:
self.try_publish_event_payload_batch(self.batch)
self._batch = []
with self.thread_lock:
self._batch.append(event_payload)
if len(self.batch) >= self.batch_size or flush:
self.try_publish_event_payload_batch(self.batch)
self._batch = []
return
else:
self.try_publish_event_payload(event_payload)
Expand Down

0 comments on commit ad500c1

Please sign in to comment.