diff --git a/crates/core/component/compact-block/src/component/rpc.rs b/crates/core/component/compact-block/src/component/rpc.rs
index 13b373bffa..b277e3a6bf 100644
--- a/crates/core/component/compact-block/src/component/rpc.rs
+++ b/crates/core/component/compact-block/src/component/rpc.rs
@@ -4,8 +4,8 @@ use anyhow::bail;
 use cnidarium::Storage;
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use penumbra_proto::core::component::compact_block::v1::{
-    query_service_server::QueryService, CompactBlockRangeRequest, CompactBlockRangeResponse,
-    CompactBlockRequest, CompactBlockResponse,
+    query_service_server::QueryService, CompactBlock, CompactBlockRangeRequest,
+    CompactBlockRangeResponse, CompactBlockRequest, CompactBlockResponse,
 };
 use penumbra_sct::component::clock::EpochRead;
 use tokio::sync::mpsc;
@@ -103,6 +103,11 @@ impl QueryService for Server {
         let (tx_blocks, rx_blocks) = mpsc::channel(10);
         let tx_blocks_err = tx_blocks.clone();
 
+        // Wrap the block sender in a guard that ensures we only send the expected next block
+        let mut tx_blocks = BlockSender {
+            next_height: start_height,
+            inner: tx_blocks,
+        };
         tokio::spawn(
             async move {
                 let _guard = CompactBlockConnectionCounter::new();
@@ -142,7 +147,7 @@ impl QueryService for Server {
                         // Future iterations of this work should start by moving block serialization
                         // outside of the `send_op` future, and investigate if long blocking sends can
                         // happen for benign reasons (i.e not caused by the client).
-                        tx_blocks.send(Ok(compact_block)).await?;
+                        tx_blocks.send(compact_block).await?;
 
                         metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
                     }
@@ -171,10 +176,7 @@ impl QueryService for Server {
                         .await
                         .expect("no error fetching block")
                         .expect("compact block for in-range height must be present");
-                    tx_blocks
-                        .send(Ok(block))
-                        .await
-                        .map_err(|_| tonic::Status::cancelled("client closed connection"))?;
+                    tx_blocks.send(block).await?;
 
                     metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
                 }
@@ -200,10 +202,7 @@ impl QueryService for Server {
                         .await
                         .map_err(|e| tonic::Status::internal(e.to_string()))?
                         .expect("compact block for in-range height must be present");
-                    tx_blocks
-                        .send(Ok(block))
-                        .await
-                        .map_err(|_| tonic::Status::cancelled("channel closed"))?;
+                    tx_blocks.send(block).await?;
                     metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
                 }
             }
@@ -250,3 +249,24 @@ impl Drop for CompactBlockConnectionCounter {
         metrics::gauge!(metrics::COMPACT_BLOCK_RANGE_ACTIVE_CONNECTIONS).decrement(1.0);
     }
 }
+
+/// Stateful wrapper around an mpsc sender that tracks the outbound block height
+struct BlockSender {
+    next_height: u64,
+    inner: mpsc::Sender<Result<CompactBlock, tonic::Status>>,
+}
+
+impl BlockSender {
+    async fn send(&mut self, block: CompactBlock) -> anyhow::Result<()> {
+        if block.height != self.next_height {
+            bail!(
+                "block height mismatch while sending: expected {}, got {}",
+                self.next_height,
+                block.height
+            );
+        }
+        self.inner.send(Ok(block)).await?;
+        self.next_height += 1;
+        Ok(())
+    }
+}