diff --git a/Cargo.lock b/Cargo.lock index cce2c78ebc..84c3eb3c4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,11 +605,14 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", + "tendermint 0.34.1", + "tendermint-rpc", "thiserror", "tokio", "tokio-stream", "tokio-util 0.7.10", "tonic 0.10.2", + "tower", "tracing", "tracing-futures", "tryhard", diff --git a/charts/evm-rollup/Chart.yaml b/charts/evm-rollup/Chart.yaml index 281aebb3dc..b79b9fac56 100644 --- a/charts/evm-rollup/Chart.yaml +++ b/charts/evm-rollup/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.18.3 +version: 0.18.4 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/evm-rollup/templates/configmap.yaml b/charts/evm-rollup/templates/configmap.yaml index b47a5081d0..c48315a8c0 100644 --- a/charts/evm-rollup/templates/configmap.yaml +++ b/charts/evm-rollup/templates/configmap.yaml @@ -34,6 +34,7 @@ data: OTEL_SERVICE_NAME: "{{ tpl .Values.config.rollup.otel.serviceNamePrefix . }}-conductor" {{- if not .Values.global.dev }} {{- else }} + ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND: "500" {{- end }} --- apiVersion: v1 diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index 4fa64f1f93..ff821f3ec9 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -43,6 +43,8 @@ rand = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = { workspace = true } +tendermint = { workspace = true } +tendermint-rpc = { workspace = true, features = ["http-client"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tokio-util = { workspace = true, features = ["rt"] } @@ -57,6 +59,7 @@ pin-project-lite = "0.2" tokio-stream = "0.1.14" tracing-futures = { version = "0.2.5", features = ["futures-03"] } moka = { version = "0.12.5", features = ["future"] } +tower = { version = "0.4.13", features = ["limit"] } ### Celestia specific imports # diff --git a/crates/astria-conductor/local.env.example b/crates/astria-conductor/local.env.example index deb57d0619..ee3c50e7f6 100644 --- a/crates/astria-conductor/local.env.example +++ b/crates/astria-conductor/local.env.example @@ -61,6 +61,12 @@ ASTRIA_CONDUCTOR_SEQUENCER_COMETBFT_URL="http://127.0.0.1:26657" # A block time of 2000 is the default for sequencer. ASTRIA_CONDUCTOR_SEQUENCER_BLOCK_TIME_MS=2000 +# The number of requests per second that will be sent to the connected sequencer node. +# Note that right now this is only rate limiting requests related to verifying +# Sequencer block information retrieved from Celestia, as to not overwhelm Sequencer's +# CometBFT node. +ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND=500 + # Set to true to enable prometheus metrics. ASTRIA_CONDUCTOR_NO_METRICS=true diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 9401f2e4dd..99cf090c92 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -7,7 +7,7 @@ use astria_eyre::eyre::{ WrapErr as _, }; use jsonrpsee::http_client::HttpClient as CelestiaClient; -use sequencer_client::HttpClient as SequencerClient; +use tendermint_rpc::HttpClient as SequencerClient; use tokio_util::sync::CancellationToken; use super::Reader; @@ -19,6 +19,7 @@ pub(crate) struct Builder { pub(crate) celestia_token: String, pub(crate) executor: executor::Handle, pub(crate) sequencer_cometbft_client: SequencerClient, + pub(crate) sequencer_requests_per_second: u32, pub(crate) shutdown: CancellationToken, } @@ -31,6 +32,7 @@ impl Builder { celestia_token, executor, sequencer_cometbft_client, + sequencer_requests_per_second, shutdown, } = self; @@ -42,6 +44,7 @@ impl Builder { celestia_client, executor, sequencer_cometbft_client, + sequencer_requests_per_second, shutdown, }) } diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index db32c69152..3979b0e337 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -83,7 +83,7 @@ use self::{ latest_height_stream::stream_latest_heights, reconstruct::reconstruct_blocks_from_verified_blobs, verify::{ - verify_headers, + verify_metadata, BlobVerifier, }, }; @@ -123,7 +123,7 @@ pub(super) struct ReconstructedBlocks { pub(crate) struct Reader { celestia_block_time: Duration, - // Client to fetch heights and blocks from Celestia. + /// Client to fetch heights and blocks from Celestia. celestia_client: CelestiaClient, /// The channel used to send messages to the executor task. @@ -132,6 +132,10 @@ pub(crate) struct Reader { /// The client to get the sequencer namespace and verify blocks. sequencer_cometbft_client: SequencerClient, + /// The number of requests per second that will be sent to Sequencer + /// (usually to verify block data retrieved from Celestia blobs). + sequencer_requests_per_second: u32, + /// Token to listen for Conductor being shut down. shutdown: CancellationToken, } @@ -249,6 +253,7 @@ impl RunningReader { celestia_client, sequencer_cometbft_client, shutdown, + sequencer_requests_per_second, .. } = exposed_reader; let block_cache = @@ -267,7 +272,10 @@ impl RunningReader { Ok(Self { block_cache, - blob_verifier: Arc::new(BlobVerifier::new(sequencer_cometbft_client)), + blob_verifier: Arc::new( + BlobVerifier::try_new(sequencer_cometbft_client, sequencer_requests_per_second) + .wrap_err("failed to construct blob verifier")?, + ), celestia_client, enqueued_block: Fuse::terminated(), executor, @@ -527,7 +535,7 @@ impl FetchConvertVerifyAndReconstruct { "decoded Sequencer header and rollup info from raw Celestia blobs", ); - let verified_blobs = verify_headers(blob_verifier, decoded_blobs).await; + let verified_blobs = verify_metadata(blob_verifier, decoded_blobs).await; info!( number_of_verified_header_blobs = verified_blobs.len_header_blobs(), diff --git a/crates/astria-conductor/src/celestia/verify.rs b/crates/astria-conductor/src/celestia/verify.rs index 48d658a46e..b0b814bf99 100644 --- a/crates/astria-conductor/src/celestia/verify.rs +++ b/crates/astria-conductor/src/celestia/verify.rs @@ -27,6 +27,12 @@ use sequencer_client::{ }; use telemetry::display::base64; use tokio_util::task::JoinMap; +use tower::{ + util::BoxService, + BoxError, + Service as _, + ServiceExt as _, +}; use tracing::{ info, instrument, @@ -44,7 +50,6 @@ use super::{ block_verifier, convert::ConvertedBlobs, }; -use crate::utils::flatten; pub(super) struct VerifiedBlobs { celestia_height: u64, @@ -86,7 +91,7 @@ struct VerificationTaskKey { /// /// Drops blobs that could not be verified. #[instrument(skip_all)] -pub(super) async fn verify_headers( +pub(super) async fn verify_metadata( blob_verifier: Arc, converted_blobs: ConvertedBlobs, ) -> VerifiedBlobs { @@ -102,13 +107,16 @@ pub(super) async fn verify_headers( block_hash: blob.block_hash(), sequencer_height: blob.height(), }, - blob_verifier.clone().verify_header(blob).in_current_span(), + blob_verifier + .clone() + .verify_metadata(blob) + .in_current_span(), ); } while let Some((key, verification_result)) = verification_tasks.join_next().await { - match flatten(verification_result) { - Ok(verified_blob) => { + match verification_result { + Ok(Some(verified_blob)) => { if let Some(dropped_entry) = verified_header_blobs.insert(verified_blob.block_hash(), verified_blob) { @@ -125,12 +133,13 @@ pub(super) async fn verify_headers( ); } } + Ok(None) => {} Err(error) => { info!( block_hash = %base64(&key.block_hash), sequencer_height = %key.sequencer_height, %error, - "verification of sequencer blob failed; dropping it" + "verification of sequencer blob was cancelled abruptly; dropping it" ); } } @@ -181,11 +190,11 @@ struct VerificationMeta { impl VerificationMeta { async fn fetch( - client: SequencerClient, + client: RateLimitedVerificationClient, height: SequencerHeight, - ) -> Result { + ) -> Result { if height.value() == 0 { - return Err(VerificationMetaError::CantVerifyHeightZero); + return Err(VerificationMetaError::CantVerifyHeightZero.into()); } let prev_height = SequencerHeight::try_from(height.value().saturating_sub(1)).expect( "BUG: should always be able to convert a decremented cometbft height back to its \ @@ -194,8 +203,8 @@ impl VerificationMeta { decades and the chain's height is greater u32::MAX)", ); let (commit_response, validators_response) = tokio::try_join!( - fetch_commit_with_retry(client.clone(), height), - fetch_validators_with_retry(client.clone(), prev_height, height), + client.clone().get_commit(height), + client.clone().get_validators(prev_height, height), )?; super::ensure_commit_has_quorum( &commit_response.signed_header.commit, @@ -216,53 +225,66 @@ impl VerificationMeta { pub(super) struct BlobVerifier { cache: Cache, - sequencer_cometbft_client: SequencerClient, + client: RateLimitedVerificationClient, } impl BlobVerifier { - pub(super) fn new(sequencer_cometbft_client: SequencerClient) -> Self { - Self { + pub(super) fn try_new( + client: SequencerClient, + requests_per_seconds: u32, + ) -> eyre::Result { + Ok(Self { // Cache for verifying 1_000 celestia heights, assuming 6 sequencer heights per Celestia // height cache: Cache::new(6_000), - sequencer_cometbft_client, - } + client: RateLimitedVerificationClient::try_new(client, requests_per_seconds) + .wrap_err("failed to construct Sequencer block client")?, + }) } - async fn verify_header( + /// Verifies `metadata` against a remote Sequencer CometBFT instance. + /// + /// *Implementation note:* because [`Cache::try_get_with`] returns an + /// `Arc` in error position (due to [`RateLimitedVerificationClient`]), + /// this method cannot return an `eyre::Result` but needs to emit all errors + /// it encounters. + #[instrument(skip_all)] + async fn verify_metadata( self: Arc, - header: SubmittedMetadata, - ) -> eyre::Result { - use base64::prelude::*; - let height = header.height(); - let meta = self + metadata: SubmittedMetadata, + ) -> Option { + let height = metadata.height(); + let cached = self .cache - .try_get_with( - height, - VerificationMeta::fetch(self.sequencer_cometbft_client.clone(), height), - ) + .try_get_with(height, VerificationMeta::fetch(self.client.clone(), height)) .await - .wrap_err("failed getting data necessary to verify the sequencer header blob")?; - ensure!( - &meta.commit_header.header.chain_id == header.cometbft_chain_id(), - "expected cometbft chain ID `{}`, got `{}`", - meta.commit_header.header.chain_id, - header.cometbft_chain_id(), - ); - ensure!( - meta.commit_header.commit.block_id.hash.as_bytes() == header.block_hash(), - "block hash `{}` stored in blob does not match block hash `{}` of sequencer block", - BASE64_STANDARD.encode(header.block_hash()), - BASE64_STANDARD.encode(meta.commit_header.commit.block_id.hash.as_bytes()), - ); - Ok(header) + .inspect_err(|e| { + warn!( + error = %e.as_ref(), + "failed getting data necessary to verify the sequencer metadata retrieved from Celestia", + ); + }) + .ok()?; + if let Err(error) = ensure_chain_ids_match( + cached.commit_header.header.chain_id.as_str(), + metadata.cometbft_chain_id().as_str(), + ) + .and_then(|()| { + ensure_block_hashes_match( + cached.commit_header.commit.block_id.hash.as_bytes(), + &metadata.block_hash(), + ) + }) { + info!(reason = %error, "failed to verify metadata retrieved from Celestia; dropping it"); + } + Some(metadata) } } async fn fetch_commit_with_retry( client: SequencerClient, height: SequencerHeight, -) -> Result { +) -> Result { let retry_config = RetryFutureConfig::new(u32::MAX) .custom_backoff(CometBftRetryStrategy::new(Duration::from_millis(100))) .max_delay(Duration::from_secs(10)) @@ -286,6 +308,7 @@ async fn fetch_commit_with_retry( }) .with_config(retry_config) .await + .map(Into::into) .map_err(|source| VerificationMetaError::FetchCommit { height, source, @@ -296,7 +319,7 @@ async fn fetch_validators_with_retry( client: SequencerClient, prev_height: SequencerHeight, height: SequencerHeight, -) -> Result { +) -> Result { let retry_config = RetryFutureConfig::new(u32::MAX) .custom_backoff(CometBftRetryStrategy::new(Duration::from_millis(100))) .max_delay(Duration::from_secs(10)) @@ -324,6 +347,7 @@ async fn fetch_validators_with_retry( }) .with_config(retry_config) .await + .map(Into::into) .map_err(|source| VerificationMetaError::FetchValidators { height, prev_height, @@ -368,3 +392,146 @@ fn should_retry(error: &tendermint_rpc::Error) -> bool { Http(..) | HttpRequestFailed(..) | Timeout(..) ) } + +enum VerificationRequest { + Commit { + height: SequencerHeight, + }, + Validators { + prev_height: SequencerHeight, + height: SequencerHeight, + }, +} + +#[derive(Debug)] +enum VerificationResponse { + Commit(Box), + Validators(Box), +} + +impl From for VerificationResponse { + fn from(value: tendermint_rpc::endpoint::commit::Response) -> Self { + Self::Commit(Box::new(value)) + } +} + +impl From for VerificationResponse { + fn from(value: tendermint_rpc::endpoint::validators::Response) -> Self { + Self::Validators(Box::new(value)) + } +} + +#[derive(Clone)] +struct RateLimitedVerificationClient { + inner: tower::buffer::Buffer< + BoxService, + VerificationRequest, + >, +} + +impl RateLimitedVerificationClient { + async fn get_commit( + mut self, + height: SequencerHeight, + ) -> Result, BoxError> { + // allow: it is desired that the wildcard matches all future added variants because + // this call must only return a single specific variant, panicking otherwise. + #[allow(clippy::match_wildcard_for_single_variants)] + match self + .inner + .ready() + .await? + .call(VerificationRequest::Commit { + height, + }) + .await? + { + VerificationResponse::Commit(commit) => Ok(commit), + other => panic!("expected VerificationResponse::Commit, got {other:?}"), + } + } + + async fn get_validators( + mut self, + prev_height: SequencerHeight, + height: SequencerHeight, + ) -> Result, BoxError> { + // allow: it is desired that the wildcard matches all future added variants because + // this call must only return a single specific variant, panicking otherwise. + #[allow(clippy::match_wildcard_for_single_variants)] + match self + .inner + .ready() + .await? + .call(VerificationRequest::Validators { + prev_height, + height, + }) + .await? + { + VerificationResponse::Validators(validators) => Ok(validators), + other => panic!("expected VerificationResponse::Validators, got {other:?}"), + } + } + + fn try_new(client: SequencerClient, requests_per_second: u32) -> eyre::Result { + // XXX: the construction in here is a bit strange: + // the straight forward way to create a type-erased tower service is to use + // ServiceBuilder::boxed_clone(). + // + // However, this gives a BoxCloneService which is always Clone + Send, but !Sync. + // Therefore we can't use the ServiceBuilder::buffer adapter. + // + // We can however work around it: ServiceBuilder::boxed gives a BoxService, which is + // Send + Sync, but not Clone. We then manually evoke Buffer::new to create a + // Buffer, which is Send + Sync + Clone. + let service = tower::ServiceBuilder::new() + .boxed() + .rate_limit(requests_per_second.into(), Duration::from_secs(1)) + .service_fn(move |req: VerificationRequest| { + let client = client.clone(); + async move { + match req { + VerificationRequest::Commit { + height, + } => fetch_commit_with_retry(client, height).await, + VerificationRequest::Validators { + prev_height, + height, + } => fetch_validators_with_retry(client, prev_height, height).await, + } + } + }); + // XXX: This number is arbitarily set to the same number os the rate-limit. Does that + // make sense? Should the number be set higher? + let inner = tower::buffer::Buffer::new( + service, + requests_per_second + .try_into() + .wrap_err("failed to convert u32 requests-per-second to usize")?, + ); + Ok(Self { + inner, + }) + } +} + +fn ensure_chain_ids_match(in_commit: &str, in_header: &str) -> eyre::Result<()> { + ensure!( + in_commit == in_header, + "expected cometbft chain ID `{in_commit}` (from commit), but found `{in_header}` in \ + retrieved metadata" + ); + Ok(()) +} + +fn ensure_block_hashes_match(in_commit: &[u8], in_header: &[u8]) -> eyre::Result<()> { + use base64::prelude::*; + ensure!( + in_commit == in_header, + "expected block hash `{}` (from commit), but found `{}` in retrieved metadata", + BASE64_STANDARD.encode(in_commit), + BASE64_STANDARD.encode(in_header), + ); + Ok(()) +} diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 5c44971eee..4945fae975 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -141,6 +141,7 @@ impl Conductor { celestia_block_time: Duration::from_millis(cfg.celestia_block_time_ms), executor: executor_handle.clone(), sequencer_cometbft_client: sequencer_cometbft_client.clone(), + sequencer_requests_per_second: cfg.sequencer_requests_per_second, shutdown: shutdown.clone(), } .build() diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index 081834830d..699f95f1c9 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -55,6 +55,9 @@ pub struct Config { pub sequencer_block_time_ms: u64, + /// The number of requests per second that will be sent to Sequencer. + pub sequencer_requests_per_second: u32, + /// Address of the RPC server for execution pub execution_rpc_url: String, diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 1f43ce7ab7..3f9c2701dc 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -458,6 +458,7 @@ fn make_config() -> Config { celestia_bearer_token: CELESTIA_BEARER_TOKEN.into(), sequencer_grpc_url: "http://127.0.0.1:8080".into(), sequencer_cometbft_url: "http://127.0.0.1:26657".into(), + sequencer_requests_per_second: 500, sequencer_block_time_ms: 2000, execution_rpc_url: "http://127.0.0.1:50051".into(), log: "info".into(),