Skip to content

Commit

Permalink
Add events util
Browse files Browse the repository at this point in the history
  • Loading branch information
collindutter committed Dec 20, 2024
1 parent 78c7e06 commit 9a02b8b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
2 changes: 2 additions & 0 deletions griptape/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .reference_utils import references_from_artifacts
from .file_utils import get_mime_type
from .contextvars_utils import with_contextvars
from .events import Events


def minify_json(value: str) -> str:
Expand Down Expand Up @@ -49,4 +50,5 @@ def minify_json(value: str) -> str:
"references_from_artifacts",
"get_mime_type",
"with_contextvars",
"Events",
]
68 changes: 68 additions & 0 deletions griptape/utils/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

from queue import Queue
from threading import Thread
from typing import TYPE_CHECKING, Optional

from attrs import Factory, define, field

from griptape.events import (
BaseEvent,
EventBus,
EventListener,
FinishStructureRunEvent,
)
from griptape.utils.contextvars_utils import with_contextvars

if TYPE_CHECKING:
from collections.abc import Iterator

from griptape.structures import Structure


@define
class Events:
"""A wrapper for Structures that converts `BaseChunkEvent`s into an iterator of TextArtifacts.
It achieves this by running the Structure in a separate thread, listening for events from the Structure,
and yielding those events.
See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators
Attributes:
structure: The Structure to wrap.
_event_queue: A queue to hold events from the Structure.
"""

structure: Structure = field()
event_types: Optional[list[type[BaseEvent]]] = field(default=None, kw_only=True)

_event_queue: Queue[BaseEvent] = field(default=Factory(lambda: Queue()))

def run(self, *args) -> Iterator[BaseEvent]:
t = Thread(target=with_contextvars(self._run_structure), args=args)
t.start()

Check warning on line 44 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L43-L44

Added lines #L43 - L44 were not covered by tests

while True:
event = self._event_queue.get()
yield event

Check warning on line 48 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L47-L48

Added lines #L47 - L48 were not covered by tests
if isinstance(event, FinishStructureRunEvent):
break
t.join()

Check warning on line 51 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L50-L51

Added lines #L50 - L51 were not covered by tests

def _run_structure(self, *args) -> None:
def event_handler(event: BaseEvent) -> None:
self._event_queue.put(event)

Check warning on line 55 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L54-L55

Added lines #L54 - L55 were not covered by tests

event_types = [BaseEvent] if self.event_types is None else self.event_types

Check warning on line 57 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L57

Added line #L57 was not covered by tests
if FinishStructureRunEvent not in event_types:
event_types.append(FinishStructureRunEvent)
stream_event_listener = EventListener(

Check warning on line 60 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L59-L60

Added lines #L59 - L60 were not covered by tests
on_event=event_handler,
event_types=event_types,
)
EventBus.add_event_listener(stream_event_listener)

Check warning on line 64 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L64

Added line #L64 was not covered by tests

self.structure.run(*args)

Check warning on line 66 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L66

Added line #L66 was not covered by tests

EventBus.remove_event_listener(stream_event_listener)

Check warning on line 68 in griptape/utils/events.py

View check run for this annotation

Codecov / codecov/patch

griptape/utils/events.py#L68

Added line #L68 was not covered by tests

0 comments on commit 9a02b8b

Please sign in to comment.