diff --git a/python/docs/source/examples/bluesky.md b/python/docs/source/examples/bluesky.md
new file mode 100644
index 000000000..53b3936ee
--- /dev/null
+++ b/python/docs/source/examples/bluesky.md
@@ -0,0 +1,73 @@
+# Bluesky Firehose Example
+
+Bluesky is a "distributed social network" that aims to improve on some of the perceived shortcomings of X (née Twitter).
+Bluesky uses a distributed protocol named the [AT Protocol](https://atproto.com/) to exchange messages between users, and provides a "firehose" delivering every message sent over the protocol in real-time.
+
+In this example, we'll show how you can receive and process the firehose using Kaskada.
+
+You can see the full example in the file [bluesky.py](https://github.com/kaskada-ai/kaskada/blob/main/python/docs/source/examples/bluesky.py).
+
+## Set up the event data source
+
+Before we can receive events from Bluesky, we need to create a data source that tells Kaskada how to handle the events.
+We'll provide a schema and configure the time and entity fields.
+
+```{literalinclude} bluesky.py
+:language: python
+:start-after: "[start_setup]"
+:end-before: "[end_setup]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
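+
+To see the shape of the data, here's a sketch of feeding a single hand-written row into the source (every value below is invented for illustration; in the real example, the handler in the next section builds these rows from firehose messages):
+
+```python
+import time
+
+# Hypothetical row matching the schema above; all values are placeholders.
+posts.add_rows({
+    "record": {
+        "created_at": "2023-08-01T12:00:00Z",
+        "text": "hello, world",
+        "entities": None,
+        "labels": None,
+        "langs": ["en"],
+        "py_type": "app.bsky.feed.post",
+    },
+    "uri": "at://did:plc:example/app.bsky.feed.post/3k2a",
+    "cid": "bafyexamplecid",
+    "author": "did:plc:example",
+    "ts": time.time(),
+})
+```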
+
+## Define the incoming event handler
+
+The `atproto` Python library takes care of requesting and receiving events from Bluesky; all you need to do is create a handler configuring what to do with each event.
+This handler parses the message to find [Commit](https://atproto.com/specs/repository#commit-objects) events.
+For each Commit, we'll parse out any [Post](https://atproto.com/blog/create-post#post-record-structure) messages.
+Finally, we do some schema munging to get the Post into the event format we described when creating the data source.
+
+```{literalinclude} bluesky.py
+:language: python
+:start-after: "[start_incoming]"
+:end-before: "[end_incoming]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
+
+## Construct a real-time query and result handler
+
+Now we can use Kaskada to transform the events as they arrive.
+First we'll use `with_key` to regroup events by language, then we'll apply a simple `count` aggregation.
+Finally, we create a handler for the transformed results - here just printing them out.
+
+```{literalinclude} bluesky.py
+:language: python
+:start-after: "[start_result]"
+:end-before: "[end_result]"
+:linenos:
+:lineno-match:
+:dedent: 4
+```
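+
+Any column can serve as the key. Since the source is already keyed by `author`, here's a sketch of a variation (untested, and not part of bluesky.py) that skips the re-keying and counts posts per author instead:
+
+```python
+# Sketch: count posts per author, the source's original key.
+async for row in posts.count().run_iter(kind="row", mode="live"):
+    print(f"{row['_key']} has posted {row['result']} times since startup")
+```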
+
+## Final touches
+
+Now we just need to kick it all off by calling `asyncio.gather` on the two handler coroutines. This starts all of the async processing.
+
+```{literalinclude} bluesky.py
+:start-after: "[start_run]"
+:end-before: "[end_run]"
+:language: python
+:linenos:
+:lineno-match:
+:dedent: 4
+```
+
+Try running it yourself and playing with different transformations!
+
+```bash
+python bluesky.py
+```
diff --git a/python/docs/source/examples/bluesky.py b/python/docs/source/examples/bluesky.py
new file mode 100644
index 000000000..f1f6e22a0
--- /dev/null
+++ b/python/docs/source/examples/bluesky.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+#
+# Bluesky Kaskada Consumer
+#
+# This script demonstrates the use of Kaskada to consume and compute over
+# the Bluesky firehose.
+
+import asyncio
+import time
+
+import kaskada as kd
+import pyarrow as pa
+
+from atproto.firehose import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message
+from atproto import CAR, AtUri, models
+from atproto.xrpc_client.models import get_or_create, ids, is_record_type
+
+
+async def main():
+    # Initialize the Kaskada session so we can use it for stream processing.
+    kd.init_session()
+
+    # Create the Bluesky (AT Protocol) client.
+    # The firehose doesn't (currently) require authentication.
+    at_client = AsyncFirehoseSubscribeReposClient()
+
+    # [start_setup]
+    # Set up the data source.
+    # This defines (most of) the schema of the events we'll receive,
+    # and tells Kaskada which fields to use for time and initial entity.
+    #
+    # We'll push events into this source as they arrive in real-time.
+    posts = kd.sources.PyDict(
+        rows=[],
+        schema=pa.schema([
+            pa.field("record", pa.struct([
+                pa.field("created_at", pa.string()),
+                pa.field("text", pa.string()),
+                # pa.field("embed", pa.struct([...])),
+                pa.field("entities", pa.string()),
+                # pa.field("facets", pa.list_(...)),
+                pa.field("labels", pa.float64()),
+                pa.field("langs", pa.list_(pa.string())),
+                # pa.field("reply", pa.struct([...])),
+                pa.field("py_type", pa.string()),
+            ])),
+            pa.field("uri", pa.string()),
+            pa.field("cid", pa.string()),
+            pa.field("author", pa.string()),
+            pa.field("ts", pa.float64()),
+        ]),
+        time_column="ts",
+        key_column="author",
+        time_unit="s",
+    )
+    # [end_setup]
+
+    # [start_incoming]
+    # Handler for newly-arrived messages from Bluesky.
+    async def receive_at(message) -> None:
+        # Extract the contents of the message and bail if it's not a "commit".
+        commit = parse_subscribe_repos_message(message)
+        if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
+            return
+
+        # Get the operations included in the message.
+        ops = _get_ops_by_type(commit)
+        for new_post in ops["posts"]["created"]:
+            # The atproto library's use of schemas is sort of confusing.
+            # This isn't always the expected type and I'm not sure why...
+            if not isinstance(new_post['record'], models.app.bsky.feed.post.Main):
+                continue
+
+            # The parsing produces a hot mess of incompatible types, so we build
+            # a dict from scratch to simplify.
+            posts.add_rows({
+                "record": dict(new_post["record"]),
+                "uri": new_post["uri"],
+                "cid": new_post["cid"],
+                "author": new_post["author"],
+                "ts": time.time(),
+            })
+    # [end_incoming]
+
+    # [start_result]
+    # Handler for values emitted by Kaskada.
+    async def receive_outputs():
+        # We'll perform a very simple aggregation - key by language and count.
+        posts_by_first_lang = posts.with_key(posts.col("record").col("langs").index(0))
+
+        # Consume outputs as they're generated and print to STDOUT.
+        async for row in posts_by_first_lang.count().run_iter(kind="row", mode="live"):
+            print(f"{row['_key']} has posted {row['result']} times since startup")
+    # [end_result]
+
+    # [start_run]
+    # Kick off the two async processes concurrently.
+    await asyncio.gather(at_client.start(receive_at), receive_outputs())
+    # [end_run]
+
+
+# Copied from https://raw.githubusercontent.com/MarshalX/atproto/main/examples/firehose/process_commits.py
+def _get_ops_by_type(commit: models.ComAtprotoSyncSubscribeRepos.Commit) -> dict:  # noqa: C901, E302
+    operation_by_type = {
+        'posts': {'created': [], 'deleted': []},
+        'reposts': {'created': [], 'deleted': []},
+        'likes': {'created': [], 'deleted': []},
+        'follows': {'created': [], 'deleted': []},
+    }
+
+    car = CAR.from_bytes(commit.blocks)
+    for op in commit.ops:
+        uri = AtUri.from_str(f'at://{commit.repo}/{op.path}')
+
+        if op.action == 'update':
+            # Updates are not supported yet.
+            continue
+
+        if op.action == 'create':
+            if not op.cid:
+                continue
+
+            create_info = {'uri': str(uri), 'cid': str(op.cid), 'author': commit.repo}
+
+            record_raw_data = car.blocks.get(op.cid)
+            if not record_raw_data:
+                continue
+
+            record = get_or_create(record_raw_data, strict=False)
+            if uri.collection == ids.AppBskyFeedLike and is_record_type(record, ids.AppBskyFeedLike):
+                operation_by_type['likes']['created'].append({'record': record, **create_info})
+            elif uri.collection == ids.AppBskyFeedPost and is_record_type(record, ids.AppBskyFeedPost):
+                operation_by_type['posts']['created'].append({'record': record, **create_info})
+            elif uri.collection == ids.AppBskyFeedRepost and is_record_type(record, ids.AppBskyFeedRepost):
+                operation_by_type['reposts']['created'].append({'record': record, **create_info})
+            elif uri.collection == ids.AppBskyGraphFollow and is_record_type(record, ids.AppBskyGraphFollow):
+                operation_by_type['follows']['created'].append({'record': record, **create_info})
+
+        if op.action == 'delete':
+            if uri.collection == ids.AppBskyFeedLike:
+                operation_by_type['likes']['deleted'].append({'uri': str(uri)})
+            elif uri.collection == ids.AppBskyFeedPost:
+                operation_by_type['posts']['deleted'].append({'uri': str(uri)})
+            elif uri.collection == ids.AppBskyFeedRepost:
+                operation_by_type['reposts']['deleted'].append({'uri': str(uri)})
+            elif uri.collection == ids.AppBskyGraphFollow:
+                operation_by_type['follows']['deleted'].append({'uri': str(uri)})
+
+    return operation_by_type
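+
+# For orientation, the dict returned above is shaped roughly like this
+# (illustrative placeholders only):
+#
+#   {
+#       'posts':   {'created': [{'record': ..., 'uri': ..., 'cid': ..., 'author': ...}], 'deleted': [...]},
+#       'reposts': {'created': [...], 'deleted': [...]},
+#       'likes':   {'created': [...], 'deleted': [...]},
+#       'follows': {'created': [...], 'deleted': [...]},
+#   }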
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/python/docs/source/examples/index.md b/python/docs/source/examples/index.md
index f8a309423..3cea7f02d 100644
--- a/python/docs/source/examples/index.md
+++ b/python/docs/source/examples/index.md
@@ -2,9 +2,13 @@
 The [Time-Centric Calculations example](./time_centric.ipynb) shows how to work with time and produce past training examples and recent results for applying models.
 
+The [Bluesky Firehose example](./bluesky.md) shows how to read and aggregate messages from the Bluesky firehose.
+This demonstrates how to use Kaskada to connect in real-time and parse messages as part of the query.
+
 ```{toctree}
 :hidden:
 :maxdepth: 2
 
 time_centric
+bluesky
 ```
\ No newline at end of file
diff --git a/python/poetry.lock b/python/poetry.lock
index abd26d098..a07d84a71 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -744,6 +744,7 @@ files = [
     {file = "greenlet-2.0.2-cp27-cp27m-win32.whl", hash = "sha256:6c3acb79b0bfd4fe733dff8bc62695283b57949ebcca05ae5c129eb606ff2d74"},
     {file = "greenlet-2.0.2-cp27-cp27m-win_amd64.whl", hash = "sha256:283737e0da3f08bd637b5ad058507e578dd462db259f7f6e4c5c365ba4ee9343"},
     {file = "greenlet-2.0.2-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d27ec7509b9c18b6d73f2f5ede2622441de812e7b1a80bbd446cb0633bd3d5ae"},
+    {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d967650d3f56af314b72df7089d96cda1083a7fc2da05b375d2bc48c82ab3f3c"},
     {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:30bcf80dda7f15ac77ba5af2b961bdd9dbc77fd4ac6105cee85b0d0a5fcf74df"},
     {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26fbfce90728d82bc9e6c38ea4d038cba20b7faf8a0ca53a9c07b67318d46088"},
     {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9190f09060ea4debddd24665d6804b995a9c122ef5917ab26e1566dcc712ceeb"},
@@ -752,6 +753,7 @@ files = [
     {file = "greenlet-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:76ae285c8104046b3a7f06b42f29c7b73f77683df18c49ab5af7983994c2dd91"},
     {file = "greenlet-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:2d4686f195e32d36b4d7cf2d166857dbd0ee9f3d20ae349b6bf8afc8485b3645"},
     {file = "greenlet-2.0.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c4302695ad8027363e96311df24ee28978162cdcdd2006476c43970b384a244c"},
+    {file = "greenlet-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d4606a527e30548153be1a9f155f4e283d109ffba663a15856089fb55f933e47"},
     {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c48f54ef8e05f04d6eff74b8233f6063cb1ed960243eacc474ee73a2ea8573ca"},
     {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1846f1b999e78e13837c93c778dcfc3365902cfb8d1bdb7dd73ead37059f0d0"},
     {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a06ad5312349fec0ab944664b01d26f8d1f05009566339ac6f63f56589bc1a2"},
@@ -781,6 +783,7 @@ files = [
     {file = "greenlet-2.0.2-cp37-cp37m-win32.whl", hash = "sha256:3f6ea9bd35eb450837a3d80e77b517ea5bc56b4647f5502cd28de13675ee12f7"},
     {file = "greenlet-2.0.2-cp37-cp37m-win_amd64.whl", hash = "sha256:7492e2b7bd7c9b9916388d9df23fa49d9b88ac0640db0a5b4ecc2b653bf451e3"},
     {file = "greenlet-2.0.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:b864ba53912b6c3ab6bcb2beb19f19edd01a6bfcbdfe1f37ddd1778abfe75a30"},
+    {file = "greenlet-2.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1087300cf9700bbf455b1b97e24db18f2f77b55302a68272c56209d5587c12d1"},
     {file = "greenlet-2.0.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:ba2956617f1c42598a308a84c6cf021a90ff3862eddafd20c3333d50f0edb45b"},
     {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3a569657468b6f3fb60587e48356fe512c1754ca05a564f11366ac9e306526"},
     {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8eab883b3b2a38cc1e050819ef06a7e6344d4a990d24d45bc6f2cf959045a45b"},
@@ -789,6 +792,7 @@ files = [
     {file = "greenlet-2.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0ef99cdbe2b682b9ccbb964743a6aca37905fda5e0452e5ee239b1654d37f2a"},
     {file = "greenlet-2.0.2-cp38-cp38-win32.whl", hash = "sha256:b80f600eddddce72320dbbc8e3784d16bd3fb7b517e82476d8da921f27d4b249"},
     {file = "greenlet-2.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:4d2e11331fc0c02b6e84b0d28ece3a36e0548ee1a1ce9ddde03752d9b79bba40"},
+    {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8512a0c38cfd4e66a858ddd1b17705587900dd760c6003998e9472b77b56d417"},
     {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:88d9ab96491d38a5ab7c56dd7a3cc37d83336ecc564e4e8816dbed12e5aaefc8"},
     {file = "greenlet-2.0.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:561091a7be172ab497a3527602d467e2b3fbe75f9e783d8b8ce403fa414f71a6"},
     {file = "greenlet-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971ce5e14dc5e73715755d0ca2975ac88cfdaefcaab078a284fea6cfabf866df"},
@@ -3429,4 +3433,4 @@ plot = ["plotly"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.9,<4.0"
-content-hash = "780b315a537828983a35ed72af1fd8510e2c2e0bb78f9e65f3bd8c7e9faf4a39"
+content-hash = "a9d06a2fe71fe5fd71edd82ebfe2125db917a8a42f6feafc277e3042c585f17e"
diff --git a/python/pyproject.toml b/python/pyproject.toml
index ec4c8b16b..815549697 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -34,6 +34,7 @@ python = ">=3.9,<4.0"
 pyarrow = "^12.0.1"
 typing-extensions = "^4.7.1"
 plotly = {version = "^5.16.1", optional = true}
+flake8 = "^6.1.0"
 
 [tool.poetry.extras]
 plot = ["plotly"]