From 667361875288d2fa466645190b22bc8ef150b432 Mon Sep 17 00:00:00 2001 From: sugyan Date: Fri, 9 Feb 2024 00:12:49 +0900 Subject: [PATCH] Fix firehose --- examples/firehose/Cargo.toml | 3 +- examples/firehose/src/main.rs | 55 +++++++++++++++-------------------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/examples/firehose/Cargo.toml b/examples/firehose/Cargo.toml index bb85f2a7..e75054e3 100644 --- a/examples/firehose/Cargo.toml +++ b/examples/firehose/Cargo.toml @@ -7,9 +7,10 @@ edition = "2021" [dependencies] atrium-xrpc-server = { path = "../../atrium-xrpc-server" } -atrium-api = "0.15" +atrium-api = { path = "../../atrium-api", default-features = false, features = ["dag-cbor"]} ciborium = "0.2.1" futures = "0.3.28" rs-car = "0.4.1" tokio = { version = "1.28.1", features = ["full"] } tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } +serde_ipld_dagcbor = { git = "https://github.com/sugyan/serde_ipld_dagcbor.git", rev = "345b240" } diff --git a/examples/firehose/src/main.rs b/examples/firehose/src/main.rs index 6f93afee..8fef74d8 100644 --- a/examples/firehose/src/main.rs +++ b/examples/firehose/src/main.rs @@ -1,14 +1,14 @@ use atrium_api::app::bsky::feed::post::Record; -use atrium_api::com::atproto::sync::subscribe_repos::Message; +use atrium_api::com::atproto::sync::subscribe_repos::{Message, NSID}; +use atrium_api::types::CidLink; use atrium_xrpc_server::stream::frames::Frame; use futures::StreamExt; use tokio_tungstenite::{connect_async, tungstenite}; #[tokio::main] async fn main() -> Result<(), Box> { - let (mut stream, _) = - connect_async("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos").await?; - + let bgs = "bsky.network"; + let (mut stream, _) = connect_async(format!("wss://{bgs}/xrpc/{NSID}")).await?; while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { process_message(&message).await.unwrap(); } @@ -18,35 +18,28 @@ async fn main() -> Result<(), Box> { async fn process_message(message: &[u8]) -> Result<(), Box> { match Frame::try_from(message)? { Frame::Message(message) => { - match message.body { - Message::Commit(commit) => { - for op in commit.ops { - let collection = op.path.split('/').next().expect("op.path is empty"); - if op.action != "create" || collection != "app.bsky.feed.post" { - continue; - } - let (items, _) = - rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; - if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) - { - if let Ok(value) = - ciborium::de::from_reader::(&mut item.as_slice()) - { - println!("{}: {}", value.created_at, value.text); - } else { - println!("FAILED: could not deserialize post from item of length: {}", item.len()); - - } - } else { - println!( - "FAILED: could not find item with operation cid {:?} out of {} items", - op.cid, - items.len() - ); - } + if let Message::Commit(commit) = message.body { + for op in commit.ops { + let collection = op.path.split('/').next().expect("op.path is empty"); + if op.action != "create" || collection != "app.bsky.feed.post" { + continue; + } + let (items, _) = + rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; + if let Some((_, item)) = + items.iter().find(|(cid, _)| Some(CidLink(*cid)) == op.cid) + { + let record = + serde_ipld_dagcbor::from_reader::(&mut item.as_slice())?; + println!("{}: {}", record.created_at, record.text); + } else { + panic!( + "FAILED: could not find item with operation cid {:?} out of {} items", + op.cid, + items.len() + ); } } - _ => unimplemented!("{:?}", message.body), } } Frame::Error(err) => panic!("{err:?}"),