Skip to content

Commit

Permalink
Add global event bus (#1051)
Browse files Browse the repository at this point in the history
  • Loading branch information
collindutter authored Aug 8, 2024
1 parent 105f4d0 commit 443cf29
Show file tree
Hide file tree
Showing 30 changed files with 254 additions and 303 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Parameter `structure` to `BaseTask`.
- Method `try_find_task` to `Structure`.
- `TranslateQueryRagModule` `RagEngine` module for translating input queries.
- Global event bus, `griptape.events.event_bus`, for publishing and subscribing to events.

### Changed
- **BREAKING**: Removed all uses of `EventPublisherMixin` in favor of `event_bus`.
- **BREAKING**: Removed `EventPublisherMixin`.
- `BaseTask.add_parent/child` will now call `self.structure.add_task` if possible.

## [0.29.0] - 2024-07-30
Expand Down
89 changes: 52 additions & 37 deletions docs/griptape-framework/drivers/event-listener-drivers.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@ import os

from griptape.drivers import AmazonSqsEventListenerDriver
from griptape.events import (
EventListener,
EventListener, event_bus
)
from griptape.rules import Rule
from griptape.structures import Agent

agent = Agent(
rules=[
Rule(
value="You will be provided with a block of text, and your task is to extract a list of keywords from it."
)
],
event_listeners=[
event_bus.add_event_listeners(
[
EventListener(
handler=lambda event: { # You can optionally use the handler to transform the event payload before sending it to the Driver
"event": event.to_dict(),
},
driver=AmazonSqsEventListenerDriver(
queue_url=os.environ["AMAZON_SQS_QUEUE_URL"],
),
),
]
)


agent = Agent(
rules=[
Rule(
value="You will be provided with a block of text, and your task is to extract a list of keywords from it."
)
],
)

Expand Down Expand Up @@ -83,23 +84,26 @@ import os

from griptape.drivers import AmazonSqsEventListenerDriver
from griptape.events import (
EventListener,
EventListener, event_bus
)
from griptape.rules import Rule
from griptape.structures import Agent

agent = Agent(
rules=[
Rule(
value="You will be provided with a block of text, and your task is to extract a list of keywords from it."
)
],
event_listeners=[
event_bus.add_event_listeners(
[
EventListener(
driver=AmazonSqsEventListenerDriver(
queue_url=os.environ["AMAZON_SQS_QUEUE_URL"],
),
),
]
)

agent = Agent(
rules=[
Rule(
value="You will be provided with a block of text, and your task is to extract a list of keywords from it."
)
],
)

Expand Down Expand Up @@ -128,10 +132,23 @@ from griptape.drivers import AwsIotCoreEventListenerDriver, OpenAiChatPromptDriv
from griptape.events import (
EventListener,
FinishStructureRunEvent,
event_bus
)
from griptape.rules import Rule
from griptape.structures import Agent

event_bus.add_event_listeners(
[
EventListener(
event_types=[FinishStructureRunEvent],
driver=AwsIotCoreEventListenerDriver(
topic=os.environ["AWS_IOT_CORE_TOPIC"],
iot_endpoint=os.environ["AWS_IOT_CORE_ENDPOINT"],
),
),
]
)

agent = Agent(
rules=[
Rule(
Expand All @@ -143,15 +160,6 @@ agent = Agent(
model="gpt-3.5-turbo", temperature=0.7
)
),
event_listeners=[
EventListener(
event_types=[FinishStructureRunEvent],
driver=AwsIotCoreEventListenerDriver(
topic=os.environ["AWS_IOT_CORE_TOPIC"],
iot_endpoint=os.environ["AWS_IOT_CORE_ENDPOINT"],
),
),
],
)

agent.run("I want to fly from Orlando to Boston")
Expand All @@ -171,18 +179,19 @@ from griptape.drivers import GriptapeCloudEventListenerDriver
from griptape.events import (
EventListener,
FinishStructureRunEvent,
event_bus
)
from griptape.structures import Agent

agent = Agent(
event_listeners=[
event_bus.add_event_listeners(
[
EventListener(
event_types=[FinishStructureRunEvent],
# By default, GriptapeCloudEventListenerDriver uses the api key provided
# in the GT_CLOUD_API_KEY environment variable.
driver=GriptapeCloudEventListenerDriver(),
),
],
]
)

agent.run(
Expand All @@ -201,20 +210,23 @@ from griptape.drivers import WebhookEventListenerDriver
from griptape.events import (
EventListener,
FinishStructureRunEvent,
event_bus
)
from griptape.structures import Agent

agent = Agent(
event_listeners=[
event_bus.add_event_listeners(
[
EventListener(
event_types=[FinishStructureRunEvent],
driver=WebhookEventListenerDriver(
webhook_url=os.environ["WEBHOOK_URL"],
),
),
],
]
)

agent = Agent()

agent.run("Analyze the pros and cons of remote work vs. office work")
```
### Pusher
Expand All @@ -229,12 +241,13 @@ import os
from griptape.drivers import PusherEventListenerDriver
from griptape.events import (
EventListener,
FinishStructureRunEvent
FinishStructureRunEvent,
event_bus
)
from griptape.structures import Agent

agent = Agent(
event_listeners=[
event_bus.add_event_listeners(
[
EventListener(
event_types=[FinishStructureRunEvent],
driver=PusherEventListenerDriver(
Expand All @@ -250,6 +263,8 @@ agent = Agent(
],
)

agent = Agent()

agent.run("Analyze the pros and cons of remote work vs. office work")

```
62 changes: 32 additions & 30 deletions docs/griptape-framework/misc/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ search:

## Overview

You can use [EventListener](../../reference/griptape/events/event_listener.md)s to listen for events during a Structure's execution.
You can configure the global [event_bus](../../reference/griptape/events/event_bus.md) with [EventListener](../../reference/griptape/events/event_listener.md)s to listen for various framework events.
See [Event Listener Drivers](../drivers/event-listener-drivers.md) for examples on forwarding events to external services.

## Specific Event Types
Expand All @@ -23,15 +23,14 @@ from griptape.events import (
StartPromptEvent,
FinishPromptEvent,
EventListener,
event_bus
)


def handler(event: BaseEvent):
print(event.__class__)


agent = Agent(
event_listeners=[
event_bus.add_event_listeners([
EventListener(
handler,
event_types=[
Expand All @@ -43,8 +42,9 @@ agent = Agent(
FinishPromptEvent,
],
)
]
)
])

agent = Agent()

agent.run("tell me about griptape")
```
Expand All @@ -69,7 +69,8 @@ Or listen to all events:

```python
from griptape.structures import Agent
from griptape.events import BaseEvent, EventListener
from griptape.events import BaseEvent, EventListener, event_bus



def handler1(event: BaseEvent):
Expand All @@ -79,14 +80,14 @@ def handler1(event: BaseEvent):
def handler2(event: BaseEvent):
print("Handler 2", event.__class__)


agent = Agent(
event_listeners=[
event_bus.add_event_listeners([
EventListener(handler1),
EventListener(handler2),
]
)

agent = Agent()

agent.run("tell me about griptape")
```

Expand Down Expand Up @@ -131,24 +132,26 @@ Handler 2 <class 'griptape.events.finish_structure_run_event.FinishStructureRunE
You can use the [CompletionChunkEvent](../../reference/griptape/events/completion_chunk_event.md) to stream the completion results from Prompt Drivers.

```python
from griptape.events import CompletionChunkEvent, EventListener
from griptape.events import CompletionChunkEvent, EventListener, event_bus
from griptape.tasks import ToolkitTask
from griptape.structures import Pipeline
from griptape.tools import WebScraper, TaskMemoryClient
from griptape.config import OpenAiStructureConfig
from griptape.drivers import OpenAiChatPromptDriver



event_bus.add_event_listeners([
EventListener(
lambda e: print(e.token, end="", flush=True),
event_types=[CompletionChunkEvent],
)
])

pipeline = Pipeline(
config=OpenAiStructureConfig(
prompt_driver=OpenAiChatPromptDriver(model="gpt-4o", stream=True)
),
event_listeners=[
EventListener(
lambda e: print(e.token, end="", flush=True),
event_types=[CompletionChunkEvent],
)
],
)

pipeline.add_tasks(
Expand Down Expand Up @@ -186,26 +189,25 @@ To count tokens, you can use Event Listeners and the [TokenCounter](../../refere

```python
from griptape import utils
from griptape.events import BaseEvent, StartPromptEvent, FinishPromptEvent, EventListener
from griptape.events import BaseEvent, StartPromptEvent, FinishPromptEvent, EventListener, event_bus
from griptape.structures import Agent


token_counter = utils.TokenCounter()

event_bus.add_event_listeners([
EventListener(
lambda e: token_counter.add_tokens(e.token_count),
event_types=[StartPromptEvent, FinishPromptEvent],
)
])

def count_tokens(e: BaseEvent):
if isinstance(e, StartPromptEvent) or isinstance(e, FinishPromptEvent):
token_counter.add_tokens(e.token_count)


agent = Agent(
event_listeners=[
EventListener(
handler=lambda e: count_tokens(e),
event_types=[StartPromptEvent, FinishPromptEvent],
)
]
)
agent = Agent()

agent.run("tell me about large language models")

Expand Down Expand Up @@ -244,9 +246,11 @@ You can use the [StartPromptEvent](../../reference/griptape/events/start_prompt_

```python
from griptape.structures import Agent
from griptape.events import BaseEvent, StartPromptEvent, EventListener
from griptape.events import BaseEvent, StartPromptEvent, EventListener, event_bus


event_bus.add_event_listeners([EventListener(handler=lambda e: print(e), event_types=[StartPromptEvent])])

def handler(event: BaseEvent):
if isinstance(event, StartPromptEvent):
print("Prompt Stack Messages:")
Expand All @@ -256,9 +260,7 @@ def handler(event: BaseEvent):
print(event.prompt)


agent = Agent(
event_listeners=[EventListener(handler=handler, event_types=[StartPromptEvent])]
)
agent = Agent()

agent.run("Write me a poem.")
```
Expand Down
Loading

0 comments on commit 443cf29

Please sign in to comment.