From a9163791a50dc737361d1ffc1c84c90e98c23ae1 Mon Sep 17 00:00:00 2001
From: Ryan Michael
Date: Mon, 18 Sep 2023 13:18:57 -0700
Subject: [PATCH] Add an example reading from Reddit (#757)

---
 python/docs/source/examples/index.md  |  3 +
 python/docs/source/examples/reddit.md | 73 +++++++++++++++++++++++
 python/docs/source/examples/reddit.py | 86 +++++++++++++++++++++++++++
 3 files changed, 162 insertions(+)
 create mode 100644 python/docs/source/examples/reddit.md
 create mode 100644 python/docs/source/examples/reddit.py

diff --git a/python/docs/source/examples/index.md b/python/docs/source/examples/index.md
index 3cea7f02d..3c8fa7cbe 100644
--- a/python/docs/source/examples/index.md
+++ b/python/docs/source/examples/index.md
@@ -2,6 +2,8 @@
 The [Time-Centric Calculations example](./time_centric.ipynb) shows how to work with time and produce past training examples and recent results for applying models.
 
+The [Reddit example](./reddit.md) shows how to read and aggregate live messages from Reddit.
+
 The [Bluesky Firehose example](./bluesky.md) shows how to read and aggregate messages from the Bluesky firehose.
 This demonstrates how to use Kaskada to connect in real-time and parse messages as part of the query.
 
@@ -10,5 +12,6 @@ This demonstrates how to use Kaskada to connect in real-time and parse messages
 :maxdepth: 2
 
 time_centric
+reddit
 bluesky
 ```
\ No newline at end of file
diff --git a/python/docs/source/examples/reddit.md b/python/docs/source/examples/reddit.md
new file mode 100644
index 000000000..9e7516f38
--- /dev/null
+++ b/python/docs/source/examples/reddit.md
@@ -0,0 +1,73 @@
# Reddit Live Example

In this example, we'll show how you can receive and process Reddit comments using Kaskada.

You can see the full example in the file [reddit.py](https://github.com/kaskada-ai/kaskada/blob/main/python/docs/source/examples/reddit.py).

## Set up Reddit credentials

Follow Reddit's [First Steps](https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example#first-steps) guide to create an app and obtain a client ID and secret.
The "script" type application is sufficient for this example.

## Set up the event data source

Before we can receive events from Reddit, we need to create a data source that tells Kaskada how to handle the events.
We'll provide a schema and configure the time and entity fields.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_setup]"
:end-before: "[end_setup]"
:linenos:
:lineno-match:
:dedent: 4
```

## Define the incoming event handler

The `asyncpraw` Python library takes care of requesting and receiving events from Reddit; all you need to do is create a handler that configures what to do with each event.
This handler converts [Comment](https://praw.readthedocs.io/en/stable/code_overview/models/comment.html#praw.models.Comment) messages into dicts and passes them to Kaskada.

```{literalinclude} reddit.py
:language: python
:start-after: "[start_incoming]"
:end-before: "[end_incoming]"
:linenos:
:lineno-match:
:dedent: 4
```

## Construct a real-time query and result handler

Now we can use Kaskada to transform the events as they arrive.
First we'll use `with_key` to regroup events by author, and then apply a simple `count` aggregation.
Finally, we create a handler for the transformed results - here just printing them out.
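
Regrouping changes the entity each event is associated with, so the running count is maintained per author rather than per submission (the source's original key).
As a minimal sketch of those two steps (reusing the `comments` source defined above; the full handler shown below wraps this in an async loop):

```python
# Regroup the comment events by author; counting then maintains one
# running total per author instead of one per submission.
comments_by_author = comments.with_key(comments.col("author"))
counts_by_author = comments_by_author.count()
```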
+ + +```{literalinclude} reddit.py +:language: python +:start-after: "[start_result]" +:end-before: "[end_result]" +:linenos: +:lineno-match: +:dedent: 4 +``` + +## Final touches + +Now we just need to kick it all off by calling `asyncio.gather` on the two handler coroutines. This kicks off all the async processing. + +```{literalinclude} reddit.py +:start-after: "[start_run]" +:end-before: "[end_run]" +:language: python +:linenos: +:lineno-match: +:dedent: 4 +``` + +Try running it yourself and playing different transformations! + +```bash +python reddit.py +``` diff --git a/python/docs/source/examples/reddit.py b/python/docs/source/examples/reddit.py new file mode 100644 index 000000000..21a6cf0f1 --- /dev/null +++ b/python/docs/source/examples/reddit.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# +# Reddit Kaskada Consumer +# +# This script demonstrates the use of Kaskada to consume and compute over +# Reddit updates. + +import os +import asyncio +import time +import kaskada as kd +import pyarrow as pa + +import asyncpraw + + +async def main(): + # Initialize the Kaskada session so we can use it for stream processing + kd.init_session() + + # Create the Reddit client. + reddit = asyncpraw.Reddit( + client_id=os.getenv('REDDIT_CLIENT_ID'), + client_secret=os.getenv('REDDIT_CLIENT_SECRET'), + user_agent=os.getenv('REDDIT_USER_AGENT', 'kaskada-demo'), + ) + + # [start_setup] + # Setup the data source. + # This defintes (most of) the schema of the events we'll receive, + # and tells Kaskada which fields to use for time and initial entity. + # + # We'll push events into this source as they arrive in real-time. + comments = kd.sources.PyDict( + schema=pa.schema([ + pa.field("author", pa.string()), + pa.field("body", pa.string()), + pa.field("permalink", pa.string()), + pa.field("submission_id", pa.string()), + pa.field("subreddit", pa.string()), + pa.field("ts", pa.float64()), + ]), + time_column="ts", + key_column="submission_id", + time_unit="s", + ) + # [end_setup] + + # [start_incoming] + # Handler to receive new comments as they're created + async def receive_comments(): + # Creat the subreddit handle + sr = await reddit.subreddit(os.getenv("SUBREDDIT", "all")) + + # Consume the stream of new comments + async for comment in sr.stream.comments(): + # Add each comment to the Kaskada data source + await comments.add_rows({ + "author": comment.author.name, + "body": comment.body, + "permalink": comment.permalink, + "submission_id": comment.submission.id, + "subreddit_id": comment.subreddit.display_name, + "ts": time.time(), + }) + # [end_incoming] + + # [start_result] + # Handler for values emitted by Kaskada. + async def receive_outputs(): + + # We'll perform a very simple aggregation - key by author and count. + comments_by_author = comments.with_key(comments.col("author")) + + # Consume outputs as they're generated and print to STDOUT. + async for row in comments_by_author.count().run_iter(kind="row", mode="live"): + print(f"{row['_key']} has posted {row['result']} times since startup") + # [end_result] + + # [start_run] + # Kickoff the two async processes concurrently. + await asyncio.gather(receive_comments(), receive_outputs()) + # [end_run] + +if __name__ == "__main__": + asyncio.run(main())