Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated docs for 2 examples #833

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/docs/examples/_metadata.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
execute:
daemon: false
warning: false

filters:
- include-code-files

format:
html:
link-external-icon: true
link-external-newwindow: true

jupyter: python3
10 changes: 10 additions & 0 deletions python/docs/examples/bluesky.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async def main():
# The firehose doesn't (currently) require authentication.
at_client = AsyncFirehoseSubscribeReposClient()

# [start_setup]
# Setup the data source.
# This defintes (most of) the schema of the events we'll receive,
# and tells Kaskada which fields to use for time and initial entity.
Expand Down Expand Up @@ -61,7 +62,9 @@ async def main():
key_column="author",
time_unit="s",
)
# [end_setup]

# [start_incoming]
# Handler for newly-arrived messages from BlueSky.
async def receive_at(message) -> None:
# Extract the contents of the message and bail if it's not a "commit"
Expand Down Expand Up @@ -89,6 +92,9 @@ async def receive_at(message) -> None:
}
)

# [end_incoming]

# [start_result]
# Handler for values emitted by Kaskada.
async def receive_outputs():
# We'll perform a very simple aggregation - key by language and count.
Expand All @@ -98,8 +104,12 @@ async def receive_outputs():
async for row in posts_by_first_lang.count().run_iter(kind="row", mode="live"):
print(f"{row['_key']} has posted {row['result']} times since startup")

# [end_result]

# [start_run]
# Kickoff the two async processes concurrently.
await asyncio.gather(at_client.start(receive_at), receive_outputs())
# [end_run]


# Copied from https://raw.githubusercontent.com/MarshalX/atproto/main/examples/firehose/process_commits.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ You can see the full example in the file [bluesky.py](https://github.com/kaskada
Before we can receive events from Bluesky, we need to create a data source to tell Kaskada how to handle the events.
We'll provide a schema and configure the time and entity fields.

```{.python include="bluesky.py" code-line-numbers="true" start-line=26 end-line=53 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=30 end-line=64 dedent=4}
```

## Define the incoming event handler
Expand All @@ -27,7 +27,7 @@ This handler parses the message to find [Commit](https://atproto.com/specs/repos
For each Commit, we'll parse out any [Post](https://atproto.com/blog/create-post#post-record-structure) messages.
Finally we do some schema munging to get the Post into the event format we described when creating the data source.

```{.python include="bluesky.py" code-line-numbers="true" start-line=55 end-line=79 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=68 end-line=93 dedent=4}
```

## Construct a real-time query and result handler
Expand All @@ -37,14 +37,14 @@ First we'll use `with_key` to regroup events by language, then we'll apply a sim
Finally, we create a handler for the transformed results - here just printing them out.


```{.python include="bluesky.py" code-line-numbers="true" start-line=81 end-line=89 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=98 end-line=105 dedent=4}
```

## Final touches

Now we just need to kick it all off by calling `asyncio.gather` on the two handler coroutines. This kicks off all the async processing.

```{.python include="bluesky.py" code-line-numbers="true" start-line=91 end-line=92 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=110 end-line=111 dedent=4}
```

Try running it yourself and playing different transformations!
Expand Down
Loading
Loading