diff --git a/crates/astria-celestia-client/src/client.rs b/crates/astria-celestia-client/src/client.rs index 35cee105cb..e888da471f 100644 --- a/crates/astria-celestia-client/src/client.rs +++ b/crates/astria-celestia-client/src/client.rs @@ -145,13 +145,22 @@ pub trait CelestiaClientExt: BlobClient { Ok(rollup_datas) } - /// Submits sequencer `blocks` to celestia after converting and signing them, returning the - /// height at which they were included. + /// Submits sequencer `blocks` to celestia + /// + /// `Blocks` after converted into celestia blobs and then posted. Rollup + /// data is posted to a namespace derived from the rollup chain id. + /// Sequencer data for each is posted to a namespace derived from the + /// sequencer block's chain ID. /// /// This calls the `blob.Submit` celestia-node RPC. + /// + /// Returns Result: + /// - Ok: the celestia block height blobs were included in. + /// - Errors: + /// - SubmitSequencerBlocksError::AssembleBlobs when failed to assemble blob + /// - SubmitSequencerBlocksError::JsonRpc when Celestia `blob.Submit` fails async fn submit_sequencer_blocks( &self, - namespace: Namespace, blocks: Vec, submit_options: SubmitOptions, ) -> Result { @@ -165,13 +174,12 @@ pub trait CelestiaClientExt: BlobClient { let mut all_blobs = Vec::with_capacity(num_expected_blobs); for (i, block) in blocks.into_iter().enumerate() { - let mut blobs = - assemble_blobs_from_sequencer_block_data(namespace, block).map_err(|source| { - SubmitSequencerBlocksError::AssembleBlobs { - source, - index: i, - } - })?; + let mut blobs = assemble_blobs_from_sequencer_block_data(block).map_err(|source| { + SubmitSequencerBlocksError::AssembleBlobs { + source, + index: i, + } + })?; all_blobs.append(&mut blobs); } @@ -206,7 +214,6 @@ pub enum BlobAssemblyError { } fn assemble_blobs_from_sequencer_block_data( - namespace: Namespace, block_data: SequencerBlockData, ) -> Result, BlobAssemblyError> { use sequencer_validation::{ @@ -260,6 +267,7 @@ fn assemble_blobs_from_sequencer_block_data( chain_ids.push(chain_id); } + let sequencer_namespace = celestia_namespace_v0_from_hashed_bytes(header.chain_id.as_bytes()); let sequencer_namespace_data = SequencerNamespaceData { block_hash, header, @@ -276,7 +284,8 @@ fn assemble_blobs_from_sequencer_block_data( ); blobs.push( - Blob::new(namespace, data).map_err(BlobAssemblyError::ConstructBlobFromSequencerData)?, + Blob::new(sequencer_namespace, data) + .map_err(BlobAssemblyError::ConstructBlobFromSequencerData)?, ); Ok(blobs) } diff --git a/crates/astria-celestia-client/src/lib.rs b/crates/astria-celestia-client/src/lib.rs index 0ad777f459..86d24cb857 100644 --- a/crates/astria-celestia-client/src/lib.rs +++ b/crates/astria-celestia-client/src/lib.rs @@ -8,8 +8,5 @@ pub use blob_space::{ pub use celestia_rpc; pub use celestia_tendermint; pub use celestia_types; -use celestia_types::nmt::Namespace; pub use client::CelestiaClientExt; pub use jsonrpsee; - -pub const SEQUENCER_NAMESPACE: Namespace = Namespace::const_v0(*b"astriasequ"); diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 20ebc2c372..ce8fca88e9 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -5,6 +5,11 @@ use std::{ }; use astria_sequencer_types::ChainId; +use base64::{ + display::Base64Display, + engine::general_purpose::STANDARD, +}; +use celestia_client::celestia_types::nmt::Namespace; use color_eyre::eyre::{ self, WrapErr as _, @@ -40,7 +45,10 @@ use crate::{ self, ClientProvider, }, - data_availability, + data_availability::{ + self, + CelestiaReaderConfig, + }, executor::Executor, sequencer, Config, @@ -152,13 +160,35 @@ impl Conductor { let (shutdown_tx, shutdown_rx) = oneshot::channel(); shutdown_channels.insert(Self::DATA_AVAILABILITY, shutdown_tx); let block_verifier = BlockVerifier::new(sequencer_client_pool.clone()); + + // Sequencer namespace is defined by the chain id of attached sequencer node + // which can be fetched from any block header. + let sequencer_namespace = { + let client = sequencer_client_pool + .get() + .await + .wrap_err("failed to get a sequencer client from the pool")?; + get_sequencer_namespace(client) + .await + .wrap_err("failed to get sequencer namespace")? + }; + info!( + celestia_namespace = %Base64Display::new(sequencer_namespace.as_bytes(), &STANDARD), + sequencer_chain_id = %cfg.chain_id, + "celestia namespace derived from sequencer chain id", + ); + + let celestia_config = CelestiaReaderConfig { + node_url: cfg.celestia_node_url, + bearer_token: Some(cfg.celestia_bearer_token), + poll_interval: std::time::Duration::from_secs(3), + }; // TODO ghi(https://github.com/astriaorg/astria/issues/470): add sync functionality to data availability reader let reader = data_availability::Reader::new( - &cfg.celestia_node_url, - &cfg.celestia_bearer_token, - std::time::Duration::from_secs(3), + celestia_config, executor_tx.clone(), block_verifier, + sequencer_namespace, celestia_client::blob_space::celestia_namespace_v0_from_hashed_bytes( cfg.chain_id.as_ref(), ), @@ -330,3 +360,42 @@ fn spawn_signal_handler() -> SignalReceiver { stop_rx, } } + +/// Get the sequencer namespace from the latest sequencer block. +async fn get_sequencer_namespace( + client: deadpool::managed::Object, +) -> eyre::Result { + use sequencer_client::SequencerClientExt as _; + + let retry_config = tryhard::RetryFutureConfig::new(10) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + |attempt: u32, + next_delay: Option, + error: &sequencer_client::extension_trait::Error| { + let error = error.clone(); + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + async move { + let error = &error as &(dyn std::error::Error + 'static); + warn!( + attempt, + wait_duration, + error, + "attempt to grab sequencer block failed; retrying after backoff", + ); + } + }, + ); + + let block = tryhard::retry_fn(|| client.latest_sequencer_block()) + .with_config(retry_config) + .await + .wrap_err("failed to get block from sequencer after 10 attempts")?; + + let chain_id = block.into_raw().header.chain_id; + + Ok(celestia_client::blob_space::celestia_namespace_v0_from_hashed_bytes(chain_id.as_bytes())) +} diff --git a/crates/astria-conductor/src/data_availability.rs b/crates/astria-conductor/src/data_availability.rs index b497adeba5..24fdcf7f52 100644 --- a/crates/astria-conductor/src/data_availability.rs +++ b/crates/astria-conductor/src/data_availability.rs @@ -13,7 +13,6 @@ use celestia_client::{ jsonrpsee::http_client::HttpClient, CelestiaClientExt as _, SequencerNamespaceData, - SEQUENCER_NAMESPACE, }; use color_eyre::eyre::{ self, @@ -100,8 +99,10 @@ pub(crate) struct Reader { block_verifier: BlockVerifier, - /// Namespace ID - namespace: Namespace, + /// Sequencer Namespace ID + sequencer_namespace: Namespace, + /// Rollup Namespace ID + rollup_namespace: Namespace, get_latest_height: Option>>, @@ -116,22 +117,27 @@ pub(crate) struct Reader { shutdown: oneshot::Receiver<()>, } +pub(crate) struct CelestiaReaderConfig { + pub(crate) node_url: String, + pub(crate) bearer_token: Option, + pub(crate) poll_interval: Duration, +} + impl Reader { /// Creates a new Reader instance and returns a command sender. pub(crate) async fn new( - celestia_node_url: &str, - celestia_bearer_token: &str, - celestia_poll_interval: Duration, + celestia_config: CelestiaReaderConfig, executor_tx: executor::Sender, block_verifier: BlockVerifier, - namespace: Namespace, + sequencer_namespace: Namespace, + rollup_namespace: Namespace, shutdown: oneshot::Receiver<()>, ) -> eyre::Result { use celestia_client::celestia_rpc::HeaderClient; let celestia_client = celestia_client::celestia_rpc::client::new_http( - celestia_node_url, - Some(celestia_bearer_token), + &celestia_config.node_url, + celestia_config.bearer_token.as_deref(), ) .wrap_err("failed constructing celestia http client")?; @@ -149,13 +155,14 @@ impl Reader { Ok(Self { executor_tx, celestia_client, - celestia_poll_interval, + celestia_poll_interval: celestia_config.poll_interval, current_block_height, get_latest_height: None, fetch_sequencer_blobs_at_height: JoinMap::new(), verify_sequencer_blobs_and_assemble_rollups: JoinMap::new(), block_verifier, - namespace, + sequencer_namespace, + rollup_namespace, shutdown, }) } @@ -263,10 +270,11 @@ impl Reader { "getting sequencer data from celestia already in flight, not spawning" ); } else { + let sequencer_namespace = self.sequencer_namespace; self.fetch_sequencer_blobs_at_height .spawn(height, async move { client - .get_sequencer_data(height, SEQUENCER_NAMESPACE) + .get_sequencer_data(height, sequencer_namespace) .await .wrap_err("failed to fetch sequencer data from celestia") .map(|rsp| rsp.datas) @@ -324,7 +332,7 @@ impl Reader { sequencer_data, self.celestia_client.clone(), self.block_verifier.clone(), - self.namespace, + self.rollup_namespace, ) .in_current_span(), ); @@ -365,7 +373,7 @@ async fn verify_sequencer_blobs_and_assemble_rollups( sequencer_blobs: Vec, client: HttpClient, block_verifier: BlockVerifier, - namespace: Namespace, + rollup_namespace: Namespace, ) -> eyre::Result> { // spawn the verification tasks let mut verification_tasks = verify_all_datas(sequencer_blobs, &block_verifier); @@ -390,7 +398,7 @@ async fn verify_sequencer_blobs_and_assemble_rollups( client.clone(), height, data, - namespace, + rollup_namespace, assembly_tx.clone(), ) .in_current_span(), @@ -417,11 +425,11 @@ async fn fetch_verify_rollup_blob_and_forward_to_assembly( client: HttpClient, height: Height, data: SequencerNamespaceData, - namespace: Namespace, + rollup_namespace: Namespace, block_tx: mpsc::Sender, ) { let mut rollups = match client - .get_rollup_data_matching_sequencer_data(height, namespace, &data) + .get_rollup_data_matching_sequencer_data(height, rollup_namespace, &data) .await { Err(e) => { diff --git a/crates/astria-sequencer-relayer/src/relayer.rs b/crates/astria-sequencer-relayer/src/relayer.rs index 3dbdda92a1..76ee8fd641 100644 --- a/crates/astria-sequencer-relayer/src/relayer.rs +++ b/crates/astria-sequencer-relayer/src/relayer.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use celestia_client::SEQUENCER_NAMESPACE; use eyre::WrapErr as _; use humantime::format_duration; use sequencer_types::SequencerBlockData; @@ -479,9 +478,9 @@ async fn submit_blocks_to_celestia( num_blocks = sequencer_block_data.len(), "submitting collected sequencer blocks to data availability layer", ); + let height = client .submit_sequencer_blocks( - SEQUENCER_NAMESPACE, sequencer_block_data, SubmitOptions { fee: Some(fee),