Skip to content

Commit

Permalink
fix(conductor, relayer): use sequencer chain id for sequencer blobs (#…
Browse files Browse the repository at this point in the history
…577)

## Summary
Updates `sequencer-relayer` and `conductor` to use the attached
sequencer nodes `chain-id` for blob namespace.

## Background
We were using a constant for the namespace, which would mean when
reading from a production network that sequencer blobs might need to be
filtered further based on chain-id, accessing and filtering data without
any savings for cost to submit (still would be seperate relayers). This
makes it such that our various chains through regenesis and all won't
have to do this filtering

## Changes
- Sequencer-relayer sequencer blob namespace derived from sequencer
block chain ID on header
- Conductor grabs chain id at startup from a block on rpc sequencer.
- Removed the default namespace

## Testing
CI/CD + manual testing in dev-cluster

## Breaking Changelist
- Posted data blobs to celestia will not match from previous releases.
  • Loading branch information
joroshiba committed Nov 14, 2023
1 parent 709c06c commit 7ca078b
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 38 deletions.
33 changes: 21 additions & 12 deletions crates/astria-celestia-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,22 @@ pub trait CelestiaClientExt: BlobClient {
Ok(rollup_datas)
}

/// Submits sequencer `blocks` to celestia after converting and signing them, returning the
/// height at which they were included.
/// Submits sequencer `blocks` to celestia
///
/// `Blocks` after converted into celestia blobs and then posted. Rollup
/// data is posted to a namespace derived from the rollup chain id.
/// Sequencer data for each is posted to a namespace derived from the
/// sequencer block's chain ID.
///
/// This calls the `blob.Submit` celestia-node RPC.
///
/// Returns Result:
/// - Ok: the celestia block height blobs were included in.
/// - Errors:
/// - SubmitSequencerBlocksError::AssembleBlobs when failed to assemble blob
/// - SubmitSequencerBlocksError::JsonRpc when Celestia `blob.Submit` fails
async fn submit_sequencer_blocks(
&self,
namespace: Namespace,
blocks: Vec<SequencerBlockData>,
submit_options: SubmitOptions,
) -> Result<u64, SubmitSequencerBlocksError> {
Expand All @@ -165,13 +174,12 @@ pub trait CelestiaClientExt: BlobClient {

let mut all_blobs = Vec::with_capacity(num_expected_blobs);
for (i, block) in blocks.into_iter().enumerate() {
let mut blobs =
assemble_blobs_from_sequencer_block_data(namespace, block).map_err(|source| {
SubmitSequencerBlocksError::AssembleBlobs {
source,
index: i,
}
})?;
let mut blobs = assemble_blobs_from_sequencer_block_data(block).map_err(|source| {
SubmitSequencerBlocksError::AssembleBlobs {
source,
index: i,
}
})?;
all_blobs.append(&mut blobs);
}

Expand Down Expand Up @@ -206,7 +214,6 @@ pub enum BlobAssemblyError {
}

fn assemble_blobs_from_sequencer_block_data(
namespace: Namespace,
block_data: SequencerBlockData,
) -> Result<Vec<Blob>, BlobAssemblyError> {
use sequencer_validation::{
Expand Down Expand Up @@ -260,6 +267,7 @@ fn assemble_blobs_from_sequencer_block_data(
chain_ids.push(chain_id);
}

let sequencer_namespace = celestia_namespace_v0_from_hashed_bytes(header.chain_id.as_bytes());
let sequencer_namespace_data = SequencerNamespaceData {
block_hash,
header,
Expand All @@ -276,7 +284,8 @@ fn assemble_blobs_from_sequencer_block_data(
);

blobs.push(
Blob::new(namespace, data).map_err(BlobAssemblyError::ConstructBlobFromSequencerData)?,
Blob::new(sequencer_namespace, data)
.map_err(BlobAssemblyError::ConstructBlobFromSequencerData)?,
);
Ok(blobs)
}
Expand Down
3 changes: 0 additions & 3 deletions crates/astria-celestia-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,5 @@ pub use blob_space::{
pub use celestia_rpc;
pub use celestia_tendermint;
pub use celestia_types;
use celestia_types::nmt::Namespace;
pub use client::CelestiaClientExt;
pub use jsonrpsee;

pub const SEQUENCER_NAMESPACE: Namespace = Namespace::const_v0(*b"astriasequ");
77 changes: 73 additions & 4 deletions crates/astria-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ use std::{
};

use astria_sequencer_types::ChainId;
use base64::{
display::Base64Display,
engine::general_purpose::STANDARD,
};
use celestia_client::celestia_types::nmt::Namespace;
use color_eyre::eyre::{
self,
WrapErr as _,
Expand Down Expand Up @@ -40,7 +45,10 @@ use crate::{
self,
ClientProvider,
},
data_availability,
data_availability::{
self,
CelestiaReaderConfig,
},
executor::Executor,
sequencer,
Config,
Expand Down Expand Up @@ -152,13 +160,35 @@ impl Conductor {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
shutdown_channels.insert(Self::DATA_AVAILABILITY, shutdown_tx);
let block_verifier = BlockVerifier::new(sequencer_client_pool.clone());

// Sequencer namespace is defined by the chain id of attached sequencer node
// which can be fetched from any block header.
let sequencer_namespace = {
let client = sequencer_client_pool
.get()
.await
.wrap_err("failed to get a sequencer client from the pool")?;
get_sequencer_namespace(client)
.await
.wrap_err("failed to get sequencer namespace")?
};
info!(
celestia_namespace = %Base64Display::new(sequencer_namespace.as_bytes(), &STANDARD),
sequencer_chain_id = %cfg.chain_id,
"celestia namespace derived from sequencer chain id",
);

let celestia_config = CelestiaReaderConfig {
node_url: cfg.celestia_node_url,
bearer_token: Some(cfg.celestia_bearer_token),
poll_interval: std::time::Duration::from_secs(3),
};
// TODO ghi(https://github.com/astriaorg/astria/issues/470): add sync functionality to data availability reader
let reader = data_availability::Reader::new(
&cfg.celestia_node_url,
&cfg.celestia_bearer_token,
std::time::Duration::from_secs(3),
celestia_config,
executor_tx.clone(),
block_verifier,
sequencer_namespace,
celestia_client::blob_space::celestia_namespace_v0_from_hashed_bytes(
cfg.chain_id.as_ref(),
),
Expand Down Expand Up @@ -330,3 +360,42 @@ fn spawn_signal_handler() -> SignalReceiver {
stop_rx,
}
}

/// Get the sequencer namespace from the latest sequencer block.
async fn get_sequencer_namespace(
client: deadpool::managed::Object<ClientProvider>,
) -> eyre::Result<Namespace> {
use sequencer_client::SequencerClientExt as _;

let retry_config = tryhard::RetryFutureConfig::new(10)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(20))
.on_retry(
|attempt: u32,
next_delay: Option<Duration>,
error: &sequencer_client::extension_trait::Error| {
let error = error.clone();
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
async move {
let error = &error as &(dyn std::error::Error + 'static);
warn!(
attempt,
wait_duration,
error,
"attempt to grab sequencer block failed; retrying after backoff",
);
}
},
);

let block = tryhard::retry_fn(|| client.latest_sequencer_block())
.with_config(retry_config)
.await
.wrap_err("failed to get block from sequencer after 10 attempts")?;

let chain_id = block.into_raw().header.chain_id;

Ok(celestia_client::blob_space::celestia_namespace_v0_from_hashed_bytes(chain_id.as_bytes()))
}
42 changes: 25 additions & 17 deletions crates/astria-conductor/src/data_availability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use celestia_client::{
jsonrpsee::http_client::HttpClient,
CelestiaClientExt as _,
SequencerNamespaceData,
SEQUENCER_NAMESPACE,
};
use color_eyre::eyre::{
self,
Expand Down Expand Up @@ -100,8 +99,10 @@ pub(crate) struct Reader {

block_verifier: BlockVerifier,

/// Namespace ID
namespace: Namespace,
/// Sequencer Namespace ID
sequencer_namespace: Namespace,
/// Rollup Namespace ID
rollup_namespace: Namespace,

get_latest_height: Option<JoinHandle<eyre::Result<Height>>>,

Expand All @@ -116,22 +117,27 @@ pub(crate) struct Reader {
shutdown: oneshot::Receiver<()>,
}

pub(crate) struct CelestiaReaderConfig {
pub(crate) node_url: String,
pub(crate) bearer_token: Option<String>,
pub(crate) poll_interval: Duration,
}

impl Reader {
/// Creates a new Reader instance and returns a command sender.
pub(crate) async fn new(
celestia_node_url: &str,
celestia_bearer_token: &str,
celestia_poll_interval: Duration,
celestia_config: CelestiaReaderConfig,
executor_tx: executor::Sender,
block_verifier: BlockVerifier,
namespace: Namespace,
sequencer_namespace: Namespace,
rollup_namespace: Namespace,
shutdown: oneshot::Receiver<()>,
) -> eyre::Result<Self> {
use celestia_client::celestia_rpc::HeaderClient;

let celestia_client = celestia_client::celestia_rpc::client::new_http(
celestia_node_url,
Some(celestia_bearer_token),
&celestia_config.node_url,
celestia_config.bearer_token.as_deref(),
)
.wrap_err("failed constructing celestia http client")?;

Expand All @@ -149,13 +155,14 @@ impl Reader {
Ok(Self {
executor_tx,
celestia_client,
celestia_poll_interval,
celestia_poll_interval: celestia_config.poll_interval,
current_block_height,
get_latest_height: None,
fetch_sequencer_blobs_at_height: JoinMap::new(),
verify_sequencer_blobs_and_assemble_rollups: JoinMap::new(),
block_verifier,
namespace,
sequencer_namespace,
rollup_namespace,
shutdown,
})
}
Expand Down Expand Up @@ -263,10 +270,11 @@ impl Reader {
"getting sequencer data from celestia already in flight, not spawning"
);
} else {
let sequencer_namespace = self.sequencer_namespace;
self.fetch_sequencer_blobs_at_height
.spawn(height, async move {
client
.get_sequencer_data(height, SEQUENCER_NAMESPACE)
.get_sequencer_data(height, sequencer_namespace)
.await
.wrap_err("failed to fetch sequencer data from celestia")
.map(|rsp| rsp.datas)
Expand Down Expand Up @@ -324,7 +332,7 @@ impl Reader {
sequencer_data,
self.celestia_client.clone(),
self.block_verifier.clone(),
self.namespace,
self.rollup_namespace,
)
.in_current_span(),
);
Expand Down Expand Up @@ -365,7 +373,7 @@ async fn verify_sequencer_blobs_and_assemble_rollups(
sequencer_blobs: Vec<SequencerNamespaceData>,
client: HttpClient,
block_verifier: BlockVerifier,
namespace: Namespace,
rollup_namespace: Namespace,
) -> eyre::Result<Vec<SequencerBlockSubset>> {
// spawn the verification tasks
let mut verification_tasks = verify_all_datas(sequencer_blobs, &block_verifier);
Expand All @@ -390,7 +398,7 @@ async fn verify_sequencer_blobs_and_assemble_rollups(
client.clone(),
height,
data,
namespace,
rollup_namespace,
assembly_tx.clone(),
)
.in_current_span(),
Expand All @@ -417,11 +425,11 @@ async fn fetch_verify_rollup_blob_and_forward_to_assembly(
client: HttpClient,
height: Height,
data: SequencerNamespaceData,
namespace: Namespace,
rollup_namespace: Namespace,
block_tx: mpsc::Sender<SequencerBlockSubset>,
) {
let mut rollups = match client
.get_rollup_data_matching_sequencer_data(height, namespace, &data)
.get_rollup_data_matching_sequencer_data(height, rollup_namespace, &data)
.await
{
Err(e) => {
Expand Down
3 changes: 1 addition & 2 deletions crates/astria-sequencer-relayer/src/relayer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::time::Duration;

use celestia_client::SEQUENCER_NAMESPACE;
use eyre::WrapErr as _;
use humantime::format_duration;
use sequencer_types::SequencerBlockData;
Expand Down Expand Up @@ -479,9 +478,9 @@ async fn submit_blocks_to_celestia(
num_blocks = sequencer_block_data.len(),
"submitting collected sequencer blocks to data availability layer",
);

let height = client
.submit_sequencer_blocks(
SEQUENCER_NAMESPACE,
sequencer_block_data,
SubmitOptions {
fee: Some(fee),
Expand Down

0 comments on commit 7ca078b

Please sign in to comment.