From 1b1accc5ded4a11c7cf335dc8028bf68db6f952b Mon Sep 17 00:00:00 2001 From: Kyle Roche Date: Tue, 4 Jun 2024 09:59:22 -0700 Subject: [PATCH] Feature/pusher event driver (#821) Co-authored-by: Collin Dutter --- .github/workflows/docs-integration-tests.yml | 4 + CHANGELOG.md | 1 + .../drivers/event-listener-drivers.md | 35 +++++++++ griptape/drivers/__init__.py | 2 + .../pusher_event_listener_driver.py | 39 ++++++++++ poetry.lock | 73 ++++++++++++++++++- pyproject.toml | 3 + .../test_pusher_event_listener_driver.py | 42 +++++++++++ 8 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 griptape/drivers/event_listener/pusher_event_listener_driver.py create mode 100644 tests/unit/drivers/event_listener/test_pusher_event_listener_driver.py diff --git a/.github/workflows/docs-integration-tests.yml b/.github/workflows/docs-integration-tests.yml index eb3621870..a87810774 100644 --- a/.github/workflows/docs-integration-tests.yml +++ b/.github/workflows/docs-integration-tests.yml @@ -112,6 +112,10 @@ jobs: AWS_IOT_CORE_ENDPOINT: ${{ secrets.INTEG_AWS_IOT_CORE_ENDPOINT }} AWS_IOT_CORE_TOPIC: ${{ secrets.INTEG_AWS_IOT_CORE_TOPIC }} ELEVEN_LABS_API_KEY: ${{ secrets.INTEG_ELEVEN_LABS_API_KEY }} + PUSHER_APP_ID: ${{ secrets.INTEG_PUSHER_APP_ID }} + PUSHER_KEY: ${{ secrets.INTEG_PUSHER_KEY }} + PUSHER_SECRET: ${{ secrets.INTEG_PUSHER_SECRET }} + PUSHER_CLUSTER: ${{ secretes.INTEG_PUSHER_CLUSTER }} services: postgres: image: ankane/pgvector:v0.5.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index cf0d8ed36..62a9f49ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `AudioTranscriptionTask` and `AudioTranscriptionClient` for transcribing audio content in Structures. - `OpenAiAudioTranscriptionDriver` for integration with OpenAI's speech-to-text models, including Whisper. - Parameter `env` to `BaseStructureRunDriver` to set environment variables for a Structure Run. +- `PusherEventListenerDriver` to enable sending of framework events over a Pusher WebSocket. ### Changed - **BREAKING**: Updated OpenAI-based image query drivers to remove Vision from the name. diff --git a/docs/griptape-framework/drivers/event-listener-drivers.md b/docs/griptape-framework/drivers/event-listener-drivers.md index d35489f98..6e8f59b22 100644 --- a/docs/griptape-framework/drivers/event-listener-drivers.md +++ b/docs/griptape-framework/drivers/event-listener-drivers.md @@ -212,4 +212,39 @@ agent = Agent( agent.run("Analyze the pros and cons of remote work vs. office work") ``` +### Pusher Event Listener Driver +!!! info + This driver requires the `drivers-event-listener-pusher` [extra](../index.md#extras). + +The [PusherEventListenerDriver](../../reference/griptape/drivers/event_listener/pusher_event_listener_driver.md) sends Events to [Pusher](https://pusher.com). + +```python +import os +from griptape.drivers import PusherEventListenerDriver +from griptape.events import ( + EventListener, + FinishStructureRunEvent +) +from griptape.structures import Agent + +agent = Agent( + event_listeners=[ + EventListener( + event_types=[FinishStructureRunEvent], + driver=PusherEventListenerDriver( + batched=False, + app_id=os.environ["PUSHER_APP_ID"], + key=os.environ["PUSHER_KEY"], + secret=os.environ["PUSHER_SECRET"], + cluster=os.environ["PUSHER_CLUSTER"], + channel='my-channel', + event_name='my-event' + ), + ), + ], +) + +agent.run("Analyze the pros and cons of remote work vs. office work") + +``` diff --git a/griptape/drivers/__init__.py b/griptape/drivers/__init__.py index 6c64756e1..f44602f8b 100644 --- a/griptape/drivers/__init__.py +++ b/griptape/drivers/__init__.py @@ -93,6 +93,7 @@ from .event_listener.webhook_event_listener_driver import WebhookEventListenerDriver from .event_listener.aws_iot_core_event_listener_driver import AwsIotCoreEventListenerDriver from .event_listener.griptape_cloud_event_listener_driver import GriptapeCloudEventListenerDriver +from .event_listener.pusher_event_listener_driver import PusherEventListenerDriver from .file_manager.base_file_manager_driver import BaseFileManagerDriver from .file_manager.local_file_manager_driver import LocalFileManagerDriver @@ -193,6 +194,7 @@ "WebhookEventListenerDriver", "AwsIotCoreEventListenerDriver", "GriptapeCloudEventListenerDriver", + "PusherEventListenerDriver", "BaseFileManagerDriver", "LocalFileManagerDriver", "AmazonS3FileManagerDriver", diff --git a/griptape/drivers/event_listener/pusher_event_listener_driver.py b/griptape/drivers/event_listener/pusher_event_listener_driver.py new file mode 100644 index 000000000..d41b679d7 --- /dev/null +++ b/griptape/drivers/event_listener/pusher_event_listener_driver.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING +from attrs import define, field, Factory +from griptape.drivers import BaseEventListenerDriver +from griptape.utils import import_optional_dependency + +if TYPE_CHECKING: + from pusher import Pusher + + +@define +class PusherEventListenerDriver(BaseEventListenerDriver): + app_id: str = field(kw_only=True) + key: str = field(kw_only=True) + secret: str = field(kw_only=True) + cluster: str = field(kw_only=True) + channel: str = field(kw_only=True) + event_name: str = field(kw_only=True) + pusher_client: Pusher = field( + default=Factory( + lambda self: import_optional_dependency("pusher").Pusher( + app_id=self.app_id, key=self.key, secret=self.secret, cluster=self.cluster, ssl=True + ), + takes_self=True, + ), + kw_only=True, + ) + + def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None: + data = [ + {"channel": self.channel, "name": self.event_name, "data": event_payload} + for event_payload in event_payload_batch + ] + + self.pusher_client.trigger_batch(data) + + def try_publish_event_payload(self, event_payload: dict) -> None: + self.pusher_client.trigger(channels=self.channel, event_name=self.event_name, data=event_payload) diff --git a/poetry.lock b/poetry.lock index 7e6804fcc..7d78e319e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "aiohttp" @@ -2974,6 +2974,22 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.12\""} +[[package]] +name = "ndg-httpsclient" +version = "0.5.1" +description = "Provides enhanced HTTPS support for httplib and urllib2 using PyOpenSSL" +optional = true +python-versions = ">=2.7,<3.0.dev0 || >=3.4.dev0" +files = [ + {file = "ndg_httpsclient-0.5.1-py2-none-any.whl", hash = "sha256:d2c7225f6a1c6cf698af4ebc962da70178a99bcde24ee6d1961c4f3338130d57"}, + {file = "ndg_httpsclient-0.5.1-py3-none-any.whl", hash = "sha256:dd174c11d971b6244a891f7be2b32ca9853d3797a72edb34fa5d7b07d8fff7d4"}, + {file = "ndg_httpsclient-0.5.1.tar.gz", hash = "sha256:d72faed0376ab039736c2ba12e30695e2788c4aa569c9c3e3d72131de2592210"}, +] + +[package.dependencies] +pyasn1 = ">=0.1.1" +PyOpenSSL = "*" + [[package]] name = "networkx" version = "3.2.1" @@ -3747,6 +3763,30 @@ files = [ [package.extras] tests = ["pytest"] +[[package]] +name = "pusher" +version = "3.3.2" +description = "A Python library to interract with the Pusher Channels API" +optional = true +python-versions = "*" +files = [ + {file = "pusher-3.3.2-py2.py3-none-any.whl", hash = "sha256:4ae2d2dc77eb28043da042796992b8b41454f458c4d8c8e151f7498f2c4374c7"}, + {file = "pusher-3.3.2.tar.gz", hash = "sha256:14f412c8e26562aaf663a114951792fefae01f0d220dac84971f926404620760"}, +] + +[package.dependencies] +ndg-httpsclient = "*" +pyasn1 = "*" +pynacl = "*" +pyopenssl = "*" +requests = ">=2.3.0" +six = "*" +urllib3 = "*" + +[package.extras] +aiohttp = ["aiohttp (>=0.20.0)"] +tornado = ["tornado (>=5.0.0)"] + [[package]] name = "py-partiql-parser" version = "0.5.0" @@ -3996,6 +4036,32 @@ snappy = ["python-snappy"] test = ["pytest (>=7)"] zstd = ["zstandard"] +[[package]] +name = "pynacl" +version = "1.5.0" +description = "Python binding to the Networking and Cryptography (NaCl) library" +optional = true +python-versions = ">=3.6" +files = [ + {file = "PyNaCl-1.5.0-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:401002a4aaa07c9414132aaed7f6836ff98f59277a234704ff66878c2ee4a0d1"}, + {file = "PyNaCl-1.5.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:52cb72a79269189d4e0dc537556f4740f7f0a9ec41c1322598799b0bdad4ef92"}, + {file = "PyNaCl-1.5.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a36d4a9dda1f19ce6e03c9a784a2921a4b726b02e1c736600ca9c22029474394"}, + {file = "PyNaCl-1.5.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:0c84947a22519e013607c9be43706dd42513f9e6ae5d39d3613ca1e142fba44d"}, + {file = "PyNaCl-1.5.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06b8f6fa7f5de8d5d2f7573fe8c863c051225a27b61e6860fd047b1775807858"}, + {file = "PyNaCl-1.5.0-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:a422368fc821589c228f4c49438a368831cb5bbc0eab5ebe1d7fac9dded6567b"}, + {file = "PyNaCl-1.5.0-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:61f642bf2378713e2c2e1de73444a3778e5f0a38be6fee0fe532fe30060282ff"}, + {file = "PyNaCl-1.5.0-cp36-abi3-win32.whl", hash = "sha256:e46dae94e34b085175f8abb3b0aaa7da40767865ac82c928eeb9e57e1ea8a543"}, + {file = "PyNaCl-1.5.0-cp36-abi3-win_amd64.whl", hash = "sha256:20f42270d27e1b6a29f54032090b972d97f0a1b0948cc52392041ef7831fee93"}, + {file = "PyNaCl-1.5.0.tar.gz", hash = "sha256:8ac7448f09ab85811607bdd21ec2464495ac8b7c66d146bf545b0f08fb9220ba"}, +] + +[package.dependencies] +cffi = ">=1.4.1" + +[package.extras] +docs = ["sphinx (>=1.6.5)", "sphinx-rtd-theme"] +tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"] + [[package]] name = "pyopenssl" version = "24.1.0" @@ -5995,7 +6061,7 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.link testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] [extras] -all = ["anthropic", "beautifulsoup4", "boto3", "cohere", "elevenlabs", "filetype", "google-generativeai", "mail-parser", "markdownify", "marqo", "opensearch-py", "pandas", "pgvector", "pillow", "pinecone-client", "playwright", "psycopg2-binary", "pymongo", "pypdf", "redis", "snowflake-sqlalchemy", "sqlalchemy-redshift", "torch", "trafilatura", "transformers", "voyageai"] +all = ["anthropic", "beautifulsoup4", "boto3", "cohere", "elevenlabs", "filetype", "google-generativeai", "mail-parser", "markdownify", "marqo", "opensearch-py", "pandas", "pgvector", "pillow", "pinecone-client", "playwright", "psycopg2-binary", "pusher", "pymongo", "pypdf", "redis", "snowflake-sqlalchemy", "sqlalchemy-redshift", "torch", "trafilatura", "transformers", "voyageai"] drivers-embedding-amazon-bedrock = ["boto3"] drivers-embedding-amazon-sagemaker = ["boto3"] drivers-embedding-google = ["google-generativeai"] @@ -6003,6 +6069,7 @@ drivers-embedding-huggingface = ["huggingface-hub", "transformers"] drivers-embedding-voyageai = ["voyageai"] drivers-event-listener-amazon-iot = ["boto3"] drivers-event-listener-amazon-sqs = ["boto3"] +drivers-event-listener-pusher = ["pusher"] drivers-memory-conversation-amazon-dynamodb = ["boto3"] drivers-memory-conversation-redis = ["redis"] drivers-prompt-amazon-bedrock = ["anthropic", "boto3"] @@ -6033,4 +6100,4 @@ loaders-pdf = ["pypdf"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "da89e6bb7aa395fd5badc416ec859e574416ea1fe07ee6f93035fe6a7e314dcd" +content-hash = "b035e9419f6dbc25ee6663b81c49730946b9990641e14aa656080915cb4735b2" diff --git a/pyproject.toml b/pyproject.toml index 55ae82ce2..f9b4d1ba7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ markdownify = {version = "^0.11.6", optional = true} voyageai = {version = "^0.2.1", optional = true} elevenlabs = {version = "^1.1.2", optional = true} torch = {version = "^2.3.0", optional = true} +pusher = {version = "^3.3.2", optional = true} # loaders pandas = {version = "^1.3", optional = true} @@ -96,6 +97,7 @@ drivers-web-scraper-markdownify = ["playwright", "beautifulsoup4", "markdownify" drivers-event-listener-amazon-sqs = ["boto3"] drivers-event-listener-amazon-iot = ["boto3"] +drivers-event-listener-pusher = ["pusher"] loaders-dataframe = ["pandas"] loaders-pdf = ["pypdf"] @@ -128,6 +130,7 @@ all = [ "voyageai", "elevenlabs", "torch", + "pusher", # loaders "pandas", diff --git a/tests/unit/drivers/event_listener/test_pusher_event_listener_driver.py b/tests/unit/drivers/event_listener/test_pusher_event_listener_driver.py new file mode 100644 index 000000000..6f0636b5c --- /dev/null +++ b/tests/unit/drivers/event_listener/test_pusher_event_listener_driver.py @@ -0,0 +1,42 @@ +from pytest import fixture +from tests.mocks.mock_event import MockEvent +from griptape.drivers import PusherEventListenerDriver +from unittest.mock import Mock + + +class TestPusherEventListenerDriver: + @fixture(autouse=True) + def mock_post(self, mocker): + mock_pusher_client = mocker.patch("pusher.Pusher") + mock_pusher_client.return_value.trigger.return_value = Mock() + mock_pusher_client.return_value.trigger_batch.return_value = Mock() + + return mock_pusher_client + + @fixture() + def driver(self): + return PusherEventListenerDriver( + app_id="test-app-id", + key="test-key", + secret="test-secret", + cluster="test-cluster", + channel="test-channel", + event_name="test-event", + ) + + def test_init(self, driver): + assert driver + + def test_try_publish_event_payload(self, driver): + data = MockEvent().to_dict() + driver.try_publish_event_payload(data) + + driver.pusher_client.trigger.assert_called_with(channels="test-channel", event_name="test-event", data=data) + + def test_try_publish_event_payload_batch(self, driver): + data = [MockEvent().to_dict() for _ in range(3)] + driver.try_publish_event_payload_batch(data) + + driver.pusher_client.trigger_batch.assert_called_with( + [{"channel": "test-channel", "name": "test-event", "data": data[i]} for i in range(3)] + )