diff --git a/python/docs/source/examples/reddit.md b/python/docs/source/examples/reddit.md
new file mode 100644
index 000000000..cef2dad0e
--- /dev/null
+++ b/python/docs/source/examples/reddit.md
@@ -0,0 +1,84 @@
+# Reddit Live Example
+
+In this example, we'll show how you can receive and process Reddit's live comment firehose using Kaskada.
+
+You can see the full example in the file [reddit.py](https://github.com/kaskada-ai/kaskada/blob/main/python/docs/source/examples/reddit.py).
+
+## Set up Reddit credentials
+
+Follow Reddit's [First Steps](https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example#first-steps) guide to create an App and obtain a client ID and secret.
+The "script" type application is sufficient for this example.
+The example script reads the credentials from the `REDDIT_CLIENT_ID`, `REDDIT_CLIENT_SECRET`, and (optionally) `REDDIT_USER_AGENT` environment variables.
+
+## Set up the event data source
+
+Before we can receive events from Reddit, we need to create a data source to tell Kaskada how to handle the events.
+We'll provide a schema and configure the time and entity fields.
+
+```{literalinclude} reddit.py
+:language: python
+:start-after: "[start_setup]"
+:end-before: "[end_setup]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
+
+## Define the incoming event handler
+
+The `asyncpraw` Python library takes care of requesting and receiving events from Reddit; all you need to do is create a handler that decides what to do with each event.
+This handler converts [Comment](https://praw.readthedocs.io/en/stable/code_overview/models/comment.html#praw.models.Comment) messages into dicts and passes them to Kaskada.
+
+```{literalinclude} reddit.py
+:language: python
+:start-after: "[start_incoming]"
+:end-before: "[end_incoming]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
+
+## Construct a real-time query and result handler
+
+Now we can use Kaskada to transform the events as they arrive.
+First we'll use `with_key` to regroup events by author, then we'll apply a simple `count` aggregation.
+Finally, we create a handler for the transformed results; here we just print them out.
+
+```{literalinclude} reddit.py
+:language: python
+:start-after: "[start_result]"
+:end-before: "[end_result]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
+
+## Final touches
+
+All that remains is to call `asyncio.gather` on the two handler coroutines, which kicks off all the async processing.
+
+```{literalinclude} reddit.py
+:language: python
+:start-after: "[start_run]"
+:end-before: "[end_run]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
+
+Try running it yourself and experimenting with different transformations!
+By default the script watches `r/all`; set the `SUBREDDIT` environment variable to watch a different subreddit.
+
+```bash
+python reddit.py
+```
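+
+For instance, to count comments per subreddit rather than per author, you could change the key used in `receive_outputs`. A minimal sketch, reusing only the calls shown above:
+
+```python
+# Hypothetical variation: group the stream by subreddit rather than author.
+comments_by_subreddit = comments.with_key(comments.col("subreddit"))
+
+async for row in comments_by_subreddit.count().run_iter(kind="row", mode="live"):
+    print(f"{row['_key']}: {row['result']} comments since startup")
+```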
diff --git a/python/docs/source/examples/reddit.py b/python/docs/source/examples/reddit.py
new file mode 100644
index 000000000..322918a1e
--- /dev/null
+++ b/python/docs/source/examples/reddit.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+#
+# Reddit Kaskada Consumer
+#
+# This script demonstrates the use of Kaskada to consume and compute over
+# Reddit updates.
+
+import asyncio
+import os
+import time
+
+import asyncpraw
+import kaskada as kd
+import pyarrow as pa
+
+async def main():
+    # Initialize the Kaskada session so we can use it for stream processing.
+    kd.init_session()
+
+    # Create the Reddit client.
+    reddit = asyncpraw.Reddit(
+        client_id=os.getenv('REDDIT_CLIENT_ID'),
+        client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
+        user_agent=os.getenv('REDDIT_USER_AGENT', 'kaskada-demo'),
+    )
+
+    # [start_setup]
+    # Set up the data source.
+    # This defines (most of) the schema of the events we'll receive,
+    # and tells Kaskada which fields to use for time and initial entity.
+    #
+    # We'll push events into this source as they arrive in real-time.
+    comments = kd.sources.PyDict(
+        schema=pa.schema([
+            pa.field("author", pa.string()),
+            pa.field("body", pa.string()),
+            pa.field("permalink", pa.string()),
+            pa.field("submission_id", pa.string()),
+            pa.field("subreddit", pa.string()),
+            pa.field("ts", pa.float64()),
+        ]),
+        time_column="ts",
+        key_column="submission_id",
+        time_unit="s",
+    )
+    # [end_setup]
+
+    # [start_incoming]
+    # Handler to receive new comments as they're created.
+    async def receive_comments():
+        # Create the subreddit handle.
+        sr = await reddit.subreddit(os.getenv("SUBREDDIT", "all"))
+
+        # Consume the stream of new comments.
+        async for comment in sr.stream.comments():
+            # Add each comment to the Kaskada data source.
+            await comments.add_rows({
+                "author": comment.author.name,
+                "body": comment.body,
+                "permalink": comment.permalink,
+                "submission_id": comment.submission.id,
+                "subreddit": comment.subreddit.display_name,
+                "ts": time.time(),
+            })
+    # [end_incoming]
+
+    # [start_result]
+    # Handler for values emitted by Kaskada.
+    async def receive_outputs():
+        # We'll perform a very simple aggregation: key by author and count.
+        comments_by_author = comments.with_key(comments.col("author"))
+
+        # Consume outputs as they're generated and print to STDOUT.
+        async for row in comments_by_author.count().run_iter(kind="row", mode="live"):
+            print(f"{row['_key']} has posted {row['result']} times since startup")
+    # [end_result]
+
+    # [start_run]
+    # Kick off the two handler coroutines concurrently.
+    await asyncio.gather(receive_comments(), receive_outputs())
+    # [end_run]
+
+if __name__ == "__main__":
+    asyncio.run(main())