Skip to content

Commit

Permalink
document types and their fields
Browse files Browse the repository at this point in the history
  • Loading branch information
korewaChino committed Dec 6, 2024
1 parent f8eb724 commit 12609b2
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 10 deletions.
4 changes: 3 additions & 1 deletion skystreamer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio_tungstenite::{
};
use types::{operation::Operation, PostData, Subscription};
// use types::{CommitHandler, PostData, Subscription};

/// Error handling for the skystreamer crate
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to connect to websocket: {0}")]
Expand Down Expand Up @@ -140,6 +140,8 @@ fn is_post_creation(op: &atrium_api::com::atproto::sync::subscribe_repos::RepoOp
}

// convert a commit to
#[deprecated(note = "Please use [`Record::from_op`] instead.", since = "0.2.0")]
#[allow(deprecated)]
pub async fn handle_commit(commit: &Commit) -> Result<Vec<PostData>> {
let mut posts = vec![];
for op in &commit.ops {
Expand Down
6 changes: 6 additions & 0 deletions skystreamer/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ pub struct EventStream {
}

impl EventStream {
/// Create a new [`EventStream`] from a [`crate::RepoSubscription`].
pub fn new(inner: crate::RepoSubscription) -> Self {
EventStream {
subscription: inner,
}
}

/// Start streaming events from the firehose,
/// and flatten blocks of commits into individual records.
///
/// This function returns a [`futures::Stream`] of [`commit::Record`]s.
///
pub async fn stream(&mut self) -> Result<impl futures::Stream<Item = commit::Record> + '_> {
let block_stream = self.subscription.stream_commits();

Expand Down
17 changes: 17 additions & 0 deletions skystreamer/src/types/actor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Idiomatic Rust from the Bluesky API actor types
use crate::util::{conv_atrium_cid, datetime_to_chrono};
use atrium_api::{
app::bsky::{self},
Expand All @@ -10,19 +11,35 @@ use serde::{Deserialize, Serialize};
use super::Blob;

/// A user profile
///
/// A profile is, simply put, an account on Bluesky Social.
///
/// It contains information about the user, such as their display name, avatar, and
/// bio.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Profile {
/// The ATProto DID (Repo ID) of the user
pub did: Did,
/// Link to the user's profile image, if any
pub avatar: Option<Blob>,
/// The date the profile was created
pub created_at: Option<DateTime<FixedOffset>>,
/// The user's bio text, if any
pub description: Option<String>,
/// The user's display name (not their handle)
pub display_name: Option<String>,
// pub joined_via_starter_pack: Option<crate::com::atproto::repo::strong_ref::Main>,
/// Self-imposed labels this user has put on themselves
pub labels: Vec<String>,
/// A link to the user's pinned post on their profile,
/// Refers to [`crate::types::Post`]
pub pinned_post: Option<Cid>,
}

impl Profile {

/// Create a new profile from their record stream.
/// Used in the [`crate::stream::EventStream`] to create a profile from a commit.
pub fn new(did: Did, record: bsky::actor::profile::Record, cid: Option<CidLink>) -> Self {
if let Some(cid) = cid {
tracing::trace!("Profile cid: {:?}", cid);
Expand Down
38 changes: 32 additions & 6 deletions skystreamer/src/types/commit.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
//! Helper types for deserialize commit data from the firehose.
use super::{actor::Profile, feed::*, graph::*, operation::Operation, Post};
use crate::{Error, Result};
use atrium_api::{
app::bsky, com::atproto::sync::subscribe_repos::Commit as ACommit, types::CidLink,
};
use std::convert::From;

/// A record is an event that happens on ATProto.
/// It can be a post, or any kind of new event emitted from the network itself.
#[derive(Debug, Clone)]
pub enum Record {
/// A new post
Post(Box<Post>),
/// A user blocks another user
Block(Box<BlockEvent>),
/// Someone likes a post
Like(Box<LikeEvent>),
/// Someone follows another user
Follow(Box<FollowEvent>),
/// Someone reposts a post
Repost(Box<RepostEvent>),
/// A new list item
ListItem(Box<ListItemEvent>),
/// A new user being created on the network
Profile(Box<Profile>),
// Other(Box<serde::de::value::>),
/// Other, (yet) unsupported record types
///
/// This is a catch-all for any record type that is not yet supported by the library.
///
/// Returns a serde_json::Value, which can be used to inspect the raw data or manually
/// deserialize it if needed.
Other((Operation, Box<serde_json::Value>)),
}

impl Record {
/// Deserialize an operation into the Record enum, given a commit.
///
/// Returns all the records that can be extracted from the operation.
pub async fn from_op(op: &Operation, commit: &ACommit) -> Result<Vec<Self>> {
let mut blocks = commit.blocks.as_slice();
let mut records = vec![];
Expand All @@ -36,12 +56,6 @@ impl Record {
.ok_or_else(|| Error::ItemNotFound(op.get_cid(), items.len()))?;
match op {
Operation::Post(cidlink, _) => {
// let post_record: bsky::feed::post::Record =
// serde_ipld_dagcbor::from_reader(&mut item.as_slice())?;
// let post_record = bsky::feed::post::Record
// let post_record = bsky::feed::post::Record::from(post_record);
// records.push(Record::Post(post));
// println!("{:?}", post_record);
records.push(Record::Post(Box::new(Post::from_record(
commit.repo.clone(),
cidlink.clone().unwrap(),
Expand Down Expand Up @@ -118,6 +132,10 @@ impl Record {
}
}


/// A singular commit, containing a list of operations.
///
/// This is a wrapper around [`atrium_api::com::atproto::sync::subscribe_repos::Commit`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Commit {
pub operations: Vec<Operation>,
Expand All @@ -142,6 +160,11 @@ impl From<&ACommit> for Commit {
}
}

/// Extract a post record from a commit.
#[deprecated(
note = "Please use [`Record::from_op`] instead.",
since = "0.2.0"
)]
pub async fn extract_post_record(
op: &atrium_api::com::atproto::sync::subscribe_repos::RepoOp,
mut blocks: &[u8],
Expand All @@ -164,10 +187,13 @@ pub async fn extract_post_record(
}

impl Commit {

/// Get the inner commit data, in case you need to access the raw commit.
pub fn inner(&self) -> &ACommit {
&self.inner_commit
}

/// Extracts all records from the commit.
pub async fn extract_records(&self) -> Vec<Record> {
let mut records = vec![];

Expand Down
8 changes: 7 additions & 1 deletion skystreamer/src/types/feed.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Helper types for bsky graph events
//! Helper types for bsky feed events, detailing user interactions with posts.
//!
//! These events are emitted when a user interacts with a post, adding data to the feed.
use crate::util::{conv_atrium_cid, datetime_to_chrono};
use atrium_api::{
Expand All @@ -10,6 +12,8 @@ use cid::Cid;
use serde::{Deserialize, Serialize};

/// An event where someone likes a post
///
/// This event is emitted when someone likes a post on the network.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LikeEvent {
pub author: Did,
Expand All @@ -32,6 +36,8 @@ impl LikeEvent {
}

/// An event where someone reposts a post
///
/// This event is emitted when someone reposts a post on the network.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepostEvent {
pub author: Did,
Expand Down
6 changes: 5 additions & 1 deletion skystreamer/src/types/graph.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Helper types for bsky graph events
//! Helper types for bsky graph events, detailing user
//! connections. These events are emitted when a user does something related
//! to another user.
use crate::util::datetime_to_chrono;
use atrium_api::{
Expand All @@ -9,6 +11,8 @@ use chrono::{DateTime, FixedOffset};
use serde::{Deserialize, Serialize};

/// An event where someone blocks someone else :(
///
/// This event is emitted when someone blocks another user on the network.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockEvent {
pub author: Did,
Expand Down
13 changes: 13 additions & 0 deletions skystreamer/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ impl From<RecordEmbedRefs> for Embed {
}
}

/// A post on the network
///
/// An idiomatic wrapper type around [`atrium_api::app::bsky::feed::post::Record`]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Post {
pub author: Did,
Expand All @@ -216,6 +219,14 @@ impl Post {
Self::from(PostData::new(author, cid, record))
}


/// Get media associated with the post, if any
///
/// Before this was implemented as a method,
/// you were required to match each variant of the Embed union type from
/// ATrium to get the media associated with the post.
///
/// This function abstracts that away and returns a vector of Media types
pub fn get_post_media(&self) -> Vec<Media> {
let mut media = vec![];
if let Some(embed) = self.embed.as_ref() {
Expand Down Expand Up @@ -341,6 +352,8 @@ impl PostData {
// pub fn
}


/// Downloads media associated with a post, returning bytes of the media
pub async fn download_media<C>(
client: &atrium_api::client::Service<C>,
did: &Did,
Expand Down
5 changes: 5 additions & 0 deletions skystreamer/src/types/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use atrium_api::{
types::{CidLink, Collection},
};

/// An operation in a commit.
///
/// Wrapper around [`atrium_api::com::atproto::sync::subscribe_repos::RepoOp`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Operation {
// app.bsky.feed.post
Expand Down Expand Up @@ -40,6 +43,7 @@ impl Operation {
}
}

/// Get the CID of the operation
pub fn get_cid(&self) -> Option<CidLink> {
match self {
Operation::Post(cid, _)
Expand All @@ -53,6 +57,7 @@ impl Operation {
}
}

/// Get the operation type
pub fn get_op(&self) -> RepoOp {
match self {
Operation::Post(_, op)
Expand Down
7 changes: 6 additions & 1 deletion skystreamer/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ use atrium_api::types::string::{Cid as ACid, Datetime};
use chrono::{DateTime, FixedOffset};
use cid::Cid;

/// Convert an ATProto datetime to a chrono datetime
///
/// Simply dereferences the ATProto datetime.
#[inline]
pub fn datetime_to_chrono(dt: &Datetime) -> DateTime<FixedOffset> {
DateTime::parse_from_rfc3339(dt.as_str()).unwrap()
*dt.as_ref()
}

/// Marshalls an older ATProto CID (CID 0.10) to a newer CID type (CID 0.11)
#[inline]
pub fn conv_atrium_cid(cid: &ACid) -> Cid {
Cid::from(cid.as_ref())
Expand Down

0 comments on commit 12609b2

Please sign in to comment.