Skip to content

Commit

Permalink
fix: recon protocol stall with no shared interests (#367)
Browse files Browse the repository at this point in the history
* fix: protocol hang with no interest overlap

* chore: minor clean up

* fix: remove unnecessary early return

The writer will get the finished message and drop the sink, which will close the stream and end the read
  • Loading branch information
dav1do authored May 23, 2024
1 parent 0e5ea35 commit df8e66c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 19 deletions.
4 changes: 2 additions & 2 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,14 +744,14 @@ where
async fn interests_sort_key_sort_value_post(
&self,
sep_key: String,
seplue: String,
sep_value: String,
controller: Option<String>,
stream_id: Option<String>,
_context: &C,
) -> Result<InterestsSortKeySortValuePostResponse, ApiError> {
let interest = models::Interest {
sep: sep_key,
sep_value: seplue,
sep_value,
controller,
stream_id,
};
Expand Down
1 change: 0 additions & 1 deletion one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ impl Daemon {
metrics::start(&opts.metrics_bind_address.parse()?);

// Build HTTP server
let network = network.clone();
let ceramic_server = ceramic_api::Server::new(
peer_id,
network,
Expand Down
4 changes: 4 additions & 0 deletions recon/src/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ where
status: info.status,
})))
} else {
tracing::warn!(%peer_id, "peer not found in peers map when started syncronizing?");
None
}
}
Expand All @@ -204,6 +205,7 @@ where
status: info.status,
})))
} else {
tracing::warn!(%peer_id, "peer not found in peers map when stopped syncronizing?");
None
}
}
Expand All @@ -220,6 +222,7 @@ where
status: info.status,
})))
} else {
tracing::warn!(%peer_id, "peer not found in peers map when succeeded syncronizing?");
None
}
}
Expand All @@ -237,6 +240,7 @@ where
status: info.status,
})))
} else {
tracing::warn!(%peer_id, "peer not found in peers map when failed syncronizing?");
None
}
}
Expand Down
39 changes: 23 additions & 16 deletions recon/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ceramic_core::RangeOpen;
use ceramic_metrics::Recorder;
use futures::{pin_mut, stream::BoxStream, Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::{join, sync::mpsc, time::Instant};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::once;
use tracing::{instrument, trace, Level};
use uuid::Uuid;
Expand Down Expand Up @@ -202,8 +202,8 @@ where
//
// The following sequence occurs to end the conversation:
//
// 1. Initator Read determines there is no more work to do when it reads the final
// [`ResponderMessage::RangeResponse`] from the Responder.
// 1. Initator Read determines there is no more work to do when there are no interests in
// common, or it reads the final [`ResponderMessage::RangeResponse`] from the Responder.
// 2. Initator Read sends [`ToWrite::Finish`] to the Initator Writer.
// 3. Initiator Writer sends the [`InitiatorMessage::Finished`] to the Responder and
// completes.
Expand All @@ -216,9 +216,8 @@ where
// This is analogous to the FIN -> FIN ACK sequence in TCP which ensures that boths ends of
// the conversation agree it has completed. This prevents a class of bugs where the Initiator
// may try and start a new conversation before Responder is aware the previous one has completed.
let (write, read) = join!(write, read);
write?;
read?;
let _res = tokio::try_join!(write, read)
.map_err(|e: anyhow::Error| anyhow!("protocol error: {}", e))?;

metrics.record(&ProtocolRun(start.elapsed()));
Ok(())
Expand Down Expand Up @@ -497,17 +496,25 @@ where
) -> Result<RemoteStatus> {
match message {
ResponderMessage::InterestResponse(interests) => {
let mut ranges = Vec::with_capacity(interests.len());
for interest in interests {
ranges.push(
self.common
.recon
.initial_range(interest)
.await
.context("querying initial range")?,
);
if interests.is_empty() {
to_writer
.send(ToWriter::Finish)
.await
.map_err(|err| anyhow!("{err}"))
.context("sending finish")?;
} else {
let mut ranges = Vec::with_capacity(interests.len());
for interest in interests {
ranges.push(
self.common
.recon
.initial_range(interest)
.await
.context("querying initial range")?,
);
}
self.send_ranges(ranges.into_iter(), to_writer).await?;
}
self.send_ranges(ranges.into_iter(), to_writer).await?;
}
ResponderMessage::RangeResponse(ranges) => {
self.pending_ranges -= 1;
Expand Down
17 changes: 17 additions & 0 deletions recon/src/recon/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2028,3 +2028,20 @@ async fn partial_interest() {
"#]])
.await;
}

#[test(tokio::test)]
async fn no_interest() {
recon_test(expect![[r#"
cat: <(a, d)> [a, b, c]
dog: <(x, z)> [x, y]
-> interest_req((a, d))
cat: [a, b, c]
<- interest_resp()
dog: [x, y]
-> finished
cat: [a, b, c]
cat: [a, b, c]
dog: [x, y]
"#]])
.await;
}

0 comments on commit df8e66c

Please sign in to comment.