Skip to content

Commit

Permalink
fix: change pubsub/ls to return base64url encoded topics
Browse files Browse the repository at this point in the history
Prior to this change the pubsub/ls endpoint would return the raw topic
names when the should be base64url encoded names.

Additionally the subscription would remain even after the subscription
connection had closed. This is now fixed.
  • Loading branch information
nathanielc committed Oct 11, 2023
1 parent 16a50d8 commit 10b6efc
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions kubo-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ repository.workspace = true
publish = false

[features]
http = ["dep:ceramic-kubo-rpc-server", "dep:serde", "dep:serde_json", "dep:hyper"]
http = [
"dep:ceramic-kubo-rpc-server",
"dep:hyper",
"dep:pin-project",
"dep:serde",
"dep:serde_json",
]

[dependencies]
anyhow.workspace = true
Expand All @@ -36,15 +42,16 @@ libp2p-identity.workspace = true
libp2p-tls.workspace = true
libp2p.workspace = true
multiaddr.workspace = true
pin-project = { version = "1.1.3", optional = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
swagger.workspace = true
thiserror = "1.0.47"
tokio-stream = "0.1.14"
tokio-util.workspace = true
tokio.workspace = true
tracing-opentelemetry.workspace = true
tracing.workspace = true
thiserror = "1.0.47"

[dev-dependencies]
expect-test.workspace = true
Expand Down
58 changes: 50 additions & 8 deletions kubo-rpc/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
//! Provides an http implementation of the Kubo RPC methods.
mod stream_drop;

use std::{collections::HashSet, io::Cursor, marker::PhantomData, str::FromStr};

use async_trait::async_trait;
Expand Down Expand Up @@ -26,9 +29,11 @@ use libp2p::{gossipsub::Message, Multiaddr, PeerId};
use multiaddr::Protocol;
use serde::Serialize;
use swagger::{ApiError, ByteArray};
use tracing::{instrument, warn};

use crate::{
block, dag, id, pin, pubsub, swarm, version, Bytes, GossipsubEvent, IpfsDep, IpfsPath,
block, dag, http::stream_drop::StreamDrop, id, pin, pubsub, swarm, version, Bytes,
GossipsubEvent, IpfsDep, IpfsPath,
};

/// Kubo RPC API Server implementation.
Expand All @@ -39,7 +44,7 @@ pub struct Server<I, C> {
}
impl<I, C> Server<I, C>
where
I: IpfsDep,
I: IpfsDep + Send + Sync + 'static,
{
/// Construct a new Server
pub fn new(ipfs: I) -> Self {
Expand Down Expand Up @@ -76,9 +81,10 @@ macro_rules! try_or_bad_request {
#[async_trait]
impl<I, C> Api<C> for Server<I, C>
where
I: IpfsDep + Send + Sync,
I: IpfsDep + Send + Sync + 'static,
C: Send + Sync,
{
#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn block_get_post(
&self,
arg: String,
Expand All @@ -91,6 +97,7 @@ where
Ok(BlockGetPostResponse::Success(ByteArray(data)))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn block_put_post(
&self,
file: ByteArray,
Expand Down Expand Up @@ -135,6 +142,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn block_stat_post(
&self,
arg: String,
Expand All @@ -150,6 +158,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn dag_get_post(
&self,
arg: String,
Expand All @@ -174,6 +183,7 @@ where
}
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn dag_import_post(
&self,
file: swagger::ByteArray,
Expand All @@ -193,6 +203,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn dag_put_post(
&self,
file: ByteArray,
Expand Down Expand Up @@ -237,6 +248,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn dag_resolve_post(
&self,
arg: String,
Expand All @@ -254,6 +266,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn id_post(&self, arg: Option<String>, _context: &C) -> Result<IdPostResponse, ApiError> {
let info = if let Some(id) = &arg {
let peer_id = try_or_bad_request!(PeerId::from_str(id), IdPostResponse);
Expand All @@ -278,6 +291,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn pin_add_post(
&self,
arg: String,
Expand Down Expand Up @@ -308,6 +322,7 @@ where
pins: vec![cid.to_string()],
}))
}
#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn pin_rm_post(&self, arg: String, _context: &C) -> Result<PinRmPostResponse, ApiError> {
let ipfs_path = try_or_bad_request!(IpfsPath::from_str(&arg), PinRmPostResponse);
let cid = pin::remove(self.ipfs.clone(), &ipfs_path)
Expand All @@ -318,15 +333,20 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn pubsub_ls_post(&self, _context: &C) -> Result<PubsubLsPostResponse, ApiError> {
let topics = pubsub::topics(self.ipfs.clone())
.await
.map_err(to_api_error)?;
.map_err(to_api_error)?
.into_iter()
.map(|topic| multibase::encode(Base::Base64Url, topic))
.collect();
Ok(PubsubLsPostResponse::Success(PubsubLsPost200Response {
strings: topics,
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn pubsub_pub_post(
&self,
arg: String,
Expand All @@ -344,6 +364,7 @@ where
Ok(PubsubPubPostResponse::Success)
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn pubsub_sub_post(
&self,
arg: String,
Expand All @@ -354,7 +375,7 @@ where

let topic = try_or_bad_request!(String::from_utf8(topic_bytes), PubsubSubPostResponse);

let subscription = pubsub::subscribe(self.ipfs.clone(), topic)
let subscription = pubsub::subscribe(self.ipfs.clone(), topic.clone())
.await
.map_err(to_api_error)?;

Expand Down Expand Up @@ -407,10 +428,18 @@ where
})
.map_err(Box::<dyn std::error::Error + Send + Sync>::from)
});
let ipfs = self.ipfs.clone();
let messages = StreamDrop::new(messages, async move {
let ret = pubsub::unsubscribe(ipfs.clone(), topic.clone()).await;
if let Err(error) = ret {
warn!(topic, %error, "failed to unsubscribe");
};
});

Ok(PubsubSubPostResponse::Success(Box::pin(messages)))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn swarm_connect_post(
&self,
arg: &Vec<String>,
Expand Down Expand Up @@ -472,6 +501,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn swarm_peers_post(&self, _context: &C) -> Result<SwarmPeersPostResponse, ApiError> {
let peers: Vec<SwarmPeersPost200ResponsePeersInner> = swarm::peers(self.ipfs.clone())
.await
Expand All @@ -490,6 +520,7 @@ where
}))
}

#[instrument(skip(self, _context), ret, level = tracing::Level::DEBUG)]
async fn version_post(&self, _context: &C) -> Result<VersionPostResponse, ApiError> {
let v = version::version(self.ipfs.clone())
.await
Expand Down Expand Up @@ -1507,8 +1538,8 @@ mod tests {
Success(
PubsubLsPost200Response {
strings: [
"topic1",
"topic2",
"udG9waWMx",
"udG9waWMy",
],
},
)
Expand Down Expand Up @@ -1551,11 +1582,12 @@ mod tests {
let topic_encoded = multibase::encode(Base::Base64, topic.as_bytes());

let mut mock_ipfs = MockIpfsDepTest::new();
let t = topic.clone();
mock_ipfs.expect_clone().once().return_once(move || {
let mut m = MockIpfsDepTest::new();
m.expect_subscribe()
.once()
.with(predicate::eq(topic))
.with(predicate::eq(t))
.return_once(|_| {
let first = Ok(GossipsubEvent::Message {
from: PeerId::from_str(
Expand Down Expand Up @@ -1601,6 +1633,15 @@ mod tests {
});
m
});
let t = topic.clone();
mock_ipfs.expect_clone().once().return_once(move || {
let mut m = MockIpfsDepTest::new();
m.expect_unsubscribe()
.once()
.with(predicate::eq(t))
.return_once(|_| Ok(()));
m
});
let server = Server::new(mock_ipfs);
let resp = server
.pubsub_sub_post(topic_encoded, &Context)
Expand Down Expand Up @@ -1639,6 +1680,7 @@ mod tests {
panic!("did not get success from server");
}
}

#[tokio::test]
#[traced_test]
async fn swarm_connect() {
Expand Down
57 changes: 57 additions & 0 deletions kubo-rpc/src/http/stream_drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures_util::{Future, Stream};
use pin_project::{pin_project, pinned_drop};

/// Wraps a stream and on drop spawns a future as its own task.
/// The future is not gauranteed to complete.
#[pin_project(PinnedDrop)]
pub struct StreamDrop<S, D>
where
D: Future<Output = ()> + Send + 'static,
{
#[pin]
stream: S,
drop_fut: Option<D>,
}

impl<S, D> StreamDrop<S, D>
where
D: Future<Output = ()> + Send + 'static,
{
pub fn new(stream: S, drop_fn: D) -> Self {
Self {
stream,
drop_fut: Some(drop_fn),
}
}
}

impl<S, D> Stream for StreamDrop<S, D>
where
S: Stream,
D: Future<Output = ()> + Send + 'static,
{
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
Stream::poll_next(this.stream, cx)
}
}

#[pinned_drop]
impl<S, D> PinnedDrop for StreamDrop<S, D>
where
D: Future<Output = ()> + Send + 'static,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if let Some(drop_fn) = this.drop_fut.take() {
tokio::spawn(drop_fn);
}
}
}
Loading

0 comments on commit 10b6efc

Please sign in to comment.