Add an example showing how to consume from BlueSky (#751)
kerinin authored Sep 14, 2023
2 parents bf294dd + 8a80de6 commit e5efd1e
Showing 5 changed files with 236 additions and 1 deletion.
73 changes: 73 additions & 0 deletions python/docs/source/examples/bluesky.md
@@ -0,0 +1,73 @@
# Bluesky Firehose Example

Bluesky is a "distributed social network" that aims to improve on some of the perceived shortcomings of X (née Twitter).
Bluesky uses a distributed protocol named the [AT Protocol](https://atproto.com/) to exchange messages between users, and provides a "firehose" delivering every message sent over the protocol in real time.

In this example, we'll show how you can receive and process the firehose using Kaskada.

You can see the full example in the file [bluesky.py](https://github.com/kaskada-ai/kaskada/blob/main/python/docs/source/examples/bluesky.py).

## Set up the event data source

Before we can receive events from Bluesky, we need to create a data source to tell Kaskada how to handle the events.
We'll provide a schema and configure the time and entity fields.

```{literalinclude} bluesky.py
:language: python
:start-after: "[start_setup]"
:end-before: "[end_setup]"
:linenos:
:lineno-match:
:dedent: 4
```

## Define the incoming event handler

The `atproto` Python library takes care of requesting and receiving events from Bluesky; all you need to do is create a handler that decides what to do with each event.
This handler parses each message to find [Commit](https://atproto.com/specs/repository#commit-objects) events.
For each Commit, we'll parse out any [Post](https://atproto.com/blog/create-post#post-record-structure) messages.
Finally, we do some light schema munging to get each Post into the event format we described when creating the data source.

```{literalinclude} bluesky.py
:language: python
:start-after: "[start_incoming]"
:end-before: "[end_incoming]"
:linenos:
:lineno-match:
:dedent: 4
```
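For reference, each call to `posts.add_rows(...)` in the handler pushes a single dict shaped like the schema we declared earlier. Here's a minimal sketch with placeholder values (the URI, CID, and author DID below are made up, not real identifiers):

```python
import time

# Illustrative only: the real handler builds this dict from a parsed Commit.
posts.add_rows({
    "record": {
        "created_at": "2023-09-14T12:00:00Z",  # placeholder timestamp string
        "text": "hello from the firehose",     # placeholder post text
        "entities": None,
        "labels": None,
        "langs": ["en"],
        "py_type": "app.bsky.feed.post",
    },
    "uri": "at://did:plc:example/app.bsky.feed.post/xyz",  # placeholder URI
    "cid": "bafyexample",                                   # placeholder CID
    "author": "did:plc:example",                            # placeholder DID
    "ts": time.time(),  # arrival time in seconds, matching time_unit="s"
})
```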

## Construct a real-time query and result handler

Now we can use Kaskada to transform the events as they arrive.
First, we'll use `with_key` to re-key the events by language (replacing the initial `author` key), then we'll apply a simple `count` aggregation.
Finally, we create a handler for the transformed results - here, just printing them out.


```{literalinclude} bluesky.py
:language: python
:start-after: "[start_result]"
:end-before: "[end_result]"
:linenos:
:lineno-match:
:dedent: 4
```

## Final touches

Now we just need to kick everything off by calling `asyncio.gather` on the two handler coroutines. This starts the Bluesky client and the Kaskada result handler running concurrently.

```{literalinclude} bluesky.py
:start-after: "[start_run]"
:end-before: "[end_run]"
:language: python
:linenos:
:lineno-match:
:dedent: 4
```

Try running it yourself and playing with different transformations!

```bash
python bluesky.py
```
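If you'd like a starting point for experimenting, here's one small variation - just a sketch, reusing only the calls shown above. Dropping the `with_key` call leaves the events grouped by the source's original entity key (`author`), so the same `count` aggregation reports posts per author instead of per language:

```python
# Sketch: count posts per author (the source's default entity key).
async def receive_outputs_by_author():
    async for row in posts.count().run_iter(kind="row", mode="live"):
        print(f"{row['_key']} has posted {row['result']} times since startup")
```

You'd swap this handler in for `receive_outputs` in the `asyncio.gather` call.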
153 changes: 153 additions & 0 deletions python/docs/source/examples/bluesky.py
@@ -0,0 +1,153 @@
#!/usr/bin/env python
#
# Bluesky Kaskada Consumer
#
# This script demonstrates the use of Kaskada to consume and compute over
# the Bluesky firehose.

import asyncio
import time
import kaskada as kd
import pyarrow as pa

from atproto.firehose import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message
from atproto import CAR, AtUri, models
from atproto.xrpc_client.models import get_or_create, ids, is_record_type


async def main():
    # Initialize the Kaskada session so we can use it for stream processing
    kd.init_session()

    # Create the Bluesky (AT Protocol) client.
    # The firehose doesn't (currently) require authentication.
    at_client = AsyncFirehoseSubscribeReposClient()

    # [start_setup]
    # Set up the data source.
    # This defines (most of) the schema of the events we'll receive,
    # and tells Kaskada which fields to use for time and initial entity.
    #
    # We'll push events into this source as they arrive in real time.
    posts = kd.sources.PyDict(
        rows=[],
        schema=pa.schema([
            pa.field("record", pa.struct([
                pa.field("created_at", pa.string()),
                pa.field("text", pa.string()),
                # pa.field("embed", pa.struct([...])),
                pa.field("entities", pa.string()),
                # pa.field("facets", pa.list_(...)),
                pa.field("labels", pa.float64()),
                pa.field("langs", pa.list_(pa.string())),
                # pa.field("reply", pa.struct([...])),
                pa.field("py_type", pa.string()),
            ])),
            pa.field("uri", pa.string()),
            pa.field("cid", pa.string()),
            pa.field("author", pa.string()),
            pa.field("ts", pa.float64()),
        ]),
        time_column="ts",
        key_column="author",
        time_unit="s",
    )
    # [end_setup]

    # [start_incoming]
    # Handler for newly-arrived messages from Bluesky.
    async def receive_at(message) -> None:
        # Extract the contents of the message and bail if it's not a "commit".
        commit = parse_subscribe_repos_message(message)
        if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            return

        # Get the operations included in the message.
        ops = _get_ops_by_type(commit)
        for new_post in ops["posts"]["created"]:

            # The atproto library's use of schemas is sort of confusing;
            # this isn't always the expected type and I'm not sure why...
            if not isinstance(new_post['record'], models.app.bsky.feed.post.Main):
                continue

            # The parsing produces a hot mess of incompatible types, so we build
            # a dict from scratch to simplify.
            posts.add_rows({
                "record": dict(new_post["record"]),
                "uri": new_post["uri"],
                "cid": new_post["cid"],
                "author": new_post["author"],
                "ts": time.time(),
            })
    # [end_incoming]

    # [start_result]
    # Handler for values emitted by Kaskada.
    async def receive_outputs():

        # We'll perform a very simple aggregation - key by language and count.
        posts_by_first_lang = posts.with_key(posts.col("record").col("langs").index(0))

        # Consume outputs as they're generated and print to STDOUT.
        async for row in posts_by_first_lang.count().run_iter(kind="row", mode="live"):
            print(f"{row['_key']} has posted {row['result']} times since startup")
    # [end_result]

    # [start_run]
    # Kick off the two async processes concurrently.
    await asyncio.gather(at_client.start(receive_at), receive_outputs())
    # [end_run]

# Copied from https://raw.githubusercontent.com/MarshalX/atproto/main/examples/firehose/process_commits.py
def _get_ops_by_type(commit: models.ComAtprotoSyncSubscribeRepos.Commit) -> dict:  # noqa: C901, E302
    operation_by_type = {
        'posts': {'created': [], 'deleted': []},
        'reposts': {'created': [], 'deleted': []},
        'likes': {'created': [], 'deleted': []},
        'follows': {'created': [], 'deleted': []},
    }

    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        uri = AtUri.from_str(f'at://{commit.repo}/{op.path}')

        if op.action == 'update':
            # not supported yet
            continue

        if op.action == 'create':
            if not op.cid:
                continue

            create_info = {'uri': str(uri), 'cid': str(op.cid), 'author': commit.repo}

            record_raw_data = car.blocks.get(op.cid)
            if not record_raw_data:
                continue

            record = get_or_create(record_raw_data, strict=False)
            if uri.collection == ids.AppBskyFeedLike and is_record_type(record, ids.AppBskyFeedLike):
                operation_by_type['likes']['created'].append({'record': record, **create_info})
            elif uri.collection == ids.AppBskyFeedPost and is_record_type(record, ids.AppBskyFeedPost):
                operation_by_type['posts']['created'].append({'record': record, **create_info})
            elif uri.collection == ids.AppBskyFeedRepost and is_record_type(record, ids.AppBskyFeedRepost):
                operation_by_type['reposts']['created'].append({'record': record, **create_info})
            elif uri.collection == ids.AppBskyGraphFollow and is_record_type(record, ids.AppBskyGraphFollow):
                operation_by_type['follows']['created'].append({'record': record, **create_info})

        if op.action == 'delete':
            if uri.collection == ids.AppBskyFeedLike:
                operation_by_type['likes']['deleted'].append({'uri': str(uri)})
            elif uri.collection == ids.AppBskyFeedPost:
                operation_by_type['posts']['deleted'].append({'uri': str(uri)})
            elif uri.collection == ids.AppBskyFeedRepost:
                operation_by_type['reposts']['deleted'].append({'uri': str(uri)})
            elif uri.collection == ids.AppBskyGraphFollow:
                operation_by_type['follows']['deleted'].append({'uri': str(uri)})

    return operation_by_type


if __name__ == "__main__":
    asyncio.run(main())
4 changes: 4 additions & 0 deletions python/docs/source/examples/index.md
@@ -2,9 +2,13 @@

The [Time-Centric Calculations example](./time_centric.ipynb) shows how to work with time and produce past training examples and recent results for applying models.

The [Bluesky Firehose example](./bluesky.md) shows how to read and aggregate messages from the Bluesky firehose.
This demonstrates how to use Kaskada to connect in real-time and parse messages as part of the query.

```{toctree}
:hidden:
:maxdepth: 2
time_centric
bluesky
```
6 changes: 5 additions & 1 deletion python/poetry.lock


1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -34,6 +34,7 @@ python = ">=3.9,<4.0"
pyarrow = "^12.0.1"
typing-extensions = "^4.7.1"
plotly = {version = "^5.16.1", optional = true}
flake8 = "^6.1.0"

[tool.poetry.extras]
plot = ["plotly"]
