
feat: calculate state root in background
Keefe Liu authored and keefel committed Nov 18, 2024
1 parent 328fbf9 commit c64b71c
Showing 11 changed files with 339 additions and 46 deletions.
10 changes: 10 additions & 0 deletions crates/chain-state/src/in_memory.rs
@@ -826,6 +826,16 @@ impl ExecutedBlock {
pub fn trie_updates(&self) -> &TrieUpdates {
&self.trie
}

/// Returns the state root of the block.
pub fn state_root(&self) -> B256 {
self.block.header.header().state_root
}

/// Sets the trie updates for the block.
pub fn set_trie_updates(&mut self, trie: TrieUpdates) {
self.trie = Arc::new(trie);
}
}

/// Non-empty chain of blocks.
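The two new accessors are what let the persistence service attach a background-computed state root's trie updates to a block before writing it. A minimal caller-side sketch, assuming the `reth_chain_state` and `reth_trie` crates from this repository as dependencies; `attach_computed_root` is a hypothetical helper, not part of the commit:

```rust
use reth_chain_state::ExecutedBlock;
use reth_trie::updates::TrieUpdates;

/// Hypothetical helper: attach trie updates produced by a background
/// state-root calculation to the block that is about to be persisted.
fn attach_computed_root(block: &mut ExecutedBlock, trie_updates: TrieUpdates) {
    // The sealed header already carries the state root the block claims.
    let expected_root = block.state_root();
    println!("expected state root: {expected_root}");

    // Replace the block's trie updates with the freshly computed ones.
    block.set_trie_updates(trie_updates);
}
```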
2 changes: 2 additions & 0 deletions crates/cli/commands/src/node.rs
@@ -188,6 +188,8 @@ impl<
pruning,
enable_prefetch,
skip_state_root_validation: performance_optimization.skip_state_root_validation,
compute_state_root_in_background: performance_optimization
.compute_state_root_in_background,
enable_execution_cache: performance_optimization.enable_execution_cache,
};

11 changes: 9 additions & 2 deletions crates/engine/local/src/service.rs
@@ -77,6 +77,7 @@ where
mode: MiningMode,
payload_attributes_builder: B,
skip_state_root_validation: bool,
compute_state_root_in_background: bool,
enable_prefetch: bool,
enable_execution_cache: bool,
) -> Self
@@ -87,8 +88,13 @@
let engine_kind =
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };

let persistence_handle =
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx, false);
let persistence_handle = PersistenceHandle::spawn_service(
provider,
blockchain_db.clone(),
pruner,
sync_metrics_tx,
false,
);
let payload_validator = ExecutionPayloadValidator::new(chain_spec);

let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
@@ -105,6 +111,7 @@
invalid_block_hook,
engine_kind,
skip_state_root_validation,
compute_state_root_in_background,
enable_prefetch,
enable_execution_cache,
);
4 changes: 4 additions & 0 deletions crates/engine/service/src/service.rs
@@ -80,6 +80,7 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook>,
sync_metrics_tx: MetricEventsSender,
skip_state_root_validation: bool,
compute_state_root_in_background: bool,
enable_prefetch: bool,
enable_execution_cache: bool,
) -> Self {
@@ -90,6 +91,7 @@

let persistence_handle = PersistenceHandle::spawn_service(
provider,
blockchain_db.clone(),
pruner,
sync_metrics_tx,
enable_execution_cache,
@@ -110,6 +112,7 @@
invalid_block_hook,
engine_kind,
skip_state_root_validation,
compute_state_root_in_background,
enable_prefetch,
enable_execution_cache,
);
@@ -228,6 +231,7 @@ mod tests {
false,
false,
false,
false,
);
}
}
194 changes: 187 additions & 7 deletions crates/engine/tree/src/persistence.rs
@@ -1,13 +1,19 @@
use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_primitives::GotExpected;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
providers::{ConsistentDbView, ProviderNodeTypes},
writer::UnifiedStorageWriter,
BlockHashReader, BlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory,
StateProviderFactory, StateReader, StaticFileProviderFactory,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError};
use std::{
sync::mpsc::{Receiver, SendError, Sender},
time::Instant,
@@ -24,9 +30,11 @@ use tracing::{debug, error};
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<N: ProviderNodeTypes> {
pub struct PersistenceService<N: ProviderNodeTypes, P> {
/// The provider factory to use
provider: ProviderFactory<N>,
/// The view provider
view_provider: P,
/// Incoming requests
incoming: Receiver<PersistenceAction>,
/// The pruner
Expand All @@ -39,17 +47,23 @@ pub struct PersistenceService<N: ProviderNodeTypes> {
enable_state_cache: bool,
}

impl<N: ProviderNodeTypes> PersistenceService<N> {
impl<N: ProviderNodeTypes, P> PersistenceService<N, P>
where
P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader,
{
/// Create a new persistence service
pub fn new(
provider: ProviderFactory<N>,
view_provider: P,
incoming: Receiver<PersistenceAction>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
enable_state_cache: bool,
) -> Self {
Self {
provider,
view_provider,
incoming,
pruner,
metrics: PersistenceMetrics::default(),
@@ -70,7 +84,11 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
}
}

impl<N: ProviderNodeTypes> PersistenceService<N> {
impl<N: ProviderNodeTypes, P> PersistenceService<N, P>
where
P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader,
{
/// This is the main loop, that will listen to database events and perform the requested
/// database actions
pub fn run(mut self) -> Result<(), PersistenceError> {
@@ -104,6 +122,30 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
}
}
}
PersistenceAction::SaveBlocksWithStateRootCalculation(
blocks,
parent_hash,
sender,
) => {
let result =
self.on_save_block_with_state_root_calculation(blocks, parent_hash)?;
let result_number = result.0.map(|r| r.number);

// we ignore the error because the caller may or may not care about the result
let _ = sender.send(result);

if let Some(block_number) = result_number {
// send new sync metrics based on saved blocks
let _ = self
.sync_metrics_tx
.send(MetricEvent::SyncHeight { height: block_number });

if self.pruner.is_pruning_needed(block_number) {
// We log `PrunerOutput` inside the `Pruner`
let _ = self.prune_before(block_number)?;
}
}
}
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
let provider = self.provider.database_provider_rw()?;
provider.save_finalized_block_number(finalized_block)?;
@@ -168,6 +210,85 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash_num)
}

fn on_save_block_with_state_root_calculation(
&self,
mut blocks: Vec<ExecutedBlock>,
parent_hash: B256,
) -> Result<(Option<BlockNumHash>, TrieUpdates), PersistenceError> {
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");

let state_root_result = self
.compute_state_root_in_batch_blocks(blocks.clone(), parent_hash)
.map_err(PersistenceError::StateRootError)?;

if let Some(last_block) = blocks.last_mut() {
last_block.set_trie_updates(state_root_result.1.clone());
}

let save_blocks_result = self.on_save_blocks(blocks)?;
Ok((save_blocks_result, state_root_result.1))
}

fn compute_state_root_in_batch_blocks(
&self,
blocks: Vec<ExecutedBlock>,
parent_hash: B256,
) -> Result<(B256, TrieUpdates), AdvanceCalculateStateRootError> {
let mut hashed_state = HashedPostState::default();
for block in &blocks {
hashed_state.extend(block.hashed_state().clone());
}
let block_number = blocks.last().unwrap().block().number;
let block_hash = blocks.last().unwrap().block().hash();
let target_state_root = blocks.last().unwrap().state_root();

let root_time = Instant::now();
debug!(target: "engine::persistence", ?block_number, ?block_hash, "Computing state root");
let state_root_result = match self.compute_state_root_parallel(parent_hash, &hashed_state) {
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine::persistence", %error, "Parallel state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(AdvanceCalculateStateRootError::ComputeFailed(error)),
};

let (state_root, trie_output) = if let Some(result) = state_root_result {
result
} else {
return Err(AdvanceCalculateStateRootError::ResultNotFound());
};

let root_elapsed = root_time.elapsed();
debug!(target: "engine::persistence", ?block_number, ?block_hash, ?state_root, ?root_elapsed, "Computed state root");

if state_root != target_state_root {
return Err(AdvanceCalculateStateRootError::StateRootDiff(GotExpected {
got: state_root,
expected: target_state_root,
}))
}

Ok((state_root, trie_output))
}

fn compute_state_root_parallel(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.view_provider.clone())?;
let mut input = TrieInput::default();

let revert_state = consistent_view.revert_state(parent_hash)?;
input.append(revert_state);

// Extend with block we are validating root for.
input.append_ref(hashed_state);

ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
}
}

/// One of the errors that can happen when using the persistence service.
Expand All @@ -180,6 +301,28 @@ pub enum PersistenceError {
/// A provider error
#[error(transparent)]
ProviderError(#[from] ProviderError),

/// A state root error
#[error(transparent)]
StateRootError(#[from] AdvanceCalculateStateRootError),
}

/// An error that can occur while calculating the state root in the background. This can be a
/// [`ProviderError`], a [`GotExpected`] state root mismatch, a [`ParallelStateRootError`], or a
/// missing result.
#[derive(Debug, Error)]
pub enum AdvanceCalculateStateRootError {
/// A provider error
#[error(transparent)]
Provider(#[from] ProviderError),
/// An error that can come from a state root diff
#[error(transparent)]
StateRootDiff(#[from] GotExpected<B256>),
/// An error that can come from a parallel state root error
#[error(transparent)]
ComputeFailed(#[from] ParallelStateRootError),
/// An error that can come from a trie update
#[error("Result not found")]
ResultNotFound(),
}

/// A signal to the persistence service that part of the tree state can be persisted.
Expand All @@ -192,6 +335,15 @@ pub enum PersistenceAction {
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<BlockNumHash>>),

/// The section of tree state that should be persisted. These blocks are expected in order of
/// increasing block number.
/// This action will also calculate the state root for the given blocks.
SaveBlocksWithStateRootCalculation(
Vec<ExecutedBlock>,
B256,
oneshot::Sender<(Option<BlockNumHash>, TrieUpdates)>,
),

/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
@@ -219,12 +371,22 @@ impl PersistenceHandle {
}

/// Create a new [`PersistenceHandle`], and spawn the persistence service.
pub fn spawn_service<N: ProviderNodeTypes>(
pub fn spawn_service<N: ProviderNodeTypes, P>(
provider_factory: ProviderFactory<N>,
view_provider: P,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
enable_state_cache: bool,
) -> Self {
) -> Self
where
P: DatabaseProviderFactory
+ BlockReader
+ StateProviderFactory
+ StateReader
+ Clone
+ 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader,
{
// create the initial channels
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

@@ -234,6 +396,7 @@
// spawn the persistence service
let db_service = PersistenceService::new(
provider_factory,
view_provider,
db_service_rx,
pruner,
sync_metrics_tx,
@@ -276,6 +439,23 @@ impl PersistenceHandle {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}

/// Persists the given blocks on disk and calculates their state root.
/// The new tip hash and the resulting [`TrieUpdates`] are returned in the receiver end of the
/// sender argument.
pub fn save_blocks_with_state_root_calculation(
&self,
blocks: Vec<ExecutedBlock>,
parent_hash: B256,
tx: oneshot::Sender<(Option<BlockNumHash>, TrieUpdates)>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveBlocksWithStateRootCalculation(
blocks,
parent_hash,
tx,
))
}

/// Persists the finalized block number on disk.
pub fn save_finalized_block_number(
&self,
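From the caller's side, the new handle method hands a batch of blocks to the persistence service and receives the new tip plus the computed [`TrieUpdates`] back over a oneshot channel. A sketch under the assumptions that the channel is `tokio::sync::oneshot` (as used elsewhere in the engine crates) and that the handle lives at `reth_engine_tree::persistence::PersistenceHandle`; `persist_with_background_root` is a hypothetical wrapper, not part of the commit:

```rust
use alloy_primitives::B256;
use reth_chain_state::ExecutedBlock;
use reth_engine_tree::persistence::PersistenceHandle;
use tokio::sync::oneshot;

/// Hypothetical wrapper: persist a batch of executed blocks while the
/// persistence service computes their state root on its own thread.
async fn persist_with_background_root(
    handle: &PersistenceHandle,
    blocks: Vec<ExecutedBlock>,
    parent_hash: B256,
) {
    let (tx, rx) = oneshot::channel();

    // Queue the save; the state root for the whole batch is computed against
    // `parent_hash` before the blocks are written to disk.
    handle
        .save_blocks_with_state_root_calculation(blocks, parent_hash, tx)
        .expect("persistence service has shut down");

    // The caller can keep doing other work here until the result is needed.

    let (tip, _trie_updates) = rx.await.expect("persistence service dropped the response");
    if let Some(tip) = tip {
        println!("persisted up to block {} ({})", tip.number, tip.hash);
    }
}
```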
4 changes: 2 additions & 2 deletions crates/engine/tree/src/tree/config.rs
@@ -1,10 +1,10 @@
//! Engine tree configuration.
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 16;

/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 2;
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 16;

const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
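Both defaults move from 2 to 16: up to sixteen canonical blocks may now accumulate in memory before a persistence run is triggered, and blocks are kept in memory until they are sixteen blocks behind the canonical head. A self-contained sketch of how the two constants interact; `blocks_to_persist` is a hypothetical illustration, not the engine's actual scheduling logic:

```rust
/// New defaults from this commit.
const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 16;
const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 16;

/// Hypothetical illustration: how many of the canonical in-memory blocks would
/// be handed to the persistence service once the threshold is exceeded.
fn blocks_to_persist(in_memory: u64) -> u64 {
    if in_memory > DEFAULT_PERSISTENCE_THRESHOLD {
        in_memory.saturating_sub(DEFAULT_MEMORY_BLOCK_BUFFER_TARGET)
    } else {
        0
    }
}

fn main() {
    assert_eq!(blocks_to_persist(16), 0); // at the threshold nothing is flushed yet
    assert_eq!(blocks_to_persist(20), 4); // above it, all but the buffer target are flushed
}
```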