From e2ea6b417be00f38f77bbf6d41e47a681e3dfb88 Mon Sep 17 00:00:00 2001 From: trantorian <114066155+Trantorian1@users.noreply.github.com> Date: Thu, 16 Jan 2025 17:50:29 +0100 Subject: [PATCH] feat(pending): pending block sealed instead of re-added to mempool --- .../madara/client/block_production/src/lib.rs | 96 +++++++++++++++---- .../madara/client/db/src/storage_updates.rs | 4 - crates/madara/client/devnet/src/lib.rs | 22 +++-- .../node/src/service/block_production.rs | 3 +- crates/madara/primitives/block/src/lib.rs | 2 +- 5 files changed, 95 insertions(+), 32 deletions(-) diff --git a/crates/madara/client/block_production/src/lib.rs b/crates/madara/client/block_production/src/lib.rs index e7355e1d4..73ad4ca06 100644 --- a/crates/madara/client/block_production/src/lib.rs +++ b/crates/madara/client/block_production/src/lib.rs @@ -32,7 +32,7 @@ use mp_class::compile::ClassCompilationError; use mp_class::ConvertedClass; use mp_convert::ToFelt; use mp_receipt::from_blockifier_execution_info; -use mp_state_update::{ContractStorageDiffItem, StateDiff, StorageEntry}; +use mp_state_update::{ContractStorageDiffItem, DeclaredClassItem, StateDiff, StorageEntry}; use mp_transactions::TransactionWithHash; use mp_utils::service::ServiceContext; use opentelemetry::KeyValue; @@ -125,37 +125,101 @@ impl BlockProductionTask { self.current_pending_tick = n; } - /// Continue the pending block state by re-adding all of its transactions back into the mempool. - /// This function will always clear the pending block in db, even if the transactions could not be added to the mempool. - pub fn re_add_pending_block_txs_to_mempool( + /// Closes the last pending block store in db (if any). + /// + /// This avoids re-executing transaction by re-adding them to the [Mempool], + /// as was done before. + pub async fn close_pending_block( backend: &MadaraBackend, - mempool: &Mempool, + importer: &Arc, + metrics: &Arc, ) -> Result<(), Cow<'static, str>> { - let Some(current_pending_block) = - backend.get_block(&DbBlockId::Pending).map_err(|err| format!("Getting pending block: {err:#}"))? - else { + let err_pending_block = |err| format!("Getting pending block: {err:#}"); + let err_pending_state_diff = |err| format!("Getting pending state update: {err:#}"); + let err_pending_visited_segments = |err| format!("Getting pending visited segments: {err:#}"); + let err_pending_clear = |err| format!("Clearing pending block: {err:#}"); + let err_latest_block_n = |err| format!("Failed to get latest block number: {err:#}"); + + let start_time = Instant::now(); + + let pending_block = backend + .get_block(&DbBlockId::Pending) + .map_err(err_pending_block)? + .map(|block| MadaraPendingBlock::try_from(block).expect("Ready block stored in place of pending")); + let Some(pending_block) = pending_block else { // No pending block return Ok(()); }; - backend.clear_pending_block().map_err(|err| format!("Clearing pending block: {err:#}"))?; - let n_txs = re_add_finalized_to_blockifier::re_add_txs_to_mempool(current_pending_block, mempool, backend) - .map_err(|err| format!("Re-adding transactions to mempool: {err:#}"))?; + let pending_state_diff = backend.get_pending_block_state_update().map_err(err_pending_state_diff)?; + let pending_visited_segments = + backend.get_pending_block_segments().map_err(err_pending_visited_segments)?.unwrap_or_default(); + + let declared_classes = pending_state_diff.declared_classes.iter().try_fold( + vec![], + |mut acc, DeclaredClassItem { class_hash, .. }| match backend + .get_converted_class(&BlockId::Tag(BlockTag::Pending), class_hash) + { + Ok(Some(class)) => { + acc.push(class); + Ok(acc) + } + Ok(None) => { + Err(format!("Failed to retrieve pending declared class at hash {class_hash:x?}: not found in db")) + } + Err(err) => Err(format!("Failed to retrieve pending declared class at hash {class_hash:x?}: {err:#}")), + }, + )?; + + // NOTE: we disabled the Write Ahead Log when clearing the pending block + // so this will be done atomically at the same time as we close the next + // block, after we manually flush the db. + backend.clear_pending_block().map_err(err_pending_clear)?; + + let block_n = backend.get_latest_block_n().map_err(err_latest_block_n)?.unwrap_or(0); + let n_txs = pending_block.inner.transactions.len(); + + // Close and import the pending block + close_block( + &importer, + pending_block, + &pending_state_diff, + backend.chain_config().chain_id.clone(), + block_n, + declared_classes, + pending_visited_segments, + ) + .await + .map_err(|err| format!("Failed to close pending block: {err:#}"))?; + + // Flush changes to disk, pending block removal and adding the next + // block happens atomically + backend.flush().map_err(|err| format!("DB flushing error: {err:#}"))?; + + let end_time = start_time.elapsed(); + tracing::info!("⛏️ Closed block #{} with {} transactions - {:?}", block_n, n_txs, end_time); + + // Record metrics + let attributes = [ + KeyValue::new("transactions_added", n_txs.to_string()), + KeyValue::new("closing_time", end_time.as_secs_f32().to_string()), + ]; + + metrics.block_counter.add(1, &[]); + metrics.block_gauge.record(block_n, &attributes); + metrics.transaction_counter.add(n_txs as u64, &[]); - if n_txs > 0 { - tracing::info!("🔁 Re-added {n_txs} transactions from the pending block back into the mempool"); - } Ok(()) } - pub fn new( + pub async fn new( backend: Arc, importer: Arc, mempool: Arc, metrics: Arc, l1_data_provider: Arc, ) -> Result { - if let Err(err) = Self::re_add_pending_block_txs_to_mempool(&backend, &mempool) { + if let Err(err) = Self::close_pending_block(&backend, &importer, &metrics).await { // This error should not stop block production from working. If it happens, that's too bad. We drop the pending state and start from // a fresh one. tracing::error!("Failed to continue the pending block state: {err:#}"); diff --git a/crates/madara/client/db/src/storage_updates.rs b/crates/madara/client/db/src/storage_updates.rs index b5f5ae59e..2476a5f03 100644 --- a/crates/madara/client/db/src/storage_updates.rs +++ b/crates/madara/client/db/src/storage_updates.rs @@ -40,13 +40,9 @@ impl MadaraBackend { }; let task_contract_db = || { - // let nonces_from_deployed = - // state_diff.deployed_contracts.iter().map(|&DeployedContractItem { address, .. }| (address, Felt::ZERO)); - let nonces_from_updates = state_diff.nonces.into_iter().map(|NonceUpdate { contract_address, nonce }| (contract_address, nonce)); - // let nonce_map: HashMap = nonces_from_deployed.chain(nonces_from_updates).collect(); // set nonce to zero when contract deployed let nonce_map: HashMap = nonces_from_updates.collect(); let contract_class_updates_replaced = state_diff diff --git a/crates/madara/client/devnet/src/lib.rs b/crates/madara/client/devnet/src/lib.rs index 36ddf28b0..041885cca 100644 --- a/crates/madara/client/devnet/src/lib.rs +++ b/crates/madara/client/devnet/src/lib.rs @@ -311,8 +311,9 @@ mod tests { let importer = Arc::new(BlockImporter::new(Arc::clone(&backend), None).unwrap()); tracing::debug!("{:?}", block.state_diff); - tokio::runtime::Runtime::new() - .unwrap() + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime .block_on( importer.add_block( block, @@ -335,14 +336,15 @@ mod tests { let mempool = Arc::new(Mempool::new(Arc::clone(&backend), Arc::clone(&l1_data_provider), mempool_limits)); let metrics = BlockProductionMetrics::register(); - let block_production = BlockProductionTask::new( - Arc::clone(&backend), - Arc::clone(&importer), - Arc::clone(&mempool), - Arc::new(metrics), - Arc::clone(&l1_data_provider), - ) - .unwrap(); + let block_production = runtime + .block_on(BlockProductionTask::new( + Arc::clone(&backend), + Arc::clone(&importer), + Arc::clone(&mempool), + Arc::new(metrics), + Arc::clone(&l1_data_provider), + )) + .unwrap(); DevnetForTesting { backend, contracts, block_production, mempool } } diff --git a/crates/madara/node/src/service/block_production.rs b/crates/madara/node/src/service/block_production.rs index 0d6b4dbbe..5f6146e64 100644 --- a/crates/madara/node/src/service/block_production.rs +++ b/crates/madara/node/src/service/block_production.rs @@ -52,7 +52,8 @@ impl Service for BlockProductionService { Arc::clone(mempool), Arc::clone(metrics), Arc::clone(l1_data_provider), - )?; + ) + .await?; runner.service_loop(move |ctx| block_production_task.block_production_task(ctx)); diff --git a/crates/madara/primitives/block/src/lib.rs b/crates/madara/primitives/block/src/lib.rs index 4db2aa545..6e1c3fb58 100644 --- a/crates/madara/primitives/block/src/lib.rs +++ b/crates/madara/primitives/block/src/lib.rs @@ -285,7 +285,7 @@ impl From for MadaraMaybePendingBlock { /// Visited segments are the class segments that are visited during the execution of the block. /// This info is an input of SNOS and used for proving. -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Default, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct VisitedSegments(pub Vec); #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]