From df8e66c9fe22bc250a92e13542fcacde6c8fece2 Mon Sep 17 00:00:00 2001 From: David Estes <5317198+dav1do@users.noreply.github.com> Date: Thu, 23 May 2024 15:18:10 -0600 Subject: [PATCH] fix: recon protocol stall with no shared interests (#367) * fix: protocol hang with no interest overlap * chore: minor clean up * fix: remove unnecessary early return The writer will get the finished message and drop the sink, which will close the stream and end the read --- api/src/server.rs | 4 ++-- one/src/lib.rs | 1 - recon/src/libp2p.rs | 4 ++++ recon/src/protocol.rs | 39 +++++++++++++++++++++++---------------- recon/src/recon/tests.rs | 17 +++++++++++++++++ 5 files changed, 46 insertions(+), 19 deletions(-) diff --git a/api/src/server.rs b/api/src/server.rs index 90274be78..39cdeb746 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -744,14 +744,14 @@ where async fn interests_sort_key_sort_value_post( &self, sep_key: String, - seplue: String, + sep_value: String, controller: Option, stream_id: Option, _context: &C, ) -> Result { let interest = models::Interest { sep: sep_key, - sep_value: seplue, + sep_value, controller, stream_id, }; diff --git a/one/src/lib.rs b/one/src/lib.rs index 219f61a0a..938648ede 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -490,7 +490,6 @@ impl Daemon { metrics::start(&opts.metrics_bind_address.parse()?); // Build HTTP server - let network = network.clone(); let ceramic_server = ceramic_api::Server::new( peer_id, network, diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index 2e6cb57d0..7743997b7 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -191,6 +191,7 @@ where status: info.status, }))) } else { + tracing::warn!(%peer_id, "peer not found in peers map when started syncronizing?"); None } } @@ -204,6 +205,7 @@ where status: info.status, }))) } else { + tracing::warn!(%peer_id, "peer not found in peers map when stopped syncronizing?"); None } } @@ -220,6 +222,7 @@ where status: info.status, }))) } else { + tracing::warn!(%peer_id, "peer not found in peers map when succeeded syncronizing?"); None } } @@ -237,6 +240,7 @@ where status: info.status, }))) } else { + tracing::warn!(%peer_id, "peer not found in peers map when failed syncronizing?"); None } } diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index 90914cf50..19bbf2067 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -18,7 +18,7 @@ use ceramic_core::RangeOpen; use ceramic_metrics::Recorder; use futures::{pin_mut, stream::BoxStream, Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; -use tokio::{join, sync::mpsc, time::Instant}; +use tokio::{sync::mpsc, time::Instant}; use tokio_stream::once; use tracing::{instrument, trace, Level}; use uuid::Uuid; @@ -202,8 +202,8 @@ where // // The following sequence occurs to end the conversation: // - // 1. Initator Read determines there is no more work to do when it reads the final - // [`ResponderMessage::RangeResponse`] from the Responder. + // 1. Initator Read determines there is no more work to do when there are no interests in + // common, or it reads the final [`ResponderMessage::RangeResponse`] from the Responder. // 2. Initator Read sends [`ToWrite::Finish`] to the Initator Writer. // 3. Initiator Writer sends the [`InitiatorMessage::Finished`] to the Responder and // completes. @@ -216,9 +216,8 @@ where // This is analogous to the FIN -> FIN ACK sequence in TCP which ensures that boths ends of // the conversation agree it has completed. This prevents a class of bugs where the Initiator // may try and start a new conversation before Responder is aware the previous one has completed. - let (write, read) = join!(write, read); - write?; - read?; + let _res = tokio::try_join!(write, read) + .map_err(|e: anyhow::Error| anyhow!("protocol error: {}", e))?; metrics.record(&ProtocolRun(start.elapsed())); Ok(()) @@ -497,17 +496,25 @@ where ) -> Result { match message { ResponderMessage::InterestResponse(interests) => { - let mut ranges = Vec::with_capacity(interests.len()); - for interest in interests { - ranges.push( - self.common - .recon - .initial_range(interest) - .await - .context("querying initial range")?, - ); + if interests.is_empty() { + to_writer + .send(ToWriter::Finish) + .await + .map_err(|err| anyhow!("{err}")) + .context("sending finish")?; + } else { + let mut ranges = Vec::with_capacity(interests.len()); + for interest in interests { + ranges.push( + self.common + .recon + .initial_range(interest) + .await + .context("querying initial range")?, + ); + } + self.send_ranges(ranges.into_iter(), to_writer).await?; } - self.send_ranges(ranges.into_iter(), to_writer).await?; } ResponderMessage::RangeResponse(ranges) => { self.pending_ranges -= 1; diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index c73c5a4ab..cbd21d546 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -2028,3 +2028,20 @@ async fn partial_interest() { "#]]) .await; } + +#[test(tokio::test)] +async fn no_interest() { + recon_test(expect![[r#" + cat: <(a, d)> [a, b, c] + dog: <(x, z)> [x, y] + -> interest_req((a, d)) + cat: [a, b, c] + <- interest_resp() + dog: [x, y] + -> finished + cat: [a, b, c] + cat: [a, b, c] + dog: [x, y] + "#]]) + .await; +}