Add an example reading from Reddit #757

Merged 6 commits on Sep 18, 2023
3 changes: 3 additions & 0 deletions python/docs/source/examples/index.md
@@ -2,6 +2,8 @@

The [Time-Centric Calculations example](./time_centric.ipynb) shows how to work with time and produce past training examples and recent results for applying models.

The [Reddit example](./reddit.md) shows how to read and aggregate live messages from Reddit.

The [Bluesky Firehose example](./bluesky.md) shows how to read and aggregate messages from the Bluesky firehose.
This demonstrates how to use Kaskada to connect in real-time and parse messages as part of the query.

@@ -10,5 +12,6 @@ This demonstrates how to use Kaskada to connect in real-time and parse messages
:maxdepth: 2
time_centric
reddit
bluesky
```
73 changes: 73 additions & 0 deletions python/docs/source/examples/reddit.md
@@ -0,0 +1,73 @@
# Reddit Live Example

In this example, we'll show how you can receive and process Reddit comments using Kaskada.

You can see the full example in the file [reddit.py](https://github.com/kaskada-ai/kaskada/blob/main/python/docs/source/examples/reddit.py).

## Set up Reddit credentials

Follow Reddit's [First Steps](https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example#first-steps) guide to create an App and obtain a client ID and secret.
The "script" type application is sufficient for this example.

## Set up the event data source

Before we can receive events from Reddit, we need to create a data source to tell Kaskada how to handle the events.
We'll provide a schema and configure the time and entity fields.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_setup]"
:end-before: "[end_setup]"
:linenos:
:lineno-match:
:dedent: 4
```

## Define the incoming event handler

The `asyncpraw` Python library takes care of requesting and receiving events from Reddit; all you need to do is create a handler that configures what to do with each event.
This handler converts [Comment](https://praw.readthedocs.io/en/stable/code_overview/models/comment.html#praw.models.Comment) messages into a dict and passes the dict to Kaskada.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_incoming]"
:end-before: "[end_incoming]"
:linenos:
:lineno-match:
:dedent: 4
```

## Construct a real-time query and result handler

Now we can use Kaskada to transform the events as they arrive.
First we'll use `with_key` to regroup events by author, then we'll apply a simple `count` aggregation.
Finally, we create a handler for the transformed results - here just printing them out.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_result]"
:end-before: "[end_result]"
:linenos:
:lineno-match:
:dedent: 4
```

## Final touches

Now we just need to kick it all off by calling `asyncio.gather` on the two handler coroutines, which runs both async processes concurrently.

```{literalinclude} reddit.py
:start-after: "[start_run]"
:end-before: "[end_run]"
:language: python
:linenos:
:lineno-match:
:dedent: 4
```

Try running it yourself and playing with different transformations!

```bash
python reddit.py
```
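
For example, here's one variation (a hypothetical sketch reusing only the calls shown above) that counts comments per subreddit instead of per author, by changing the body of `receive_outputs`:

```python
# Hypothetical variation: regroup events by subreddit rather than author.
comments_by_subreddit = comments.with_key(comments.col("subreddit"))

# Print a running per-subreddit comment count as results arrive.
async for row in comments_by_subreddit.count().run_iter(kind="row", mode="live"):
    print(f"r/{row['_key']} has {row['result']} new comments since startup")
```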
86 changes: 86 additions & 0 deletions python/docs/source/examples/reddit.py
@@ -0,0 +1,86 @@
#!/usr/bin/env python
#
# Reddit Kaskada Consumer
#
# This script demonstrates the use of Kaskada to consume and compute over
# Reddit updates.

import os
import asyncio
import time
import kaskada as kd
import pyarrow as pa

import asyncpraw


async def main():
    # Initialize the Kaskada session so we can use it for stream processing
    kd.init_session()

    # Create the Reddit client.
    reddit = asyncpraw.Reddit(
        client_id=os.getenv('REDDIT_CLIENT_ID'),
        client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
        user_agent=os.getenv('REDDIT_USER_AGENT', 'kaskada-demo'),
    )

    # [start_setup]
    # Set up the data source.
    # This defines (most of) the schema of the events we'll receive,
    # and tells Kaskada which fields to use for time and initial entity.
    #
    # We'll push events into this source as they arrive in real-time.
    comments = kd.sources.PyDict(
        schema=pa.schema([
            pa.field("author", pa.string()),
            pa.field("body", pa.string()),
            pa.field("permalink", pa.string()),
            pa.field("submission_id", pa.string()),
            pa.field("subreddit", pa.string()),
            pa.field("ts", pa.float64()),
        ]),
        time_column="ts",
        key_column="submission_id",
        time_unit="s",
    )
    # [end_setup]

    # [start_incoming]
    # Handler to receive new comments as they're created
    async def receive_comments():
        # Create the subreddit handle
        sr = await reddit.subreddit(os.getenv("SUBREDDIT", "all"))

        # Consume the stream of new comments
        async for comment in sr.stream.comments():
            # Add each comment to the Kaskada data source
            await comments.add_rows({
                "author": comment.author.name,
                "body": comment.body,
                "permalink": comment.permalink,
                "submission_id": comment.submission.id,
                # This key must match the "subreddit" field in the schema.
                "subreddit": comment.subreddit.display_name,
                "ts": time.time(),
            })
    # [end_incoming]

    # [start_result]
    # Handler for values emitted by Kaskada.
    async def receive_outputs():
        # We'll perform a very simple aggregation - key by author and count.
        comments_by_author = comments.with_key(comments.col("author"))

        # Consume outputs as they're generated and print to STDOUT.
        async for row in comments_by_author.count().run_iter(kind="row", mode="live"):
            print(f"{row['_key']} has posted {row['result']} times since startup")
    # [end_result]

    # [start_run]
    # Kick off the two async processes concurrently.
    await asyncio.gather(receive_comments(), receive_outputs())
    # [end_run]


if __name__ == "__main__":
    asyncio.run(main())