From d6b91a82dddfbdbce44315355a00eb23461ea10f Mon Sep 17 00:00:00 2001 From: Richard Janis Goldschmidt Date: Mon, 27 May 2024 19:22:30 +0200 Subject: [PATCH] feat(conductor)!: rate limit sequencer cometbft requests (#1068) ## Summary Limits the number of requests conductor sends to the Sequencer CometBFT endpoint to a configurable number of requests per second (500 in the example configuration). ## Background During sync, conductor can DoS Sequencer's CometBFT node by sending too many requests for commits and validator sets. With the batching logic introduced in https://github.com/astriaorg/astria/issues/1049 there can be dozens of blocks (or more) stored in each Celestia blob, each of which needs to be checked separately. With several blobs being fetched at once during sync, this can quickly spiral into hundreds (if not thousands) of requests per minute. Note that only calls to `/commit` and `/validators` are rate limited, because there is currently no way to enforce this at the transport layer, see this issue: https://github.com/informalsystems/tendermint-rs/issues/1420 However, the only other calls are to `/genesis` (once at startup), and `/abci_info` (every block-time period, usually every 2 seconds), which are rare enough not to need a rate limit. ## Changes - Use a tower `RateLimit` middleware around a tendermint-rs `HttpClient` to only send up to the configured number of requests per second. ## Breaking changes - Adds an environment variable `ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND` to configure rate-limiting of requests sent to the Sequencer CometBFT node for verification of Sequencer block data fetched from Celestia blobs. ## Testing This needs to be observed end-to-end, potentially letting conductor run for a very long time with only soft commits, and then turning firm commits on. 
## Related Issues closes https://github.com/astriaorg/astria/issues/1064 --------- Co-authored-by: Jordan Oroshiba --- Cargo.lock | 3 + charts/evm-rollup/Chart.yaml | 2 +- charts/evm-rollup/templates/configmap.yaml | 1 + crates/astria-conductor/Cargo.toml | 3 + crates/astria-conductor/local.env.example | 6 + .../astria-conductor/src/celestia/builder.rs | 5 +- crates/astria-conductor/src/celestia/mod.rs | 16 +- .../astria-conductor/src/celestia/verify.rs | 251 +++++++++++++++--- crates/astria-conductor/src/conductor.rs | 1 + crates/astria-conductor/src/config.rs | 3 + .../tests/blackbox/helpers/mod.rs | 1 + 11 files changed, 244 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cce2c78ebc..84c3eb3c4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,11 +605,14 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", + "tendermint 0.34.1", + "tendermint-rpc", "thiserror", "tokio", "tokio-stream", "tokio-util 0.7.10", "tonic 0.10.2", + "tower", "tracing", "tracing-futures", "tryhard", diff --git a/charts/evm-rollup/Chart.yaml b/charts/evm-rollup/Chart.yaml index 281aebb3dc..b79b9fac56 100644 --- a/charts/evm-rollup/Chart.yaml +++ b/charts/evm-rollup/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.18.3 +version: 0.18.4 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
Versions are not expected to diff --git a/charts/evm-rollup/templates/configmap.yaml b/charts/evm-rollup/templates/configmap.yaml index b47a5081d0..c48315a8c0 100644 --- a/charts/evm-rollup/templates/configmap.yaml +++ b/charts/evm-rollup/templates/configmap.yaml @@ -34,6 +34,7 @@ data: OTEL_SERVICE_NAME: "{{ tpl .Values.config.rollup.otel.serviceNamePrefix . }}-conductor" {{- if not .Values.global.dev }} {{- else }} + ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND: "500" {{- end }} --- apiVersion: v1 diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index 4fa64f1f93..ff821f3ec9 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -43,6 +43,8 @@ rand = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = { workspace = true } +tendermint = { workspace = true } +tendermint-rpc = { workspace = true, features = ["http-client"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tokio-util = { workspace = true, features = ["rt"] } @@ -57,6 +59,7 @@ pin-project-lite = "0.2" tokio-stream = "0.1.14" tracing-futures = { version = "0.2.5", features = ["futures-03"] } moka = { version = "0.12.5", features = ["future"] } +tower = { version = "0.4.13", features = ["limit"] } ### Celestia specific imports # diff --git a/crates/astria-conductor/local.env.example b/crates/astria-conductor/local.env.example index deb57d0619..ee3c50e7f6 100644 --- a/crates/astria-conductor/local.env.example +++ b/crates/astria-conductor/local.env.example @@ -61,6 +61,12 @@ ASTRIA_CONDUCTOR_SEQUENCER_COMETBFT_URL="http://127.0.0.1:26657" # A block time of 2000 is the default for sequencer. ASTRIA_CONDUCTOR_SEQUENCER_BLOCK_TIME_MS=2000 +# The number of requests per second that will be sent to the connected sequencer node. 
+# Note that right now this is only rate limiting requests related to verifying +# Sequencer block information retrieved from Celestia, as to not overwhelm Sequencer's +# CometBFT node. +ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND=500 + # Set to true to enable prometheus metrics. ASTRIA_CONDUCTOR_NO_METRICS=true diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 9401f2e4dd..99cf090c92 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -7,7 +7,7 @@ use astria_eyre::eyre::{ WrapErr as _, }; use jsonrpsee::http_client::HttpClient as CelestiaClient; -use sequencer_client::HttpClient as SequencerClient; +use tendermint_rpc::HttpClient as SequencerClient; use tokio_util::sync::CancellationToken; use super::Reader; @@ -19,6 +19,7 @@ pub(crate) struct Builder { pub(crate) celestia_token: String, pub(crate) executor: executor::Handle, pub(crate) sequencer_cometbft_client: SequencerClient, + pub(crate) sequencer_requests_per_second: u32, pub(crate) shutdown: CancellationToken, } @@ -31,6 +32,7 @@ impl Builder { celestia_token, executor, sequencer_cometbft_client, + sequencer_requests_per_second, shutdown, } = self; @@ -42,6 +44,7 @@ impl Builder { celestia_client, executor, sequencer_cometbft_client, + sequencer_requests_per_second, shutdown, }) } diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index db32c69152..3979b0e337 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -83,7 +83,7 @@ use self::{ latest_height_stream::stream_latest_heights, reconstruct::reconstruct_blocks_from_verified_blobs, verify::{ - verify_headers, + verify_metadata, BlobVerifier, }, }; @@ -123,7 +123,7 @@ pub(super) struct ReconstructedBlocks { pub(crate) struct Reader { celestia_block_time: Duration, - // Client to fetch heights and blocks from 
Celestia. + /// Client to fetch heights and blocks from Celestia. celestia_client: CelestiaClient, /// The channel used to send messages to the executor task. @@ -132,6 +132,10 @@ pub(crate) struct Reader { /// The client to get the sequencer namespace and verify blocks. sequencer_cometbft_client: SequencerClient, + /// The number of requests per second that will be sent to Sequencer + /// (usually to verify block data retrieved from Celestia blobs). + sequencer_requests_per_second: u32, + /// Token to listen for Conductor being shut down. shutdown: CancellationToken, } @@ -249,6 +253,7 @@ impl RunningReader { celestia_client, sequencer_cometbft_client, shutdown, + sequencer_requests_per_second, .. } = exposed_reader; let block_cache = @@ -267,7 +272,10 @@ impl RunningReader { Ok(Self { block_cache, - blob_verifier: Arc::new(BlobVerifier::new(sequencer_cometbft_client)), + blob_verifier: Arc::new( + BlobVerifier::try_new(sequencer_cometbft_client, sequencer_requests_per_second) + .wrap_err("failed to construct blob verifier")?, + ), celestia_client, enqueued_block: Fuse::terminated(), executor, @@ -527,7 +535,7 @@ impl FetchConvertVerifyAndReconstruct { "decoded Sequencer header and rollup info from raw Celestia blobs", ); - let verified_blobs = verify_headers(blob_verifier, decoded_blobs).await; + let verified_blobs = verify_metadata(blob_verifier, decoded_blobs).await; info!( number_of_verified_header_blobs = verified_blobs.len_header_blobs(), diff --git a/crates/astria-conductor/src/celestia/verify.rs b/crates/astria-conductor/src/celestia/verify.rs index 48d658a46e..b0b814bf99 100644 --- a/crates/astria-conductor/src/celestia/verify.rs +++ b/crates/astria-conductor/src/celestia/verify.rs @@ -27,6 +27,12 @@ use sequencer_client::{ }; use telemetry::display::base64; use tokio_util::task::JoinMap; +use tower::{ + util::BoxService, + BoxError, + Service as _, + ServiceExt as _, +}; use tracing::{ info, instrument, @@ -44,7 +50,6 @@ use super::{ block_verifier, 
convert::ConvertedBlobs, }; -use crate::utils::flatten; pub(super) struct VerifiedBlobs { celestia_height: u64, @@ -86,7 +91,7 @@ struct VerificationTaskKey { /// /// Drops blobs that could not be verified. #[instrument(skip_all)] -pub(super) async fn verify_headers( +pub(super) async fn verify_metadata( blob_verifier: Arc, converted_blobs: ConvertedBlobs, ) -> VerifiedBlobs { @@ -102,13 +107,16 @@ pub(super) async fn verify_headers( block_hash: blob.block_hash(), sequencer_height: blob.height(), }, - blob_verifier.clone().verify_header(blob).in_current_span(), + blob_verifier + .clone() + .verify_metadata(blob) + .in_current_span(), ); } while let Some((key, verification_result)) = verification_tasks.join_next().await { - match flatten(verification_result) { - Ok(verified_blob) => { + match verification_result { + Ok(Some(verified_blob)) => { if let Some(dropped_entry) = verified_header_blobs.insert(verified_blob.block_hash(), verified_blob) { @@ -125,12 +133,13 @@ pub(super) async fn verify_headers( ); } } + Ok(None) => {} Err(error) => { info!( block_hash = %base64(&key.block_hash), sequencer_height = %key.sequencer_height, %error, - "verification of sequencer blob failed; dropping it" + "verification of sequencer blob was cancelled abruptly; dropping it" ); } } @@ -181,11 +190,11 @@ struct VerificationMeta { impl VerificationMeta { async fn fetch( - client: SequencerClient, + client: RateLimitedVerificationClient, height: SequencerHeight, - ) -> Result { + ) -> Result { if height.value() == 0 { - return Err(VerificationMetaError::CantVerifyHeightZero); + return Err(VerificationMetaError::CantVerifyHeightZero.into()); } let prev_height = SequencerHeight::try_from(height.value().saturating_sub(1)).expect( "BUG: should always be able to convert a decremented cometbft height back to its \ @@ -194,8 +203,8 @@ impl VerificationMeta { decades and the chain's height is greater u32::MAX)", ); let (commit_response, validators_response) = tokio::try_join!( - 
fetch_commit_with_retry(client.clone(), height), - fetch_validators_with_retry(client.clone(), prev_height, height), + client.clone().get_commit(height), + client.clone().get_validators(prev_height, height), )?; super::ensure_commit_has_quorum( &commit_response.signed_header.commit, @@ -216,53 +225,66 @@ impl VerificationMeta { pub(super) struct BlobVerifier { cache: Cache, - sequencer_cometbft_client: SequencerClient, + client: RateLimitedVerificationClient, } impl BlobVerifier { - pub(super) fn new(sequencer_cometbft_client: SequencerClient) -> Self { - Self { + pub(super) fn try_new( + client: SequencerClient, + requests_per_seconds: u32, + ) -> eyre::Result { + Ok(Self { // Cache for verifying 1_000 celestia heights, assuming 6 sequencer heights per Celestia // height cache: Cache::new(6_000), - sequencer_cometbft_client, - } + client: RateLimitedVerificationClient::try_new(client, requests_per_seconds) + .wrap_err("failed to construct Sequencer block client")?, + }) } - async fn verify_header( + /// Verifies `metadata` against a remote Sequencer CometBFT instance. + /// + /// *Implementation note:* because [`Cache::try_get_with`] returns an + /// `Arc` in error position (due to [`RateLimitedVerificationClient`]), + /// this method cannot return an `eyre::Result` but needs to emit all errors + /// it encounters. 
+ #[instrument(skip_all)] + async fn verify_metadata( self: Arc, - header: SubmittedMetadata, - ) -> eyre::Result { - use base64::prelude::*; - let height = header.height(); - let meta = self + metadata: SubmittedMetadata, + ) -> Option { + let height = metadata.height(); + let cached = self .cache - .try_get_with( - height, - VerificationMeta::fetch(self.sequencer_cometbft_client.clone(), height), - ) + .try_get_with(height, VerificationMeta::fetch(self.client.clone(), height)) .await - .wrap_err("failed getting data necessary to verify the sequencer header blob")?; - ensure!( - &meta.commit_header.header.chain_id == header.cometbft_chain_id(), - "expected cometbft chain ID `{}`, got `{}`", - meta.commit_header.header.chain_id, - header.cometbft_chain_id(), - ); - ensure!( - meta.commit_header.commit.block_id.hash.as_bytes() == header.block_hash(), - "block hash `{}` stored in blob does not match block hash `{}` of sequencer block", - BASE64_STANDARD.encode(header.block_hash()), - BASE64_STANDARD.encode(meta.commit_header.commit.block_id.hash.as_bytes()), - ); - Ok(header) + .inspect_err(|e| { + warn!( + error = %e.as_ref(), + "failed getting data necessary to verify the sequencer metadata retrieved from Celestia", + ); + }) + .ok()?; + if let Err(error) = ensure_chain_ids_match( + cached.commit_header.header.chain_id.as_str(), + metadata.cometbft_chain_id().as_str(), + ) + .and_then(|()| { + ensure_block_hashes_match( + cached.commit_header.commit.block_id.hash.as_bytes(), + &metadata.block_hash(), + ) + }) { + info!(reason = %error, "failed to verify metadata retrieved from Celestia; dropping it"); + } + Some(metadata) } } async fn fetch_commit_with_retry( client: SequencerClient, height: SequencerHeight, -) -> Result { +) -> Result { let retry_config = RetryFutureConfig::new(u32::MAX) .custom_backoff(CometBftRetryStrategy::new(Duration::from_millis(100))) .max_delay(Duration::from_secs(10)) @@ -286,6 +308,7 @@ async fn fetch_commit_with_retry( }) 
.with_config(retry_config) .await + .map(Into::into) .map_err(|source| VerificationMetaError::FetchCommit { height, source, @@ -296,7 +319,7 @@ async fn fetch_validators_with_retry( client: SequencerClient, prev_height: SequencerHeight, height: SequencerHeight, -) -> Result { +) -> Result { let retry_config = RetryFutureConfig::new(u32::MAX) .custom_backoff(CometBftRetryStrategy::new(Duration::from_millis(100))) .max_delay(Duration::from_secs(10)) @@ -324,6 +347,7 @@ async fn fetch_validators_with_retry( }) .with_config(retry_config) .await + .map(Into::into) .map_err(|source| VerificationMetaError::FetchValidators { height, prev_height, @@ -368,3 +392,146 @@ fn should_retry(error: &tendermint_rpc::Error) -> bool { Http(..) | HttpRequestFailed(..) | Timeout(..) ) } + +enum VerificationRequest { + Commit { + height: SequencerHeight, + }, + Validators { + prev_height: SequencerHeight, + height: SequencerHeight, + }, +} + +#[derive(Debug)] +enum VerificationResponse { + Commit(Box), + Validators(Box), +} + +impl From for VerificationResponse { + fn from(value: tendermint_rpc::endpoint::commit::Response) -> Self { + Self::Commit(Box::new(value)) + } +} + +impl From for VerificationResponse { + fn from(value: tendermint_rpc::endpoint::validators::Response) -> Self { + Self::Validators(Box::new(value)) + } +} + +#[derive(Clone)] +struct RateLimitedVerificationClient { + inner: tower::buffer::Buffer< + BoxService, + VerificationRequest, + >, +} + +impl RateLimitedVerificationClient { + async fn get_commit( + mut self, + height: SequencerHeight, + ) -> Result, BoxError> { + // allow: it is desired that the wildcard matches all future added variants because + // this call must only return a single specific variant, panicking otherwise. + #[allow(clippy::match_wildcard_for_single_variants)] + match self + .inner + .ready() + .await? + .call(VerificationRequest::Commit { + height, + }) + .await? 
+ { + VerificationResponse::Commit(commit) => Ok(commit), + other => panic!("expected VerificationResponse::Commit, got {other:?}"), + } + } + + async fn get_validators( + mut self, + prev_height: SequencerHeight, + height: SequencerHeight, + ) -> Result, BoxError> { + // allow: it is desired that the wildcard matches all future added variants because + // this call must only return a single specific variant, panicking otherwise. + #[allow(clippy::match_wildcard_for_single_variants)] + match self + .inner + .ready() + .await? + .call(VerificationRequest::Validators { + prev_height, + height, + }) + .await? + { + VerificationResponse::Validators(validators) => Ok(validators), + other => panic!("expected VerificationResponse::Validators, got {other:?}"), + } + } + + fn try_new(client: SequencerClient, requests_per_second: u32) -> eyre::Result { + // XXX: the construction in here is a bit strange: + // the straight forward way to create a type-erased tower service is to use + // ServiceBuilder::boxed_clone(). + // + // However, this gives a BoxCloneService which is always Clone + Send, but !Sync. + // Therefore we can't use the ServiceBuilder::buffer adapter. + // + // We can however work around it: ServiceBuilder::boxed gives a BoxService, which is + // Send + Sync, but not Clone. We then manually evoke Buffer::new to create a + // Buffer, which is Send + Sync + Clone. + let service = tower::ServiceBuilder::new() + .boxed() + .rate_limit(requests_per_second.into(), Duration::from_secs(1)) + .service_fn(move |req: VerificationRequest| { + let client = client.clone(); + async move { + match req { + VerificationRequest::Commit { + height, + } => fetch_commit_with_retry(client, height).await, + VerificationRequest::Validators { + prev_height, + height, + } => fetch_validators_with_retry(client, prev_height, height).await, + } + } + }); + // XXX: This number is arbitarily set to the same number os the rate-limit. Does that + // make sense? Should the number be set higher? 
+ let inner = tower::buffer::Buffer::new( + service, + requests_per_second + .try_into() + .wrap_err("failed to convert u32 requests-per-second to usize")?, + ); + Ok(Self { + inner, + }) + } +} + +fn ensure_chain_ids_match(in_commit: &str, in_header: &str) -> eyre::Result<()> { + ensure!( + in_commit == in_header, + "expected cometbft chain ID `{in_commit}` (from commit), but found `{in_header}` in \ + retrieved metadata" + ); + Ok(()) +} + +fn ensure_block_hashes_match(in_commit: &[u8], in_header: &[u8]) -> eyre::Result<()> { + use base64::prelude::*; + ensure!( + in_commit == in_header, + "expected block hash `{}` (from commit), but found `{}` in retrieved metadata", + BASE64_STANDARD.encode(in_commit), + BASE64_STANDARD.encode(in_header), + ); + Ok(()) +} diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 5c44971eee..4945fae975 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -141,6 +141,7 @@ impl Conductor { celestia_block_time: Duration::from_millis(cfg.celestia_block_time_ms), executor: executor_handle.clone(), sequencer_cometbft_client: sequencer_cometbft_client.clone(), + sequencer_requests_per_second: cfg.sequencer_requests_per_second, shutdown: shutdown.clone(), } .build() diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index 081834830d..699f95f1c9 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -55,6 +55,9 @@ pub struct Config { pub sequencer_block_time_ms: u64, + /// The number of requests per second that will be sent to Sequencer. 
+ pub sequencer_requests_per_second: u32, + /// Address of the RPC server for execution pub execution_rpc_url: String, diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 1f43ce7ab7..3f9c2701dc 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -458,6 +458,7 @@ fn make_config() -> Config { celestia_bearer_token: CELESTIA_BEARER_TOKEN.into(), sequencer_grpc_url: "http://127.0.0.1:8080".into(), sequencer_cometbft_url: "http://127.0.0.1:26657".into(), + sequencer_requests_per_second: 500, sequencer_block_time_ms: 2000, execution_rpc_url: "http://127.0.0.1:50051".into(), log: "info".into(),