Skip to content

Commit

Permalink
compact-block: try to ensure response consistency
Browse files Browse the repository at this point in the history
(cherry picked from commit 516b1a1)
  • Loading branch information
hdevalence authored and conorsch committed Jun 20, 2024
1 parent 574aa05 commit 11ddb05
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions crates/core/component/compact-block/src/component/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use anyhow::bail;
use cnidarium::Storage;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use penumbra_proto::core::component::compact_block::v1::{
query_service_server::QueryService, CompactBlockRangeRequest, CompactBlockRangeResponse,
CompactBlockRequest, CompactBlockResponse,
query_service_server::QueryService, CompactBlock, CompactBlockRangeRequest,
CompactBlockRangeResponse, CompactBlockRequest, CompactBlockResponse,
};
use penumbra_sct::component::clock::EpochRead;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -103,6 +103,11 @@ impl QueryService for Server {

let (tx_blocks, rx_blocks) = mpsc::channel(10);
let tx_blocks_err = tx_blocks.clone();
// Wrap the block sender in a guard that ensures we only send the expected next block
let mut tx_blocks = BlockSender {
next_height: start_height,
inner: tx_blocks,
};
tokio::spawn(
async move {
let _guard = CompactBlockConnectionCounter::new();
Expand Down Expand Up @@ -142,7 +147,7 @@ impl QueryService for Server {
// Future iterations of this work should start by moving block serialization
// outside of the `send_op` future, and investigate if long blocking sends can
// happen for benign reasons (i.e not caused by the client).
tx_blocks.send(Ok(compact_block)).await?;
tx_blocks.send(compact_block).await?;
metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
}

Expand Down Expand Up @@ -171,10 +176,7 @@ impl QueryService for Server {
.await
.expect("no error fetching block")
.expect("compact block for in-range height must be present");
tx_blocks
.send(Ok(block))
.await
.map_err(|_| tonic::Status::cancelled("client closed connection"))?;
tx_blocks.send(block).await?;
metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
}

Expand All @@ -200,10 +202,7 @@ impl QueryService for Server {
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
.expect("compact block for in-range height must be present");
tx_blocks
.send(Ok(block))
.await
.map_err(|_| tonic::Status::cancelled("channel closed"))?;
tx_blocks.send(block).await?;
metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
}
}
Expand Down Expand Up @@ -250,3 +249,24 @@ impl Drop for CompactBlockConnectionCounter {
metrics::gauge!(metrics::COMPACT_BLOCK_RANGE_ACTIVE_CONNECTIONS).decrement(1.0);
}
}

/// Stateful wrapper for a mpsc that tracks the outbound height
struct BlockSender {
next_height: u64,
inner: mpsc::Sender<Result<CompactBlock, tonic::Status>>,
}

impl BlockSender {
async fn send(&mut self, block: CompactBlock) -> anyhow::Result<()> {
if block.height != self.next_height {
bail!(
"block height mismatch while sending: expected {}, got {}",
self.next_height,
block.height
);
}
self.inner.send(Ok(block)).await?;
self.next_height += 1;
Ok(())
}
}

0 comments on commit 11ddb05

Please sign in to comment.