From 10b6efcca1bae1d7628f564b3bbad7cb7f751d40 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 11 Oct 2023 15:20:05 -0600 Subject: [PATCH] fix: change pubsub/ls to return base64url encoded topics Prior to this change the pubsub/ls endpoint would return the raw topic names when the should be base64url encoded names. Additionally the subscription would remain even after the subscription connection had closed. This is now fixed. --- Cargo.lock | 1 + kubo-rpc/Cargo.toml | 11 ++++-- kubo-rpc/src/http.rs | 58 +++++++++++++++++++++++++++----- kubo-rpc/src/http/stream_drop.rs | 57 +++++++++++++++++++++++++++++++ kubo-rpc/src/lib.rs | 43 +++++++++++++++++++++-- kubo-rpc/src/pubsub.rs | 8 +++++ 6 files changed, 165 insertions(+), 13 deletions(-) create mode 100644 kubo-rpc/src/http/stream_drop.rs diff --git a/Cargo.lock b/Cargo.lock index ecc50ba62..f8152708c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1303,6 +1303,7 @@ dependencies = [ "libp2p-tls", "mockall", "multiaddr", + "pin-project", "serde", "serde_json", "swagger", diff --git a/kubo-rpc/Cargo.toml b/kubo-rpc/Cargo.toml index 9b28e00b9..5598c57ef 100644 --- a/kubo-rpc/Cargo.toml +++ b/kubo-rpc/Cargo.toml @@ -9,7 +9,13 @@ repository.workspace = true publish = false [features] -http = ["dep:ceramic-kubo-rpc-server", "dep:serde", "dep:serde_json", "dep:hyper"] +http = [ + "dep:ceramic-kubo-rpc-server", + "dep:hyper", + "dep:pin-project", + "dep:serde", + "dep:serde_json", +] [dependencies] anyhow.workspace = true @@ -36,15 +42,16 @@ libp2p-identity.workspace = true libp2p-tls.workspace = true libp2p.workspace = true multiaddr.workspace = true +pin-project = { version = "1.1.3", optional = true } serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } swagger.workspace = true +thiserror = "1.0.47" tokio-stream = "0.1.14" tokio-util.workspace = true tokio.workspace = true tracing-opentelemetry.workspace = true tracing.workspace = true -thiserror = "1.0.47" [dev-dependencies] expect-test.workspace = true diff --git a/kubo-rpc/src/http.rs b/kubo-rpc/src/http.rs index f4a544fca..f51cc2e3b 100644 --- a/kubo-rpc/src/http.rs +++ b/kubo-rpc/src/http.rs @@ -1,4 +1,7 @@ //! Provides an http implementation of the Kubo RPC methods. + +mod stream_drop; + use std::{collections::HashSet, io::Cursor, marker::PhantomData, str::FromStr}; use async_trait::async_trait; @@ -26,9 +29,11 @@ use libp2p::{gossipsub::Message, Multiaddr, PeerId}; use multiaddr::Protocol; use serde::Serialize; use swagger::{ApiError, ByteArray}; +use tracing::{instrument, warn}; use crate::{ - block, dag, id, pin, pubsub, swarm, version, Bytes, GossipsubEvent, IpfsDep, IpfsPath, + block, dag, http::stream_drop::StreamDrop, id, pin, pubsub, swarm, version, Bytes, + GossipsubEvent, IpfsDep, IpfsPath, }; /// Kubo RPC API Server implementation. @@ -39,7 +44,7 @@ pub struct Server { } impl Server where - I: IpfsDep, + I: IpfsDep + Send + Sync + 'static, { /// Construct a new Server pub fn new(ipfs: I) -> Self { @@ -76,9 +81,10 @@ macro_rules! try_or_bad_request { #[async_trait] impl Api for Server where - I: IpfsDep + Send + Sync, + I: IpfsDep + Send + Sync + 'static, C: Send + Sync, { + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn block_get_post( &self, arg: String, @@ -91,6 +97,7 @@ where Ok(BlockGetPostResponse::Success(ByteArray(data))) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn block_put_post( &self, file: ByteArray, @@ -135,6 +142,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn block_stat_post( &self, arg: String, @@ -150,6 +158,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn dag_get_post( &self, arg: String, @@ -174,6 +183,7 @@ where } } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn dag_import_post( &self, file: swagger::ByteArray, @@ -193,6 +203,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn dag_put_post( &self, file: ByteArray, @@ -237,6 +248,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn dag_resolve_post( &self, arg: String, @@ -254,6 +266,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn id_post(&self, arg: Option, _context: &C) -> Result { let info = if let Some(id) = &arg { let peer_id = try_or_bad_request!(PeerId::from_str(id), IdPostResponse); @@ -278,6 +291,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn pin_add_post( &self, arg: String, @@ -308,6 +322,7 @@ where pins: vec![cid.to_string()], })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn pin_rm_post(&self, arg: String, _context: &C) -> Result { let ipfs_path = try_or_bad_request!(IpfsPath::from_str(&arg), PinRmPostResponse); let cid = pin::remove(self.ipfs.clone(), &ipfs_path) @@ -318,15 +333,20 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn pubsub_ls_post(&self, _context: &C) -> Result { let topics = pubsub::topics(self.ipfs.clone()) .await - .map_err(to_api_error)?; + .map_err(to_api_error)? + .into_iter() + .map(|topic| multibase::encode(Base::Base64Url, topic)) + .collect(); Ok(PubsubLsPostResponse::Success(PubsubLsPost200Response { strings: topics, })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn pubsub_pub_post( &self, arg: String, @@ -344,6 +364,7 @@ where Ok(PubsubPubPostResponse::Success) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn pubsub_sub_post( &self, arg: String, @@ -354,7 +375,7 @@ where let topic = try_or_bad_request!(String::from_utf8(topic_bytes), PubsubSubPostResponse); - let subscription = pubsub::subscribe(self.ipfs.clone(), topic) + let subscription = pubsub::subscribe(self.ipfs.clone(), topic.clone()) .await .map_err(to_api_error)?; @@ -407,10 +428,18 @@ where }) .map_err(Box::::from) }); + let ipfs = self.ipfs.clone(); + let messages = StreamDrop::new(messages, async move { + let ret = pubsub::unsubscribe(ipfs.clone(), topic.clone()).await; + if let Err(error) = ret { + warn!(topic, %error, "failed to unsubscribe"); + }; + }); Ok(PubsubSubPostResponse::Success(Box::pin(messages))) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn swarm_connect_post( &self, arg: &Vec, @@ -472,6 +501,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn swarm_peers_post(&self, _context: &C) -> Result { let peers: Vec = swarm::peers(self.ipfs.clone()) .await @@ -490,6 +520,7 @@ where })) } + #[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)] async fn version_post(&self, _context: &C) -> Result { let v = version::version(self.ipfs.clone()) .await @@ -1507,8 +1538,8 @@ mod tests { Success( PubsubLsPost200Response { strings: [ - "topic1", - "topic2", + "udG9waWMx", + "udG9waWMy", ], }, ) @@ -1551,11 +1582,12 @@ mod tests { let topic_encoded = multibase::encode(Base::Base64, topic.as_bytes()); let mut mock_ipfs = MockIpfsDepTest::new(); + let t = topic.clone(); mock_ipfs.expect_clone().once().return_once(move || { let mut m = MockIpfsDepTest::new(); m.expect_subscribe() .once() - .with(predicate::eq(topic)) + .with(predicate::eq(t)) .return_once(|_| { let first = Ok(GossipsubEvent::Message { from: PeerId::from_str( @@ -1601,6 +1633,15 @@ mod tests { }); m }); + let t = topic.clone(); + mock_ipfs.expect_clone().once().return_once(move || { + let mut m = MockIpfsDepTest::new(); + m.expect_unsubscribe() + .once() + .with(predicate::eq(t)) + .return_once(|_| Ok(())); + m + }); let server = Server::new(mock_ipfs); let resp = server .pubsub_sub_post(topic_encoded, &Context) @@ -1639,6 +1680,7 @@ mod tests { panic!("did not get success from server"); } } + #[tokio::test] #[traced_test] async fn swarm_connect() { diff --git a/kubo-rpc/src/http/stream_drop.rs b/kubo-rpc/src/http/stream_drop.rs new file mode 100644 index 000000000..537a7a1cc --- /dev/null +++ b/kubo-rpc/src/http/stream_drop.rs @@ -0,0 +1,57 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::{Future, Stream}; +use pin_project::{pin_project, pinned_drop}; + +/// Wraps a stream and on drop spawns a future as its own task. +/// The future is not gauranteed to complete. +#[pin_project(PinnedDrop)] +pub struct StreamDrop +where + D: Future + Send + 'static, +{ + #[pin] + stream: S, + drop_fut: Option, +} + +impl StreamDrop +where + D: Future + Send + 'static, +{ + pub fn new(stream: S, drop_fn: D) -> Self { + Self { + stream, + drop_fut: Some(drop_fn), + } + } +} + +impl Stream for StreamDrop +where + S: Stream, + D: Future + Send + 'static, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + Stream::poll_next(this.stream, cx) + } +} + +#[pinned_drop] +impl PinnedDrop for StreamDrop +where + D: Future + Send + 'static, +{ + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + if let Some(drop_fn) = this.drop_fut.take() { + tokio::spawn(drop_fn); + } + } +} diff --git a/kubo-rpc/src/lib.rs b/kubo-rpc/src/lib.rs index 4e5836c92..3b627224d 100644 --- a/kubo-rpc/src/lib.rs +++ b/kubo-rpc/src/lib.rs @@ -10,7 +10,10 @@ use std::{ fmt::{self, Display, Formatter}, io::Cursor, path::PathBuf, - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }, }; use std::{str::FromStr, sync::Arc}; @@ -157,13 +160,15 @@ pub trait IpfsDep: Clone { async fn peers(&self) -> Result>, Error>; /// Connect to a specific peer node. async fn connect(&self, peer_id: PeerId, addrs: Vec) -> Result<(), Error>; - /// Publish a message on a pub/sub Topic. + /// Publish a message on a pub/sub topic. async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error>; - /// Subscribe to a pub/sub Topic + /// Subscribe to a pub/sub topic. async fn subscribe( &self, topic: String, ) -> Result>, Error>; + /// Unsubscribe from a pub/sub topic. + async fn unsubscribe(&self, topic: String) -> Result<(), Error>; /// List topics to which, we are currently subscribed async fn topics(&self) -> Result, Error>; /// Current version of ceramic @@ -175,6 +180,7 @@ pub struct IpfsService { p2p: P2pClient, store: SQLiteBlockStore, resolver: Resolver, + topics: Mutex>, } impl IpfsService { @@ -190,6 +196,7 @@ impl IpfsService { p2p, store, resolver, + topics: Mutex::new(HashMap::new()), } } } @@ -288,6 +295,14 @@ impl IpfsDep for Arc { &self, topic: String, ) -> Result>, Error> { + { + self.topics + .lock() + .expect("should be able to lock topics set") + .entry(topic.clone()) + .and_modify(|counter| *counter = *counter + 1) + .or_insert(1); + } let topic = TopicHash::from_raw(topic); Ok(Box::pin( self.p2p @@ -297,6 +312,27 @@ impl IpfsDep for Arc { )) } #[instrument(skip(self))] + async fn unsubscribe(&self, topic: String) -> Result<(), Error> { + let count = { + *(self + .topics + .lock() + .expect("should be able to lock topics set") + .entry(topic.clone()) + .and_modify(|counter| *counter = *counter - 1) + .or_insert(1)) + }; + let topic = TopicHash::from_raw(topic); + // Only unsubscribe if this is the last subscription + if count <= 0 { + self.p2p + .gossipsub_unsubscribe(topic) + .await + .map_err(Error::Internal)?; + } + Ok(()) + } + #[instrument(skip(self))] async fn topics(&self) -> Result, Error> { Ok(self .p2p @@ -479,6 +515,7 @@ pub(crate) mod tests { async fn connect(&self, peer_id: PeerId, addrs: Vec) -> Result<(), Error>; async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error>; async fn subscribe( &self, topic: String) -> Result>, Error>; + async fn unsubscribe(&self, topic: String) -> Result<(), Error>; async fn topics(&self) -> Result, Error>; async fn version(&self) -> Result; } diff --git a/kubo-rpc/src/pubsub.rs b/kubo-rpc/src/pubsub.rs index 3d17114e7..eff1ee632 100644 --- a/kubo-rpc/src/pubsub.rs +++ b/kubo-rpc/src/pubsub.rs @@ -24,6 +24,14 @@ where { client.subscribe(topic).await } +/// Unsubscribe from a topic +#[tracing::instrument(skip(client), ret)] +pub async fn unsubscribe(client: T, topic: String) -> Result<(), Error> +where + T: IpfsDep, +{ + client.unsubscribe(topic).await +} /// Returns a list of topics, to which we are currently subscribed. #[tracing::instrument(skip(client))] pub async fn topics(client: T) -> Result, Error>