Skip to content

Commit

Permalink
Add thread lock when batch publishing events
Browse files Browse the repository at this point in the history
  • Loading branch information
collindutter committed Aug 26, 2024
1 parent d576619 commit ddab692
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions griptape/drivers/event_listener/base_event_listener_driver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import threading
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

Expand All @@ -18,6 +19,7 @@
class BaseEventListenerDriver(FuturesExecutorMixin, ABC):
batched: bool = field(default=True, kw_only=True)
batch_size: int = field(default=10, kw_only=True)
thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock()))

_batch: list[dict] = field(default=Factory(list), kw_only=True)

Expand All @@ -39,10 +41,11 @@ def _safe_try_publish_event(self, event: BaseEvent | dict, *, flush: bool) -> No
event_payload = event if isinstance(event, dict) else event.to_dict()

if self.batched:
self._batch.append(event_payload)
if len(self.batch) >= self.batch_size or flush:
self.try_publish_event_payload_batch(self.batch)
self._batch = []
with self.thread_lock:
self._batch.append(event_payload)
if len(self.batch) >= self.batch_size or flush:
self.try_publish_event_payload_batch(self.batch)
self._batch = []
return
else:
self.try_publish_event_payload(event_payload)
Expand Down

0 comments on commit ddab692

Please sign in to comment.