Skip to content

Commit

Permalink
chore(sequencer-relayer): change blob submitter to use boxed blocks (#…
Browse files Browse the repository at this point in the history
…1863)

## Summary
Changed block channel to use boxed sequencer blocks.

## Background
Running clippy with Rust 1.83.0 in #1857 triggered a lint for large
error variant in the send methods for `BlobSubmitterHandle`. Large enum
variants (including in Results) should be avoided because they are only
as small as their largest variant:
https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err.
Creating the channel with a boxed block tackles this problem at its
source.

## Changes
- Boxed `SequencerBlock` in `BlobSubmitterHandle`, which tackles the
problem of potentially large send errors at its source.

## Testing
Passing all tests

## Changelogs
No updates required.

## Breaking Changes
Overridden code freeze since this is a very small, non breaking change
that shouldn't have any bearing since our previous audit.

## Related Issues
closes #1860
  • Loading branch information
ethanoroshiba authored Dec 16, 2024
1 parent bc2569a commit 85d356c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
11 changes: 6 additions & 5 deletions crates/astria-sequencer-relayer/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ use crate::{
IncludeRollup,
};

type ForwardFut<'a> =
Fuse<BoxFuture<'a, Result<(), tokio::sync::mpsc::error::SendError<Box<SequencerBlock>>>>>;

pub(crate) struct Relayer {
/// A token to notify relayer that it should shut down.
#[expect(
Expand Down Expand Up @@ -172,7 +175,7 @@ impl Relayer {
// future to forward a sequencer block to the celestia-submission-task.
// gets set in the select-loop if the task is at capacity.
let mut forward_once_free: Fuse<
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<SequencerBlock>>>,
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<Box<SequencerBlock>>>>,
> = Fuse::terminated();

self.state.set_ready();
Expand Down Expand Up @@ -282,17 +285,15 @@ impl Relayer {
block: SequencerBlock,
block_stream: &mut read::BlockStream,
submitter: write::BlobSubmitterHandle,
forward: &mut Fuse<
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<SequencerBlock>>>,
>,
forward: &mut ForwardFut,
) -> eyre::Result<()> {
assert!(
forward.is_terminated(),
"block stream must be paused and not yield blocks when the blob submitter is \
congested and this future is in-flight",
);

if let Err(error) = submitter.try_send(block) {
if let Err(error) = submitter.try_send(Box::new(block)) {
debug!(
// Just print the error directly: TrySendError has no cause chain.
%error,
Expand Down
14 changes: 7 additions & 7 deletions crates/astria-sequencer-relayer/src/relayer/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct StartedSubmissionAndFee {

#[derive(Clone)]
pub(super) struct BlobSubmitterHandle {
tx: mpsc::Sender<SequencerBlock>,
tx: mpsc::Sender<Box<SequencerBlock>>,
}

impl BlobSubmitterHandle {
Expand All @@ -95,8 +95,8 @@ impl BlobSubmitterHandle {
/// This is a thin wrapper around [`mpsc::Sender::try_send`].
pub(super) fn try_send(
&self,
block: SequencerBlock,
) -> Result<(), TrySendError<SequencerBlock>> {
block: Box<SequencerBlock>,
) -> Result<(), TrySendError<Box<SequencerBlock>>> {
self.tx.try_send(block)
}

Expand All @@ -105,8 +105,8 @@ impl BlobSubmitterHandle {
/// This is a thin wrapper around [`mpsc::Sender::send`].
pub(super) async fn send(
&self,
block: SequencerBlock,
) -> Result<(), SendError<SequencerBlock>> {
block: Box<SequencerBlock>,
) -> Result<(), SendError<Box<SequencerBlock>>> {
self.tx.send(block).await
}
}
Expand All @@ -116,7 +116,7 @@ pub(super) struct BlobSubmitter {
client_builder: CelestiaClientBuilder,

/// The channel over which sequencer blocks are received.
blocks: mpsc::Receiver<SequencerBlock>,
blocks: mpsc::Receiver<Box<SequencerBlock>>,

/// The accumulator of all data that will be submitted to Celestia on the next submission.
next_submission: NextSubmission,
Expand Down Expand Up @@ -253,7 +253,7 @@ impl BlobSubmitter {
sequencer_height = %block.height(),
"skipping sequencer block as already included in previous submission"
));
} else if let Err(error) = self.add_sequencer_block_to_next_submission(block) {
} else if let Err(error) = self.add_sequencer_block_to_next_submission(*block) {
break Err(error).wrap_err(
"critically failed adding Sequencer block to next submission"
);
Expand Down

0 comments on commit 85d356c

Please sign in to comment.