Add an example reading from Reddit (#757)
kerinin authored Sep 18, 2023
1 parent 2da66a0 commit a916379
Showing 3 changed files with 162 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/docs/source/examples/index.md
@@ -2,6 +2,8 @@

The [Time-Centric Calculations example](./time_centric.ipynb) shows how to work with time and produce past training examples and recent results for applying models.

The [Reddit example](./reddit.md) shows how to read and aggregate live messages from Reddit.

The [Bluesky Firehose example](./bluesky.md) shows how to read and aggregate messages from the Bluesky firehose.
This demonstrates how to use Kaskada to connect in real-time and parse messages as part of the query.

@@ -10,5 +12,6 @@ This demonstrates how to use Kaskada to connect in real-time and parse messages
:maxdepth: 2
time_centric
reddit
bluesky
```
73 changes: 73 additions & 0 deletions python/docs/source/examples/reddit.md
@@ -0,0 +1,73 @@
# Reddit Live Example

In this example, we'll show how you can receive and process Reddit comments using Kaskada.

You can see the full example in the file [reddit.py](https://github.com/kaskada-ai/kaskada/blob/main/python/docs/source/examples/reddit.py).

## Set up Reddit credentials

Follow Reddit's [First Steps](https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example#first-steps) guide to create an App and obtain a client ID and secret.
The "script" type application is sufficient for this example.

## Set up the event data source

Before we can receive events from Reddit, we need to create a data source to tell Kaskada how to handle the events.
We'll provide a schema and configure the time and entity fields.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_setup]"
:end-before: "[end_setup]"
:linenos:
:lineno-match:
:dedent: 4
```

## Define the incoming event handler

The `asyncpraw` Python library takes care of requesting and receiving events from Reddit; all you need to do is create a handler configuring what to do with each event.
This handler converts [Comment](https://praw.readthedocs.io/en/stable/code_overview/models/comment.html#praw.models.Comment) messages into a dict and passes the dict to Kaskada.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_incoming]"
:end-before: "[end_incoming]"
:linenos:
:lineno-match:
:dedent: 4
```

## Construct a real-time query and result handler

Now we can use Kaskada to transform the events as they arrive.
First we'll use `with_key` to regroup events by author, then we'll apply a simple `count` aggregation.
Finally, we create a handler for the transformed results - here just printing them out.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_result]"
:end-before: "[end_result]"
:linenos:
:lineno-match:
:dedent: 4
```

## Final touches

Now we just need to kick everything off by calling `asyncio.gather` on the two handler coroutines, which runs them concurrently.

```{literalinclude} reddit.py
:start-after: "[start_run]"
:end-before: "[end_run]"
:language: python
:linenos:
:lineno-match:
:dedent: 4
```

Try running it yourself and playing with different transformations!

```bash
python reddit.py
```
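
For example, you could swap the body of `receive_outputs` to key the count by subreddit instead of author. This is just a sketch built from the operations already shown above (`with_key`, `col`, `count`, and `run_iter`), not part of reddit.py:

```python
# Sketch of a variation: count comments per subreddit instead of per author.
async def receive_outputs():
    comments_by_subreddit = comments.with_key(comments.col("subreddit"))

    # Print each updated count as it's emitted.
    async for row in comments_by_subreddit.count().run_iter(kind="row", mode="live"):
        print(f"{row['_key']} has received {row['result']} comments since startup")
```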
86 changes: 86 additions & 0 deletions python/docs/source/examples/reddit.py
@@ -0,0 +1,86 @@
#!/usr/bin/env python
#
# Reddit Kaskada Consumer
#
# This script demonstrates the use of Kaskada to consume and compute over
# Reddit updates.

import os
import asyncio
import time
import kaskada as kd
import pyarrow as pa

import asyncpraw


async def main():
# Initialize the Kaskada session so we can use it for stream processing
kd.init_session()

# Create the Reddit client.
reddit = asyncpraw.Reddit(
client_id=os.getenv('REDDIT_CLIENT_ID'),
client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
user_agent=os.getenv('REDDIT_USER_AGENT', 'kaskada-demo'),
)

# [start_setup]
    # Set up the data source.
    # This defines (most of) the schema of the events we'll receive,
# and tells Kaskada which fields to use for time and initial entity.
#
# We'll push events into this source as they arrive in real-time.
comments = kd.sources.PyDict(
schema=pa.schema([
pa.field("author", pa.string()),
pa.field("body", pa.string()),
pa.field("permalink", pa.string()),
pa.field("submission_id", pa.string()),
pa.field("subreddit", pa.string()),
pa.field("ts", pa.float64()),
]),
time_column="ts",
key_column="submission_id",
time_unit="s",
)
# [end_setup]

# [start_incoming]
# Handler to receive new comments as they're created
async def receive_comments():
        # Create the subreddit handle
sr = await reddit.subreddit(os.getenv("SUBREDDIT", "all"))

# Consume the stream of new comments
async for comment in sr.stream.comments():
# Add each comment to the Kaskada data source
await comments.add_rows({
"author": comment.author.name,
"body": comment.body,
"permalink": comment.permalink,
"submission_id": comment.submission.id,
"subreddit_id": comment.subreddit.display_name,
"ts": time.time(),
})
# [end_incoming]

# [start_result]
# Handler for values emitted by Kaskada.
async def receive_outputs():

# We'll perform a very simple aggregation - key by author and count.
comments_by_author = comments.with_key(comments.col("author"))

# Consume outputs as they're generated and print to STDOUT.
async for row in comments_by_author.count().run_iter(kind="row", mode="live"):
print(f"{row['_key']} has posted {row['result']} times since startup")
# [end_result]

# [start_run]
# Kickoff the two async processes concurrently.
await asyncio.gather(receive_comments(), receive_outputs())
# [end_run]

if __name__ == "__main__":
asyncio.run(main())
