Skip to content

Commit

Permalink
feat(pending): pending block sealed instead of re-added to mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
Trantorian1 committed Jan 16, 2025
1 parent dfafa02 commit e2ea6b4
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 32 deletions.
96 changes: 80 additions & 16 deletions crates/madara/client/block_production/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use mp_class::compile::ClassCompilationError;
use mp_class::ConvertedClass;
use mp_convert::ToFelt;
use mp_receipt::from_blockifier_execution_info;
use mp_state_update::{ContractStorageDiffItem, StateDiff, StorageEntry};
use mp_state_update::{ContractStorageDiffItem, DeclaredClassItem, StateDiff, StorageEntry};
use mp_transactions::TransactionWithHash;
use mp_utils::service::ServiceContext;
use opentelemetry::KeyValue;
Expand Down Expand Up @@ -125,37 +125,101 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
self.current_pending_tick = n;
}

/// Continue the pending block state by re-adding all of its transactions back into the mempool.
/// This function will always clear the pending block in db, even if the transactions could not be added to the mempool.
pub fn re_add_pending_block_txs_to_mempool(
/// Closes the last pending block store in db (if any).
///
/// This avoids re-executing transaction by re-adding them to the [Mempool],
/// as was done before.
pub async fn close_pending_block(
backend: &MadaraBackend,
mempool: &Mempool,
importer: &Arc<BlockImporter>,
metrics: &Arc<BlockProductionMetrics>,
) -> Result<(), Cow<'static, str>> {
let Some(current_pending_block) =
backend.get_block(&DbBlockId::Pending).map_err(|err| format!("Getting pending block: {err:#}"))?
else {
let err_pending_block = |err| format!("Getting pending block: {err:#}");
let err_pending_state_diff = |err| format!("Getting pending state update: {err:#}");
let err_pending_visited_segments = |err| format!("Getting pending visited segments: {err:#}");
let err_pending_clear = |err| format!("Clearing pending block: {err:#}");
let err_latest_block_n = |err| format!("Failed to get latest block number: {err:#}");

let start_time = Instant::now();

let pending_block = backend
.get_block(&DbBlockId::Pending)
.map_err(err_pending_block)?
.map(|block| MadaraPendingBlock::try_from(block).expect("Ready block stored in place of pending"));
let Some(pending_block) = pending_block else {
// No pending block
return Ok(());
};
backend.clear_pending_block().map_err(|err| format!("Clearing pending block: {err:#}"))?;

let n_txs = re_add_finalized_to_blockifier::re_add_txs_to_mempool(current_pending_block, mempool, backend)
.map_err(|err| format!("Re-adding transactions to mempool: {err:#}"))?;
let pending_state_diff = backend.get_pending_block_state_update().map_err(err_pending_state_diff)?;
let pending_visited_segments =
backend.get_pending_block_segments().map_err(err_pending_visited_segments)?.unwrap_or_default();

let declared_classes = pending_state_diff.declared_classes.iter().try_fold(
vec![],
|mut acc, DeclaredClassItem { class_hash, .. }| match backend
.get_converted_class(&BlockId::Tag(BlockTag::Pending), class_hash)
{
Ok(Some(class)) => {
acc.push(class);
Ok(acc)
}
Ok(None) => {
Err(format!("Failed to retrieve pending declared class at hash {class_hash:x?}: not found in db"))
}
Err(err) => Err(format!("Failed to retrieve pending declared class at hash {class_hash:x?}: {err:#}")),
},
)?;

// NOTE: we disabled the Write Ahead Log when clearing the pending block
// so this will be done atomically at the same time as we close the next
// block, after we manually flush the db.
backend.clear_pending_block().map_err(err_pending_clear)?;

let block_n = backend.get_latest_block_n().map_err(err_latest_block_n)?.unwrap_or(0);
let n_txs = pending_block.inner.transactions.len();

// Close and import the pending block
close_block(
&importer,
pending_block,
&pending_state_diff,
backend.chain_config().chain_id.clone(),
block_n,
declared_classes,
pending_visited_segments,
)
.await
.map_err(|err| format!("Failed to close pending block: {err:#}"))?;

// Flush changes to disk, pending block removal and adding the next
// block happens atomically
backend.flush().map_err(|err| format!("DB flushing error: {err:#}"))?;

let end_time = start_time.elapsed();
tracing::info!("⛏️ Closed block #{} with {} transactions - {:?}", block_n, n_txs, end_time);

// Record metrics
let attributes = [
KeyValue::new("transactions_added", n_txs.to_string()),
KeyValue::new("closing_time", end_time.as_secs_f32().to_string()),
];

metrics.block_counter.add(1, &[]);
metrics.block_gauge.record(block_n, &attributes);
metrics.transaction_counter.add(n_txs as u64, &[]);

if n_txs > 0 {
tracing::info!("🔁 Re-added {n_txs} transactions from the pending block back into the mempool");
}
Ok(())
}

pub fn new(
pub async fn new(
backend: Arc<MadaraBackend>,
importer: Arc<BlockImporter>,
mempool: Arc<Mempool>,
metrics: Arc<BlockProductionMetrics>,
l1_data_provider: Arc<dyn L1DataProvider>,
) -> Result<Self, Error> {
if let Err(err) = Self::re_add_pending_block_txs_to_mempool(&backend, &mempool) {
if let Err(err) = Self::close_pending_block(&backend, &importer, &metrics).await {
// This error should not stop block production from working. If it happens, that's too bad. We drop the pending state and start from
// a fresh one.
tracing::error!("Failed to continue the pending block state: {err:#}");
Expand Down
4 changes: 0 additions & 4 deletions crates/madara/client/db/src/storage_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,9 @@ impl MadaraBackend {
};

let task_contract_db = || {
// let nonces_from_deployed =
// state_diff.deployed_contracts.iter().map(|&DeployedContractItem { address, .. }| (address, Felt::ZERO));

let nonces_from_updates =
state_diff.nonces.into_iter().map(|NonceUpdate { contract_address, nonce }| (contract_address, nonce));

// let nonce_map: HashMap<Felt, Felt> = nonces_from_deployed.chain(nonces_from_updates).collect(); // set nonce to zero when contract deployed
let nonce_map: HashMap<Felt, Felt> = nonces_from_updates.collect();

let contract_class_updates_replaced = state_diff
Expand Down
22 changes: 12 additions & 10 deletions crates/madara/client/devnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ mod tests {
let importer = Arc::new(BlockImporter::new(Arc::clone(&backend), None).unwrap());

tracing::debug!("{:?}", block.state_diff);
tokio::runtime::Runtime::new()
.unwrap()
let runtime = tokio::runtime::Runtime::new().unwrap();

runtime
.block_on(
importer.add_block(
block,
Expand All @@ -335,14 +336,15 @@ mod tests {
let mempool = Arc::new(Mempool::new(Arc::clone(&backend), Arc::clone(&l1_data_provider), mempool_limits));
let metrics = BlockProductionMetrics::register();

let block_production = BlockProductionTask::new(
Arc::clone(&backend),
Arc::clone(&importer),
Arc::clone(&mempool),
Arc::new(metrics),
Arc::clone(&l1_data_provider),
)
.unwrap();
let block_production = runtime
.block_on(BlockProductionTask::new(
Arc::clone(&backend),
Arc::clone(&importer),
Arc::clone(&mempool),
Arc::new(metrics),
Arc::clone(&l1_data_provider),
))
.unwrap();

DevnetForTesting { backend, contracts, block_production, mempool }
}
Expand Down
3 changes: 2 additions & 1 deletion crates/madara/node/src/service/block_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl Service for BlockProductionService {
Arc::clone(mempool),
Arc::clone(metrics),
Arc::clone(l1_data_provider),
)?;
)
.await?;

runner.service_loop(move |ctx| block_production_task.block_production_task(ctx));

Expand Down
2 changes: 1 addition & 1 deletion crates/madara/primitives/block/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl From<MadaraBlock> for MadaraMaybePendingBlock {

/// Visited segments are the class segments that are visited during the execution of the block.
/// This info is an input of SNOS and used for proving.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Default, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct VisitedSegments(pub Vec<VisitedSegmentEntry>);

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand Down

0 comments on commit e2ea6b4

Please sign in to comment.