Skip to content

Commit

Permalink
feat(conductor)!: rate limit sequencer cometbft requests (#1068)
Browse files Browse the repository at this point in the history
## Summary
Limits the number of requests conductor sends to the Sequencer CometBFT
endpoint to 100 per minute.

## Background
During sync conductor can DOS Sequencer's CometBFT node by sending too
many requests for commits and validator sets. With the batching logic
introduced in #1049 there can
be dozens (or more) blocks stored in each Celestia blob, each of which
needs to be checked separately. With several blobs being fetched at once
during, this can quickly spiral into hundreds (if not thousands)
requests per minute.

Note that only calls to `/commit` and `/validators` are rate limited,
because there is currently no way to enforce this at the transport
layer, see this issue:
informalsystems/tendermint-rs#1420

However, the only other calls are to `/genesis` (once at startup), and
`/abci_info` (every block-time period, usually every 2 seconds), which
is rare enough to not need a rate limit.

## Changes
- Use a tower `RateLimit` middleware around a tendermint-rs `HttpClient`
only send up to 100 requests per minute.

## Breaking changes
- Adds an environment variable
`ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND` to configure
rate-limiting of requests sent to the Sequencer CometBFT node for
verification of Sequencer block data fetched from Celestia blobs

## Testing
This needs to be observed end-to-end, potentially letting conductor run
for a very long time with only soft commits, and then turning firm
commits on.

## Related Issues
closes #1064

---------

Co-authored-by: Jordan Oroshiba <[email protected]>
  • Loading branch information
SuperFluffy and joroshiba committed May 27, 2024
1 parent 8067367 commit d6b91a8
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 48 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/evm-rollup/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.18.3
version: 0.18.4

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
1 change: 1 addition & 0 deletions charts/evm-rollup/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.config.rollup.otel.serviceNamePrefix . }}-conductor"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND: "500"
{{- end }}
---
apiVersion: v1
Expand Down
3 changes: 3 additions & 0 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
tendermint = { workspace = true }
tendermint-rpc = { workspace = true, features = ["http-client"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = { workspace = true, features = ["rt"] }
Expand All @@ -57,6 +59,7 @@ pin-project-lite = "0.2"
tokio-stream = "0.1.14"
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
moka = { version = "0.12.5", features = ["future"] }
tower = { version = "0.4.13", features = ["limit"] }

### Celestia specific imports
#
Expand Down
6 changes: 6 additions & 0 deletions crates/astria-conductor/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ ASTRIA_CONDUCTOR_SEQUENCER_COMETBFT_URL="http://127.0.0.1:26657"
# A block time of 2000 is the default for sequencer.
ASTRIA_CONDUCTOR_SEQUENCER_BLOCK_TIME_MS=2000

# The number of requests per second that will be sent to the connected sequencer node.
# Note that right now this is only rate limiting requests related to verifying
# Sequencer block information retrieved from Celestia, as to not overwhelm Sequencer's
# CometBFT node.
ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND=500

# Set to true to enable prometheus metrics.
ASTRIA_CONDUCTOR_NO_METRICS=true

Expand Down
5 changes: 4 additions & 1 deletion crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use astria_eyre::eyre::{
WrapErr as _,
};
use jsonrpsee::http_client::HttpClient as CelestiaClient;
use sequencer_client::HttpClient as SequencerClient;
use tendermint_rpc::HttpClient as SequencerClient;
use tokio_util::sync::CancellationToken;

use super::Reader;
Expand All @@ -19,6 +19,7 @@ pub(crate) struct Builder {
pub(crate) celestia_token: String,
pub(crate) executor: executor::Handle,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) shutdown: CancellationToken,
}

Expand All @@ -31,6 +32,7 @@ impl Builder {
celestia_token,
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
shutdown,
} = self;

Expand All @@ -42,6 +44,7 @@ impl Builder {
celestia_client,
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
shutdown,
})
}
Expand Down
16 changes: 12 additions & 4 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use self::{
latest_height_stream::stream_latest_heights,
reconstruct::reconstruct_blocks_from_verified_blobs,
verify::{
verify_headers,
verify_metadata,
BlobVerifier,
},
};
Expand Down Expand Up @@ -123,7 +123,7 @@ pub(super) struct ReconstructedBlocks {
pub(crate) struct Reader {
celestia_block_time: Duration,

// Client to fetch heights and blocks from Celestia.
/// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
Expand All @@ -132,6 +132,10 @@ pub(crate) struct Reader {
/// The client to get the sequencer namespace and verify blocks.
sequencer_cometbft_client: SequencerClient,

/// The number of requests per second that will be sent to Sequencer
/// (usually to verify block data retrieved from Celestia blobs).
sequencer_requests_per_second: u32,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,
}
Expand Down Expand Up @@ -249,6 +253,7 @@ impl RunningReader {
celestia_client,
sequencer_cometbft_client,
shutdown,
sequencer_requests_per_second,
..
} = exposed_reader;
let block_cache =
Expand All @@ -267,7 +272,10 @@ impl RunningReader {

Ok(Self {
block_cache,
blob_verifier: Arc::new(BlobVerifier::new(sequencer_cometbft_client)),
blob_verifier: Arc::new(
BlobVerifier::try_new(sequencer_cometbft_client, sequencer_requests_per_second)
.wrap_err("failed to construct blob verifier")?,
),
celestia_client,
enqueued_block: Fuse::terminated(),
executor,
Expand Down Expand Up @@ -527,7 +535,7 @@ impl FetchConvertVerifyAndReconstruct {
"decoded Sequencer header and rollup info from raw Celestia blobs",
);

let verified_blobs = verify_headers(blob_verifier, decoded_blobs).await;
let verified_blobs = verify_metadata(blob_verifier, decoded_blobs).await;

info!(
number_of_verified_header_blobs = verified_blobs.len_header_blobs(),
Expand Down
Loading

0 comments on commit d6b91a8

Please sign in to comment.