diff --git a/Cargo.lock b/Cargo.lock index 5ae2d30ce7..b41d414f07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2793,6 +2793,7 @@ dependencies = [ "bytes", "cfg-if", "ethrex-core", + "ethrex-levm", "ethrex-metrics", "ethrex-rlp", "ethrex-storage", diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 5d7e63cde2..e6a497789c 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -117,7 +117,7 @@ impl Discv4Server { debug!("Received {read} bytes from {from}"); match Packet::decode(&buf[..read]) { - Err(e) => error!("Could not decode packet: {:?}", e), + Err(e) => debug!("Could not decode packet: {:?}", e), Ok(packet) => { let msg = packet.get_message(); let msg_name = msg.to_string(); diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs index 6a7e3fbfbe..c963ba5268 100644 --- a/crates/networking/p2p/net.rs +++ b/crates/networking/p2p/net.rs @@ -16,7 +16,7 @@ use tokio::{ sync::Mutex, }; use tokio_util::task::TaskTracker; -use tracing::{error, info}; +use tracing::{debug, error, info}; use types::Node; pub(crate) mod discv4; @@ -139,7 +139,7 @@ async fn handle_peer_as_receiver(context: P2PContext, peer_addr: SocketAddr, str // TODO We should remove the peer from the table if connection failed // but currently it will make the tests fail // table.lock().await.replace_peer(node.node_id); - error!("Error creating tcp connection with peer at {peer_addr}: {e}") + debug!("Error creating tcp connection with peer at {peer_addr}: {e}") } } } @@ -152,7 +152,7 @@ async fn handle_peer_as_initiator(context: P2PContext, node: Node) { // TODO We should remove the peer from the table if connection failed // but currently it will make the tests fail // table.lock().await.replace_peer(node.node_id); - error!("Error establishing tcp connection with peer at {addr}: {e}"); + debug!("Error establishing tcp connection with peer at {addr}: {e}"); return; } }; @@ -163,7 +163,7 @@ async fn handle_peer_as_initiator(context: P2PContext, node: Node) { // TODO We should remove the peer from the table if connection failed // but currently it will make the tests fail // table.lock().await.replace_peer(node.node_id); - error!("Error creating tcp connection with peer at {addr}: {e}") + debug!("Error creating tcp connection with peer at {addr}: {e}") } }; } diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index b9d7fb07af..e649114d2d 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -6,7 +6,7 @@ use ethrex_core::{ use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError}; use ethrex_storage::{error::StoreError, Store}; use ethrex_trie::{Nibbles, Node, TrieError, TrieState, EMPTY_TRIE_HASH}; -use std::{collections::BTreeMap, sync::Arc}; +use std::{cmp::min, collections::BTreeMap, sync::Arc}; use tokio::{ sync::{ mpsc::{self, error::SendError, Receiver, Sender}, @@ -203,9 +203,9 @@ impl SyncManager { let block = store .get_block_by_hash(*hash)? .ok_or(SyncError::CorruptDB)?; + ethrex_blockchain::add_block(&block, &store)?; store.set_canonical_block(block.header.number, *hash)?; store.update_latest_block_number(block.header.number)?; - ethrex_blockchain::add_block(&block, &store)?; } self.last_snap_pivot = pivot_header.number; // Finished a sync cycle without aborting halfway, clear current checkpoint @@ -268,7 +268,7 @@ async fn store_block_bodies( store: Store, ) -> Result<(), SyncError> { loop { - debug!("Requesting Block Headers "); + debug!("Requesting Block Bodies "); if let Some(block_bodies) = peers.request_block_bodies(block_hashes.clone()).await { debug!(" Received {} Block Bodies", block_bodies.len()); // Track which bodies we have already fetched @@ -296,7 +296,7 @@ async fn store_receipts( store: Store, ) -> Result<(), SyncError> { loop { - debug!("Requesting Block Headers "); + debug!("Requesting Receipts "); if let Some(receipts) = peers.request_receipts(block_hashes.clone()).await { debug!(" Received {} Receipts", receipts.len()); // Track which blocks we have already fetched receipts for @@ -432,15 +432,17 @@ async fn rebuild_state_trie( let pending_storages = !pending_storage_accounts.is_empty(); // Next cycle may have different storage roots for these accounts so we will leave them to healing if pending_storages { - let mut stored_pending_storages = store - .get_pending_storage_heal_accounts()? - .unwrap_or_default(); - stored_pending_storages.extend(pending_storage_accounts); - info!( + let mut stored_pending_storages = store.get_storage_heal_paths()?.unwrap_or_default(); + stored_pending_storages.extend( + pending_storage_accounts + .iter() + .map(|k| (*k, vec![Nibbles::default()])), + ); + debug!( "Current pending storage accounts: {}", stored_pending_storages.len() ); - store.set_pending_storage_heal_accounts(stored_pending_storages)?; + store.set_storage_heal_paths(stored_pending_storages)?; } if stale || pending_storages { // Skip healing and return stale status @@ -454,13 +456,12 @@ async fn rebuild_state_trie( let res = heal_state_trie( bytecode_sender.clone(), state_root, - current_state_root, store.clone(), peers.clone(), ) .await?; // Send empty batch to signal that no more batches are incoming - debug!("Account Trie fully rebuilt, signaling bytecode fetcher process"); + debug!("Account Trie healing ended signaling bytecode fetcher process"); bytecode_sender.send(vec![]).await?; bytecode_fetcher_handle.await??; Ok(res) @@ -549,18 +550,17 @@ async fn storage_fetcher( // or if we have no more incoming batches, spawn a fetch process // If the pivot became stale don't process anything and just save incoming requests while !stale - && (pending_storage.len() >= NODE_BATCH_SIZE - || !incoming && !pending_storage.is_empty()) + && (pending_storage.len() >= BATCH_SIZE || (!incoming && !pending_storage.is_empty())) { // We will be spawning multiple tasks and then collecting their results // This uses a loop inside the main loop as the result from these tasks may lead to more values in queue let mut storage_tasks = tokio::task::JoinSet::new(); while !stale - && (pending_storage.len() >= NODE_BATCH_SIZE - || !incoming && !pending_storage.is_empty()) + && (pending_storage.len() >= BATCH_SIZE + || (!incoming && !pending_storage.is_empty())) { let next_batch = pending_storage - .drain(..NODE_BATCH_SIZE.min(pending_storage.len())) + .drain(..BATCH_SIZE.min(pending_storage.len())) .collect::>(); storage_tasks.spawn(fetch_storage_batch( next_batch.clone(), @@ -706,38 +706,32 @@ async fn handle_large_storage_range( async fn heal_state_trie( bytecode_sender: Sender>, state_root: H256, - mut current_root: H256, store: Store, peers: PeerHandler, ) -> Result { + // Check if we have pending storages to heal from a previous cycle + let pending: BTreeMap> = store + .get_storage_heal_paths()? + .unwrap_or_default() + .into_iter() + .collect(); // Spawn a storage healer for this blocks's storage let (storage_sender, storage_receiver) = mpsc::channel::>(500); let storage_healer_handler = tokio::spawn(storage_healer( state_root, + pending, storage_receiver, peers.clone(), store.clone(), )); - // Check if we have pending storages to heal from a previous cycle - if let Some(pending) = store.get_pending_storage_heal_accounts()? { - debug!( - "Retrieved {} pending storage healing requests", - pending.len() - ); - storage_sender.send(pending).await?; - } + // Check if we have pending paths from a previous cycle + let mut paths = store.get_state_heal_paths()?.unwrap_or_default(); // Begin by requesting the root node - let mut paths = vec![Nibbles::default()]; + paths.push(Nibbles::default()); while !paths.is_empty() { - // Fetch the latests paths first to prioritize reaching leaves as soon as possible - let batch: Vec = paths - .drain(paths.len().saturating_sub(NODE_BATCH_SIZE)..) - .collect(); - - if let Some(nodes) = peers - .request_state_trienodes(state_root, batch.clone()) - .await - { + // Take at most one batch so we don't overload the peer + let batch = paths[0..min(paths.len(), NODE_BATCH_SIZE)].to_vec(); + if let Some(nodes) = peers.request_state_trienodes(state_root, batch).await { debug!("Received {} state nodes", nodes.len()); let mut hahsed_addresses = vec![]; let mut code_hashes = vec![]; @@ -745,11 +739,10 @@ async fn heal_state_trie( // - Add its children to the queue (if we don't have them already) // - If it is a leaf, request its bytecode & storage // - If it is a leaf, add its path & value to the trie - // Add unfetched nodes back to the queue (we do this first to ensure deph-focused fetching) - paths.extend_from_slice(&batch[nodes.len()..]); - for (node, path) in nodes.into_iter().zip(batch.into_iter()) { + for node in nodes { // We cannot keep the trie state open - let mut trie = store.open_state_trie(current_root); + let mut trie = store.open_state_trie(*EMPTY_TRIE_HASH); + let path = paths.remove(0); paths.extend(node_missing_children(&node, &path, trie.state())?); if let Node::Leaf(node) = &node { // Fetch bytecode & storage @@ -771,11 +764,10 @@ async fn heal_state_trie( { code_hashes.push(account.code_hash); } - // Write values to trie - trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?; - // Update current root - current_root = trie.hash()?; } + // Add node to trie + let hash = node.compute_hash(); + trie.state_mut().write_node(node, hash)?; } // Send storage & bytecode requests if !hahsed_addresses.is_empty() { @@ -789,14 +781,20 @@ async fn heal_state_trie( } } debug!("State Healing stopped, signaling storage healer"); + // Save paths for the next cycle + if !paths.is_empty() { + debug!("Caching {} paths for the next cycle", paths.len()); + store.set_state_heal_paths(paths.clone())?; + } // Send empty batch to signal that no more batches are incoming storage_sender.send(vec![]).await?; - let pending_storage_heal_accounts = storage_healer_handler.await??; + let storage_heal_paths = storage_healer_handler.await??; // Update pending list // If a storage trie was left mid-healing we will heal it again - let storage_healing_succesful = pending_storage_heal_accounts.is_empty(); + let storage_healing_succesful = storage_heal_paths.is_empty(); if !storage_healing_succesful { - store.set_pending_storage_heal_accounts(pending_storage_heal_accounts)?; + debug!("{} storages with pending healing", storage_heal_paths.len()); + store.set_storage_heal_paths(storage_heal_paths.into_iter().collect())?; } Ok(paths.is_empty() && storage_healing_succesful) } @@ -804,54 +802,59 @@ async fn heal_state_trie( /// Waits for incoming hashed addresses from the receiver channel endpoint and queues the associated root nodes for state retrieval /// Also retrieves their children nodes until we have the full storage trie stored /// If the state becomes stale while fetching, returns its current queued account hashes +/// Receives the prending storages from a previous iteration async fn storage_healer( state_root: H256, + mut pending_paths: BTreeMap>, mut receiver: Receiver>, peers: PeerHandler, store: Store, -) -> Result, SyncError> { - // Pending list of storages to fetch - // Each entry is made up of AccountHash -> (CurrentRoot, Paths) - let mut pending_storages: BTreeMap)> = BTreeMap::new(); - //let mut pending_storages: Vec<(H256, Nibbles)> = vec![]; +) -> Result>, SyncError> { // The pivot may become stale while the fetcher is active, we will still keep the process // alive until the end signal so we don't lose queued messages let mut stale = false; let mut incoming = true; + // This boolean exists only so that we skip waiting for messages on the first loop iteration + // It will be removed on later optimizations + let mut startup = true; while incoming { - // Fetch incoming requests - match receiver.recv().await { - Some(account_paths) if !account_paths.is_empty() => { - // Add the root paths of each account trie to the queue - pending_storages.extend( - account_paths - .into_iter() - .map(|acc_path| (acc_path, (*EMPTY_TRIE_HASH, vec![Nibbles::default()]))), - ); + if startup { + startup = false; + } else { + // Fetch incoming requests + match receiver.recv().await { + Some(account_paths) if !account_paths.is_empty() => { + // Add the root paths of each account trie to the queue + pending_paths.extend( + account_paths + .into_iter() + .map(|acc_path| (acc_path, vec![Nibbles::default()])), + ); + } + // Disconnect / Empty message signaling no more bytecodes to sync + _ => incoming = false, } - // Disconnect / Empty message signaling no more bytecodes to sync - _ => incoming = false, } // If we have enough pending storages to fill a batch // or if we have no more incoming batches, spawn a fetch process // If the pivot became stale don't process anything and just save incoming requests - while !stale && !pending_storages.is_empty() { - let mut next_batch: BTreeMap)> = BTreeMap::new(); + while !stale && !pending_paths.is_empty() { + let mut next_batch: BTreeMap> = BTreeMap::new(); // Fill batch let mut batch_size = 0; - while batch_size < BATCH_SIZE { - let (key, val) = pending_storages.pop_first().unwrap(); - batch_size += val.1.len(); + while batch_size < NODE_BATCH_SIZE && !pending_paths.is_empty() { + let (key, val) = pending_paths.pop_first().unwrap(); + batch_size += val.len(); next_batch.insert(key, val); } let (return_batch, is_stale) = heal_storage_batch(state_root, next_batch.clone(), peers.clone(), store.clone()) .await?; - pending_storages.extend(return_batch.into_iter()); + pending_paths.extend(return_batch.into_iter()); stale |= is_stale; } } - Ok(pending_storages.into_keys().collect()) + Ok(pending_paths) } /// Receives a set of storage trie paths (grouped by their corresponding account's state trie path), @@ -859,34 +862,27 @@ async fn storage_healer( /// Also returns a boolean indicating if the pivot became stale during the request async fn heal_storage_batch( state_root: H256, - mut batch: BTreeMap)>, + mut batch: BTreeMap>, peers: PeerHandler, store: Store, -) -> Result<(BTreeMap)>, bool), SyncError> { - let req_batch = batch.iter().map(|(k, v)| (*k, v.1.clone())).collect(); - if let Some(mut nodes) = peers.request_storage_trienodes(state_root, req_batch).await { - debug!("Received {} nodes", nodes.len()); +) -> Result<(BTreeMap>, bool), SyncError> { + if let Some(mut nodes) = peers + .request_storage_trienodes(state_root, batch.clone()) + .await + { + debug!("Received {} storage nodes", nodes.len()); // Process the nodes for each account path - for (acc_path, (root, paths)) in batch.iter_mut() { - let mut trie = store.open_storage_trie(*acc_path, *root); + for (acc_path, paths) in batch.iter_mut() { + let mut trie = store.open_storage_trie(*acc_path, *EMPTY_TRIE_HASH); // Get the corresponding nodes for node in nodes.drain(..paths.len().min(nodes.len())) { let path = paths.remove(0); // Add children to batch let children = node_missing_children(&node, &path, trie.state())?; paths.extend(children); - // If it is a leaf node, insert values into the trie - if let Node::Leaf(leaf) = node { - let path = &path.concat(leaf.partial.clone()).to_bytes(); - if path.len() != 32 { - // Something went wrong - return Err(SyncError::CorruptPath); - } - trie.insert(path.to_vec(), leaf.value.encode_to_vec())?; - } + let hash = node.compute_hash(); + trie.state_mut().write_node(node, hash)?; } - // Update current root - *root = trie.hash()?; // Cut the loop if we ran out of nodes if nodes.is_empty() { break; @@ -894,7 +890,7 @@ async fn heal_storage_batch( } // Return remaining and added paths to be added to the queue // Filter out the storages we completely fetched - batch.retain(|_, v| !v.1.is_empty()); + batch.retain(|_, v| !v.is_empty()); return Ok((batch, false)); } // Pivot became stale, lets inform the fetcher diff --git a/crates/storage/store/engines/api.rs b/crates/storage/store/engines/api.rs index aca3c05fbe..57801104b5 100644 --- a/crates/storage/store/engines/api.rs +++ b/crates/storage/store/engines/api.rs @@ -7,7 +7,7 @@ use ethrex_core::types::{ use std::{fmt::Debug, panic::RefUnwindSafe}; use crate::error::StoreError; -use ethrex_trie::Trie; +use ethrex_trie::{Nibbles, Trie}; pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Add block header @@ -258,35 +258,34 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Gets the hash of the last header downloaded during a snap sync fn get_header_download_checkpoint(&self) -> Result, StoreError>; - /// Clears the hash of the last header downloaded during a snap sync - fn clear_header_download_checkpoint(&self) -> Result<(), StoreError>; - /// Sets the current state root of the state trie being rebuilt during snap sync fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError>; /// Gets the current state root of the state trie being rebuilt during snap sync fn get_state_trie_root_checkpoint(&self) -> Result, StoreError>; - /// Clears the current state root of the state trie being rebuilt during snap sync - fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError>; - /// Sets the last key fetched from the state trie being fetched during snap sync fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError>; /// Gets the last key fetched from the state trie being fetched during snap sync fn get_state_trie_key_checkpoint(&self) -> Result, StoreError>; - /// Clears the last key fetched from the state trie being fetched during snap sync - fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError>; + /// Sets the storage trie paths in need of healing, grouped by hashed address + fn set_storage_heal_paths(&self, accounts: Vec<(H256, Vec)>) + -> Result<(), StoreError>; + + /// Gets the storage trie paths in need of healing, grouped by hashed address + #[allow(clippy::type_complexity)] + fn get_storage_heal_paths(&self) -> Result)>>, StoreError>; - /// Sets the list of account hashes whose storage needs healing - fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError>; + /// Sets the state trie paths in need of healing + fn set_state_heal_paths(&self, paths: Vec) -> Result<(), StoreError>; - /// Gets the list of account hashes whos storage needs healing - fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError>; + /// Gets the state trie paths in need of healing + fn get_state_heal_paths(&self) -> Result>, StoreError>; - /// Clears the list of account hashes whose storage needs healing - fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError>; + /// Clears all checkpoint data created during the last snap sync + fn clear_snap_state(&self) -> Result<(), StoreError>; fn is_synced(&self) -> Result; diff --git a/crates/storage/store/engines/in_memory.rs b/crates/storage/store/engines/in_memory.rs index 23e8be55c4..0455e2fea7 100644 --- a/crates/storage/store/engines/in_memory.rs +++ b/crates/storage/store/engines/in_memory.rs @@ -4,7 +4,7 @@ use ethereum_types::{H256, U256}; use ethrex_core::types::{ BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt, }; -use ethrex_trie::{InMemoryTrieDB, Trie}; +use ethrex_trie::{InMemoryTrieDB, Nibbles, Trie}; use std::{ collections::HashMap, fmt::Debug, @@ -65,7 +65,9 @@ pub struct SnapState { /// Last downloaded key of the latest State Trie state_trie_key_checkpoint: Option, /// Accounts which storage needs healing - pending_storage_heal_accounts: Option>, + storage_heal_paths: Option)>>, + /// State trie Paths in need of healing + state_heal_paths: Option>, } impl Store { @@ -449,11 +451,6 @@ impl StoreEngine for Store { Ok(self.inner().snap_state.header_download_checkpoint) } - fn clear_header_download_checkpoint(&self) -> Result<(), StoreError> { - self.inner().snap_state.header_download_checkpoint = None; - Ok(()) - } - fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> { self.inner().snap_state.state_trie_root_checkpoint = Some(current_root); Ok(()) @@ -463,11 +460,6 @@ impl StoreEngine for Store { Ok(self.inner().snap_state.state_trie_root_checkpoint) } - fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError> { - self.inner().snap_state.state_trie_root_checkpoint = None; - Ok(()) - } - fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> { self.inner().snap_state.state_trie_key_checkpoint = Some(last_key); Ok(()) @@ -477,26 +469,20 @@ impl StoreEngine for Store { Ok(self.inner().snap_state.state_trie_key_checkpoint) } - fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError> { - self.inner().snap_state.state_trie_key_checkpoint = None; - Ok(()) - } - - fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError> { - self.inner().snap_state.pending_storage_heal_accounts = Some(accounts); + fn set_storage_heal_paths( + &self, + accounts: Vec<(H256, Vec)>, + ) -> Result<(), StoreError> { + self.inner().snap_state.storage_heal_paths = Some(accounts); Ok(()) } - fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError> { - Ok(self - .inner() - .snap_state - .pending_storage_heal_accounts - .clone()) + fn get_storage_heal_paths(&self) -> Result)>>, StoreError> { + Ok(self.inner().snap_state.storage_heal_paths.clone()) } - fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError> { - self.inner().snap_state.pending_storage_heal_accounts = None; + fn clear_snap_state(&self) -> Result<(), StoreError> { + self.inner().snap_state = Default::default(); Ok(()) } @@ -508,6 +494,15 @@ impl StoreEngine for Store { self.inner().chain_data.is_synced = status; Ok(()) } + + fn set_state_heal_paths(&self, paths: Vec) -> Result<(), StoreError> { + self.inner().snap_state.state_heal_paths = Some(paths); + Ok(()) + } + + fn get_state_heal_paths(&self) -> Result>, StoreError> { + Ok(self.inner().snap_state.state_heal_paths.clone()) + } } impl Debug for Store { diff --git a/crates/storage/store/engines/libmdbx.rs b/crates/storage/store/engines/libmdbx.rs index 703989d3c2..b932ea06e5 100644 --- a/crates/storage/store/engines/libmdbx.rs +++ b/crates/storage/store/engines/libmdbx.rs @@ -14,7 +14,7 @@ use ethrex_core::types::{ }; use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; -use ethrex_trie::{LibmdbxDupsortTrieDB, LibmdbxTrieDB, Trie}; +use ethrex_trie::{LibmdbxDupsortTrieDB, LibmdbxTrieDB, Nibbles, Trie}; use libmdbx::orm::{Decodable, Encodable, Table}; use libmdbx::{ dupsort, @@ -72,17 +72,6 @@ impl Store { txn.get::(key).map_err(StoreError::LibmdbxError) } - // Helper method to remove a value from a libmdbx table - fn delete(&self, key: T::Key) -> Result<(), StoreError> { - let txn = self - .db - .begin_readwrite() - .map_err(StoreError::LibmdbxError)?; - txn.delete::(key, None) - .map_err(StoreError::LibmdbxError)?; - txn.commit().map_err(StoreError::LibmdbxError) - } - fn get_block_hash_by_block_number( &self, number: BlockNumber, @@ -540,10 +529,6 @@ impl StoreEngine for Store { .map_err(StoreError::RLPDecode) } - fn clear_header_download_checkpoint(&self) -> Result<(), StoreError> { - self.delete::(SnapStateIndex::HeaderDownloadCheckpoint) - } - fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> { self.write::( SnapStateIndex::StateTrieRootCheckpoint, @@ -558,10 +543,6 @@ impl StoreEngine for Store { .map_err(StoreError::RLPDecode) } - fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError> { - self.delete::(SnapStateIndex::StateTrieRootCheckpoint) - } - fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> { self.write::( SnapStateIndex::StateTrieRootCheckpoint, @@ -576,37 +557,51 @@ impl StoreEngine for Store { .map_err(StoreError::RLPDecode) } - fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError> { - self.delete::(SnapStateIndex::StateTrieRootCheckpoint) - } - - fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError> { - self.write::( - SnapStateIndex::PendingStorageHealAccounts, - accounts.encode_to_vec(), - ) + fn set_storage_heal_paths( + &self, + accounts: Vec<(H256, Vec)>, + ) -> Result<(), StoreError> { + self.write::(SnapStateIndex::StorageHealPaths, accounts.encode_to_vec()) } - fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError> { - self.read::(SnapStateIndex::PendingStorageHealAccounts)? - .map(|ref h| >::decode(h)) + fn get_storage_heal_paths(&self) -> Result)>>, StoreError> { + self.read::(SnapStateIndex::StorageHealPaths)? + .map(|ref h| )>>::decode(h)) .transpose() .map_err(StoreError::RLPDecode) } - fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError> { - self.delete::(SnapStateIndex::PendingStorageHealAccounts) - } - fn is_synced(&self) -> Result { match self.read::(ChainDataIndex::IsSynced)? { None => Err(StoreError::Custom("Sync status not found".to_string())), Some(ref rlp) => RLPDecode::decode(rlp).map_err(|_| StoreError::DecodeError), } } + fn update_sync_status(&self, status: bool) -> Result<(), StoreError> { self.write::(ChainDataIndex::IsSynced, status.encode_to_vec()) } + + fn set_state_heal_paths(&self, paths: Vec) -> Result<(), StoreError> { + self.write::(SnapStateIndex::StateHealPaths, paths.encode_to_vec()) + } + + fn get_state_heal_paths(&self) -> Result>, StoreError> { + self.read::(SnapStateIndex::StateHealPaths)? + .map(|ref h| >::decode(h)) + .transpose() + .map_err(StoreError::RLPDecode) + } + + fn clear_snap_state(&self) -> Result<(), StoreError> { + let txn = self + .db + .begin_readwrite() + .map_err(StoreError::LibmdbxError)?; + txn.clear_table::() + .map_err(StoreError::LibmdbxError)?; + txn.commit().map_err(StoreError::LibmdbxError) + } } impl Debug for Store { diff --git a/crates/storage/store/engines/redb.rs b/crates/storage/store/engines/redb.rs index c19f8c6dd5..71e1d812ca 100644 --- a/crates/storage/store/engines/redb.rs +++ b/crates/storage/store/engines/redb.rs @@ -7,6 +7,7 @@ use ethrex_core::{ }; use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; +use ethrex_trie::Nibbles; use ethrex_trie::{ db::{redb::RedBTrie, redb_multitable::RedBMultiTableTrieDB}, Trie, @@ -716,10 +717,6 @@ impl StoreEngine for RedBStore { .map_err(StoreError::RLPDecode) } - fn clear_header_download_checkpoint(&self) -> Result<(), StoreError> { - self.delete(SNAP_STATE_TABLE, SnapStateIndex::HeaderDownloadCheckpoint) - } - fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> { self.write( SNAP_STATE_TABLE, @@ -735,10 +732,6 @@ impl StoreEngine for RedBStore { .map_err(StoreError::RLPDecode) } - fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError> { - self.delete(SNAP_STATE_TABLE, SnapStateIndex::StateTrieRootCheckpoint) - } - fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> { self.write( SNAP_STATE_TABLE, @@ -754,29 +747,24 @@ impl StoreEngine for RedBStore { .map_err(StoreError::RLPDecode) } - fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError> { - self.delete(SNAP_STATE_TABLE, SnapStateIndex::StateTrieKeyCheckpoint) - } - - fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError> { + fn set_storage_heal_paths( + &self, + accounts: Vec<(H256, Vec)>, + ) -> Result<(), StoreError> { self.write( SNAP_STATE_TABLE, - SnapStateIndex::PendingStorageHealAccounts, + SnapStateIndex::StorageHealPaths, accounts.encode_to_vec(), ) } - fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError> { - self.read(SNAP_STATE_TABLE, SnapStateIndex::PendingStorageHealAccounts)? + fn get_storage_heal_paths(&self) -> Result)>>, StoreError> { + self.read(SNAP_STATE_TABLE, SnapStateIndex::StorageHealPaths)? .map(|rlp| RLPDecode::decode(&rlp.value())) .transpose() .map_err(StoreError::RLPDecode) } - fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError> { - self.delete(SNAP_STATE_TABLE, SnapStateIndex::PendingStorageHealAccounts) - } - fn is_synced(&self) -> Result { match self.read(CHAIN_DATA_TABLE, ChainDataIndex::IsSynced)? { None => Err(StoreError::Custom("Sync status not found".to_string())), @@ -791,6 +779,29 @@ impl StoreEngine for RedBStore { status.encode_to_vec(), ) } + + fn set_state_heal_paths(&self, paths: Vec) -> Result<(), StoreError> { + self.write( + SNAP_STATE_TABLE, + SnapStateIndex::StateHealPaths, + paths.encode_to_vec(), + ) + } + + fn get_state_heal_paths(&self) -> Result>, StoreError> { + self.read(SNAP_STATE_TABLE, SnapStateIndex::StateHealPaths)? + .map(|rlp| RLPDecode::decode(&rlp.value())) + .transpose() + .map_err(StoreError::RLPDecode) + } + + fn clear_snap_state(&self) -> Result<(), StoreError> { + let write_txn = self.db.begin_write()?; + // Delete the whole table as it will be re-crated when we next open it + write_txn.delete_table(SNAP_STATE_TABLE)?; + write_txn.commit()?; + Ok(()) + } } impl redb::Value for ChainDataIndex { diff --git a/crates/storage/store/engines/utils.rs b/crates/storage/store/engines/utils.rs index cefd4f0fe6..6eca49eb74 100644 --- a/crates/storage/store/engines/utils.rs +++ b/crates/storage/store/engines/utils.rs @@ -44,12 +44,14 @@ impl From for ChainDataIndex { pub enum SnapStateIndex { // Hash of the last downloaded header in a previous sync cycle that was aborted HeaderDownloadCheckpoint = 0, - // Current root hash of the latest State Trie (Used for both fetch & heal) + // Current root hash of the latest State Trie (Used during state sync) StateTrieRootCheckpoint = 1, - // Accounts which storage needs healing - PendingStorageHealAccounts = 2, + // Paths from the storage trie in need of healing, grouped by hashed account address + StorageHealPaths = 2, // Last key fetched from the state trie StateTrieKeyCheckpoint = 3, + // Paths from the state trie in need of healing + StateHealPaths = 4, } impl From for SnapStateIndex { @@ -61,12 +63,11 @@ impl From for SnapStateIndex { x if x == SnapStateIndex::StateTrieRootCheckpoint as u8 => { SnapStateIndex::StateTrieRootCheckpoint } - x if x == SnapStateIndex::PendingStorageHealAccounts as u8 => { - SnapStateIndex::PendingStorageHealAccounts - } + x if x == SnapStateIndex::StorageHealPaths as u8 => SnapStateIndex::StorageHealPaths, x if x == SnapStateIndex::StateTrieKeyCheckpoint as u8 => { SnapStateIndex::StateTrieKeyCheckpoint } + x if x == SnapStateIndex::StateHealPaths as u8 => SnapStateIndex::StateHealPaths, _ => panic!("Invalid value when casting to SnapDataIndex: {}", value), } } diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index 74ed2cbea2..a7fdf13469 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -14,7 +14,7 @@ use ethrex_core::types::{ }; use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; -use ethrex_trie::Trie; +use ethrex_trie::{Nibbles, Trie}; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; use std::collections::{HashMap, HashSet}; @@ -1033,22 +1033,33 @@ impl Store { self.engine.get_state_trie_key_checkpoint() } - /// Sets the list of account hashes whose storage needs healing - pub fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError> { - self.engine.set_pending_storage_heal_accounts(accounts) + /// Sets the storage trie paths in need of healing, grouped by hashed address + pub fn set_storage_heal_paths( + &self, + accounts: Vec<(H256, Vec)>, + ) -> Result<(), StoreError> { + self.engine.set_storage_heal_paths(accounts) + } + + /// Gets the storage trie paths in need of healing, grouped by hashed address + #[allow(clippy::type_complexity)] + pub fn get_storage_heal_paths(&self) -> Result)>>, StoreError> { + self.engine.get_storage_heal_paths() + } + + /// Sets the state trie paths in need of healing + pub fn set_state_heal_paths(&self, paths: Vec) -> Result<(), StoreError> { + self.engine.set_state_heal_paths(paths) } - /// Gets the list of account hashes whose storage needs healing - pub fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError> { - self.engine.get_pending_storage_heal_accounts() + /// Gets the state trie paths in need of healing + pub fn get_state_heal_paths(&self) -> Result>, StoreError> { + self.engine.get_state_heal_paths() } - /// Clears all checkpoints written during a snap sync + /// Clears all checkpoint data created during the last snap sync pub fn clear_snap_state(&self) -> Result<(), StoreError> { - self.engine.clear_header_download_checkpoint()?; - self.engine.clear_pending_storage_heal_accounts()?; - self.engine.clear_state_trie_root_checkpoint()?; - self.engine.clear_state_trie_key_checkpoint() + self.engine.clear_snap_state() } pub fn is_synced(&self) -> Result {