Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Bytewax version to 0.21.0 #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@

import dotenv


def _build_dataflow() -> Dataflow:
flow = Dataflow("supercharged-slackbot")

return flow

flow = Dataflow("supercharged-slackbot")

# Load environment variables from .env
dotenv.load_dotenv()
Expand All @@ -21,6 +16,3 @@ def _build_dataflow() -> Dataflow:
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[logging.StreamHandler()],
)

# Dataflow needs to be assigned to a global variable called "flow"
flow = _build_dataflow()
7 changes: 3 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
bytewax==0.18.0
bytewax==0.21.0
annotated-types==0.6.0
anyio==4.2.0
certifi==2024.2.2
charset-normalizer==3.3.2
colorama==0.4.6
coloredlogs==15.0.1
fastembed==0.1.1
fastembed==0.3.6
flatbuffers==23.5.26
grpcio==1.60.1
grpcio-tools==1.60.1
Expand All @@ -19,7 +19,7 @@ hyperframe==6.0.1
idna==3.6
mpmath==1.3.0
numpy==1.26.4
onnx==1.15.0
onnx==1.17.0
onnxruntime==1.17.0
packaging==23.2
portalocker==2.8.2
Expand All @@ -32,7 +32,6 @@ qdrant-client[fastembed]==1.7.3
requests==2.31.0
sniffio==1.3.0
sympy==1.12
tokenizers==0.13.3
tqdm==4.66.2
typing_extensions==4.9.0
urllib3==2.2.0
Expand Down
33 changes: 13 additions & 20 deletions step1.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

log = logging.getLogger(__name__)

# Load environment variables from .env
dotenv.load_dotenv()


def channel_is(channel: str) -> Callable[[SlackMessage], bool]:
"""Predicate function to check if the message was posted on the given channel."""
Expand All @@ -28,33 +31,23 @@ def _func(msg: SlackMessage) -> bool:
return _func


def _build_dataflow() -> Dataflow:
# Create a bytewax stream object.
flow = Dataflow("supercharged-slackbot")

# Data will be flowing in from the Slack stream.
stream = op.input("input", flow, SlackSource(url=os.environ["SLACK_PROXY_URL"]))

# Inspect will show what entries are in the stream.
op.inspect_debug("debug", stream)
# Create a bytewax stream object.
flow = Dataflow("supercharged-slackbot")

# Filter the messages based on which Slack channel they were posted on.
stream = op.filter("filter_channel", stream, channel_is(os.environ["SLACK_CHANNEL_ID"]))
# Data will be flowing in from the Slack stream.
stream = op.input("input", flow, SlackSource(url=os.environ["SLACK_PROXY_URL"]))

# Output the messages into the console
op.output("output", stream, StdOutSink())
# Inspect will show what entries are in the stream.
op.inspect_debug("debug", stream)

return flow
# Filter the messages based on which Slack channel they were posted on.
stream = op.filter("filter_channel", stream, channel_is(os.environ["SLACK_CHANNEL_ID"]))


# Load environment variables from .env
dotenv.load_dotenv()
# Output the messages into the console
op.output("output", stream, StdOutSink())

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[logging.StreamHandler()],
)

# Dataflow needs to be assigned to a global variable called "flow"
flow = _build_dataflow()
71 changes: 34 additions & 37 deletions step2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from datetime import timedelta
from datetime import timezone

import bytewax.operators as op
import dotenv

import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators.window import EventClockConfig
from bytewax.operators.window import TumblingWindow
from bytewax.operators.windowing import EventClock, TumblingWindower

from utils.connectors.slack import SlackMessage
from utils.connectors.slack import SlackSource
Expand Down Expand Up @@ -47,54 +48,50 @@ def get_message_channel(msg: SlackMessage) -> str:
return msg.channel


def _build_dataflow() -> Dataflow:
# Create a bytewax stream object.
flow = Dataflow("supercharged-slackbot")
def get_timestamp(msg) -> datetime:
return msg.timestamp

# Data will be flowing in from the Slack stream.
stream = op.input("input", flow, SlackSource(url=os.environ["SLACK_PROXY_URL"]))

keyed_stream = op.key_on("key_on_channel", stream, get_message_channel)
# Load environment variables from .env
dotenv.load_dotenv()

# Filter the messages based on which Slack channel they were posted on.
filtered_stream = op.filter(
"filter_channel", keyed_stream, channel_is(os.environ["SLACK_CHANNEL_ID"])
)

# Branch the stream into two: one for bot mentions, one for the rest
b_out = op.branch("is_mention", filtered_stream, is_mention)
# Create a bytewax stream object.
flow = Dataflow("supercharged-slackbot")

messages = b_out.falses
mentions = b_out.trues
# Data will be flowing in from the Slack stream.
stream = op.input("input", flow, SlackSource(url=os.environ["SLACK_PROXY_URL"]))

# Inspect what messages got to which stream
op.inspect_debug("message", messages)
op.inspect_debug("mention", mentions)
keyed_stream = op.key_on("key_on_channel", stream, get_message_channel)

# We use windowing to throttle the amount of requests we are making to the
# LLM API.
clock = EventClockConfig(
lambda msg: msg.timestamp, wait_for_system_duration=timedelta(seconds=0)
)
windower = TumblingWindow(
length=timedelta(seconds=10), align_to=datetime(2024, 2, 1, tzinfo=timezone.utc)
)
windowed_messages = op.window.collect_window("window", messages, clock, windower)
# Filter the messages based on which Slack channel they were posted on.
filtered_stream = op.filter(
"filter_channel", keyed_stream, channel_is(os.environ["SLACK_CHANNEL_ID"])
)

# Output the message windows into the console
op.output("output", windowed_messages, StdOutSink())
# Branch the stream into two: one for bot mentions, one for the rest
b_out = op.branch("is_mention", filtered_stream, is_mention)

return flow
messages = b_out.falses
mentions = b_out.trues

# Inspect what messages got to which stream
op.inspect_debug("message", messages)
op.inspect_debug("mention", mentions)

# Load environment variables from .env
dotenv.load_dotenv()
# We use windowing to throttle the amount of requests we are making to the
# LLM API.
clock = EventClock(get_timestamp, wait_for_system_duration=timedelta(seconds=0))
windower = TumblingWindower(
length=timedelta(seconds=10), align_to=datetime(2024, 2, 1, tzinfo=timezone.utc)
)
windowed_messages = win.collect_window("window", messages, clock, windower)

# Output the message windows into the console
op.output("output", windowed_messages.down, StdOutSink())

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[logging.StreamHandler()],
)

# Dataflow needs to be assigned to a global variable called "flow"
flow = _build_dataflow()
117 changes: 58 additions & 59 deletions step3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@

import logging
import os
from typing import Callable
from typing import NewType
from typing import Callable, Optional, NewType
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import dotenv

import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators.window import EventClockConfig
from bytewax.operators.window import TumblingWindow
from bytewax.operators.window import WindowMetadata
from bytewax.operators.windowing import EventClock, TumblingWindower, WindowMetadata

import openai

Expand All @@ -25,8 +23,23 @@

log = logging.getLogger(__name__)

# Load environment variables from .env
dotenv.load_dotenv()

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[logging.StreamHandler()],
)


def get_timestamp(msg) -> datetime:
return msg.timestamp


Summary = NewType("Summary", str)


def get_message_channel(msg: SlackMessage) -> str:
"""Extract the channel identifier from a message."""
return msg.channel
Expand Down Expand Up @@ -77,19 +90,23 @@ def __init__(self):
{summary}
"""

def create_initial_state(self) -> Summary:
@classmethod
def create_initial_state(cls) -> Summary:
"""Get initial state for the stateful stream step."""
return Summary("No-one has said anything yet.")

def __call__(
self, previous_state: str, item: tuple[WindowMetadata, list[SlackMessage]]
def new_message(
self, previous_state: Optional[Summary], item: tuple[int, list[SlackMessage]]
) -> tuple[Summary, Summary]:
        """This is called whenever a new window of messages arrives.

It gets the previous state as the first argument, and returns the new
state and an object to be passed downstream.
"""
_, messages = item # we don't need the window metadata here
if previous_state is None:
previous_state = Summarizer.create_initial_state()

_, messages = item # we don't need the window id

system_prompt = self._prompt.format(summary=previous_state)

Expand All @@ -111,63 +128,45 @@ def __call__(
return new_state, summary


def _build_dataflow() -> Dataflow:
# Create a bytewax stream object.
flow = Dataflow("supercharged-slackbot")
# Create a bytewax stream object.
flow = Dataflow("supercharged-slackbot")

# Data will be flowing in from the Slack stream.
stream = op.input("input", flow, SlackSource(url=os.environ["SLACK_PROXY_URL"]))
# Data will be flowing in from the Slack stream.
stream = op.input("input", flow, SlackSource(url=os.environ["SLACK_PROXY_URL"]))

# Key the stream elements based on the channel id. In here we are not processing
# any channels separately, but this approach very much allows it. The windowing
# step requires a keyed stream, so that's why we are adding it here.
keyed_stream = op.key_on("key_on_channel", stream, get_message_channel)

# Filter the messages based on which Slack channel they were posted on.
filtered_stream = op.filter(
"filter_channel", keyed_stream, channel_is(os.environ["SLACK_CHANNEL_ID"])
)
# Key the stream elements based on the channel id. In here we are not processing
# any channels separately, but this approach very much allows it. The windowing
# step requires a keyed stream, so that's why we are adding it here.
keyed_stream = op.key_on("key_on_channel", stream, get_message_channel)

# Branch the stream into two: one for bot mentions, one for the rest
b_out = op.branch("is_mention", filtered_stream, is_mention)

messages = b_out.falses
mentions = b_out.trues

# Inspect what messages got to which stream
op.inspect_debug("message", messages)
op.inspect_debug("mention", mentions)

# We use windowing to throttle the amount of requests we are making to the
# LLM API.
clock = EventClockConfig(
lambda msg: msg.timestamp, wait_for_system_duration=timedelta(seconds=0)
)
windower = TumblingWindow(
length=timedelta(seconds=10), align_to=datetime(2024, 1, 1, tzinfo=timezone.utc)
)
windowed_messages = op.window.collect_window("window", messages, clock, windower)

# Create a stateful step which keeps track of the current discussion summary
summarizer = Summarizer()
summary_stream = op.stateful_map(
"summarize", windowed_messages, summarizer.create_initial_state, summarizer
)
# Filter the messages based on which Slack channel they were posted on.
filtered_stream = op.filter(
"filter_channel", keyed_stream, channel_is(os.environ["SLACK_CHANNEL_ID"])
)

# Output the message windows into the console
op.output("output", summary_stream, StdOutSink())
# Branch the stream into two: one for bot mentions, one for the rest
b_out = op.branch("is_mention", filtered_stream, is_mention)

return flow
messages = b_out.falses
mentions = b_out.trues

# Inspect what messages got to which stream
op.inspect_debug("message", messages)
op.inspect_debug("mention", mentions)

# Load environment variables from .env
dotenv.load_dotenv()
# We use windowing to throttle the amount of requests we are making to the
# LLM API.
clock = EventClock(get_timestamp, wait_for_system_duration=timedelta(seconds=0))
windower = TumblingWindower(
length=timedelta(seconds=10), align_to=datetime(2024, 1, 1, tzinfo=timezone.utc)
)
windowed_messages = win.collect_window("window", messages, clock, windower)

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)-7s %(message)s",
handlers=[logging.StreamHandler()],
# Create a stateful step which keeps track of the current discussion summary
summarizer = Summarizer()
summary_stream = op.stateful_map(
"summarize", windowed_messages.down, summarizer.new_message
)

# Dataflow needs to be assigned to a global variable called "flow"
flow = _build_dataflow()
# Output the message windows into the console
op.output("output", summary_stream, StdOutSink())
Loading