diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 5ca1997ea..cd9c010dd 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -782,10 +782,36 @@ impl KaspaCli { .join(" "), ) } - SyncState::UtxoSync { total, .. } => { - Some([style("SYNC UTXO").red().to_string(), style(total.separated_string()).dim().to_string()].join(" ")) + SyncState::PruningPointUTXOs { processed, total } => { + let progress = (*processed as f64 / *total as f64 * 100.0).round() as u64; + Some( + [ + style("SYNC IBD Pruning Point UTXOs").red().to_string(), + style(format!("{} ({}%)", processed.separated_string(), progress)).dim().to_string(), + ] + .join(" "), + ) + } + SyncState::VirtualUTXOs { processed, total } => { + let progress = (*processed as f64 / *total as f64 * 100.0).round() as u64; + Some( + [ + style("SYNC Virtual UTXOs").red().to_string(), + style(format!("{} ({}%)", processed.separated_string(), progress)).dim().to_string(), + ] + .join(" "), + ) + } + SyncState::UtxoIndexUTXOs { processed, total } => { + let progress = (*processed as f64 / *total as f64 * 100.0).round() as u64; + Some( + [ + style("SYNC UtxoIndex UTXOs").red().to_string(), + style(format!("{} ({}%)", processed.separated_string(), progress)).dim().to_string(), + ] + .join(" "), + ) } - SyncState::UtxoResync => Some([style("SYNC").red().to_string(), style("UTXO").black().to_string()].join(" ")), SyncState::NotSynced => Some([style("SYNC").red().to_string(), style("...").black().to_string()].join(" ")), SyncState::Synced { .. } => None, } diff --git a/components/consensusmanager/src/session.rs b/components/consensusmanager/src/session.rs index 2643739ee..5286c75b1 100644 --- a/components/consensusmanager/src/session.rs +++ b/components/consensusmanager/src/session.rs @@ -284,6 +284,10 @@ impl ConsensusSessionOwned { self.clone().spawn_blocking(move |c| c.get_virtual_utxos(from_outpoint, chunk_size, skip_first)).await } + pub async fn async_get_virtual_utxoset_count(&self) -> u64 { + self.clone().spawn_blocking(|c| c.get_virtual_utxoset_count()).await + } + pub async fn async_get_tips(&self) -> Vec<Hash> { self.clone().spawn_blocking(|c| c.get_tips()).await } @@ -408,6 +412,10 @@ impl ConsensusSessionOwned { .await } + pub async fn async_get_pruning_point_utxoset_count(&self) -> u64 { + self.clone().spawn_blocking(|c| c.get_pruning_point_utxoset_count()).await + } + pub async fn async_get_missing_block_body_hashes(&self, high: Hash) -> ConsensusResult<Vec<Hash>> { self.clone().spawn_blocking(move |c| c.get_missing_block_body_hashes(high)).await } diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs index 91165b73d..098197924 100644 --- a/consensus/core/src/api/mod.rs +++ b/consensus/core/src/api/mod.rs @@ -187,6 +187,10 @@ pub trait ConsensusApi: Send + Sync { unimplemented!() } + fn get_virtual_utxoset_count(&self) -> u64 { + unimplemented!() + } + fn get_tips(&self) -> Vec<Hash> { unimplemented!() } @@ -324,6 +328,10 @@ pub trait ConsensusApi: Send + Sync { unimplemented!() } + fn get_pruning_point_utxoset_count(&self) -> u64 { + unimplemented!() + } + fn get_missing_block_body_hashes(&self, high: Hash) -> ConsensusResult<Vec<Hash>> { unimplemented!() } diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 1731729a3..65e0a4089 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -715,6 +715,10 @@ impl ConsensusApi for Consensus { iter.map(|item| item.unwrap()).collect() } + fn get_virtual_utxoset_count(&self) -> u64 { + self.virtual_stores.read().utxo_set.num_of_entries().unwrap() + } + fn get_tips(&self) -> Vec<Hash> {
self.body_tips_store.read().get().unwrap().read().iter().copied().collect_vec() } @@ -747,6 +751,10 @@ impl ConsensusApi for Consensus { Ok(utxos) } + fn get_pruning_point_utxoset_count(&self) -> u64 { + self.pruning_utxoset_stores.read().utxo_set.num_of_entries().unwrap() + } + fn modify_coinbase_payload(&self, payload: Vec<u8>, miner_data: &MinerData) -> CoinbaseResult<Vec<u8>> { self.services.coinbase_manager.modify_coinbase_payload(payload, miner_data) } diff --git a/consensus/src/consensus/storage.rs b/consensus/src/consensus/storage.rs index 89a0f5e26..fb92dce4c 100644 --- a/consensus/src/consensus/storage.rs +++ b/consensus/src/consensus/storage.rs @@ -19,6 +19,7 @@ use crate::{ tips::DbTipsStore, utxo_diffs::DbUtxoDiffsStore, utxo_multisets::DbUtxoMultisetsStore, + utxo_set::{self, UtxoSetStoreReader}, virtual_state::{LkgVirtualState, VirtualStores}, DB, }, @@ -28,6 +29,7 @@ use crate::{ use super::cache_policy_builder::CachePolicyBuilder as PolicyBuilder; use itertools::Itertools; use kaspa_consensus_core::{blockstatus::BlockStatus, BlockHashSet}; +use kaspa_core::info; use kaspa_database::registry::DatabaseStorePrefixes; use kaspa_hashes::Hash; use parking_lot::RwLock; @@ -239,6 +241,34 @@ impl ConsensusStorage { reachability::init(reachability_store.write().deref_mut()).unwrap(); relations::init(reachability_relations_store.write().deref_mut()); + // Ensure that the `num_of_entries` cached items are initialized for the utxo set stores. + // TODO: the inits below should be removable with the next HF; a one-off init should suffice. + if utxo_set::init(&mut pruning_utxoset_stores.write().utxo_set).unwrap() { + info!( + "Initialized the `num_of_entries` cached item db for the pruning utxoset with {0} entries", + pruning_utxoset_stores.read().utxo_set.num_of_entries().unwrap() + ); + } + + if utxo_set::init(&mut virtual_stores.write().utxo_set).unwrap() { + info!( + "Initialized the `num_of_entries` cached item db for the virtual utxoset with {0} entries", + virtual_stores.read().utxo_set.num_of_entries().unwrap() + ); + } + + // Sanity checks: + if config.enable_sanity_checks { + info!("Running sanity checks on the consensus storage, this may take a while..."); + assert_eq!( + pruning_utxoset_stores.read().utxo_set.num_of_entries().unwrap(), + pruning_utxoset_stores.read().utxo_set.iterator().count() as u64 + ); + assert_eq!( + virtual_stores.read().utxo_set.num_of_entries().unwrap(), + virtual_stores.read().utxo_set.iterator().count() as u64 + ); + } Arc::new(Self { db, statuses_store, diff --git a/consensus/src/model/stores/pruning_utxoset.rs b/consensus/src/model/stores/pruning_utxoset.rs index 116134514..03234bb47 100644 --- a/consensus/src/model/stores/pruning_utxoset.rs +++ b/consensus/src/model/stores/pruning_utxoset.rs @@ -19,7 +19,12 @@ pub struct PruningUtxosetStores { impl PruningUtxosetStores { pub fn new(db: Arc<DB>, utxoset_cache_policy: CachePolicy) -> Self { Self { - utxo_set: DbUtxoSetStore::new(db.clone(), utxoset_cache_policy, DatabaseStorePrefixes::PruningUtxoset.into()), + utxo_set: DbUtxoSetStore::new( + db.clone(), + utxoset_cache_policy, + DatabaseStorePrefixes::PruningUtxoset.into(), + DatabaseStorePrefixes::PruningUtxosetCount.into(), + ), utxoset_position_access: CachedDbItem::new(db, DatabaseStorePrefixes::PruningUtxosetPosition.into()), } } diff --git a/consensus/src/model/stores/utxo_set.rs b/consensus/src/model/stores/utxo_set.rs index 03add0948..f6f3fcfb9 100644 --- a/consensus/src/model/stores/utxo_set.rs +++ b/consensus/src/model/stores/utxo_set.rs @@ -5,12
+5,13 @@ use kaspa_consensus_core::{ utxo_view::UtxoView, }, }; -use kaspa_database::prelude::StoreResultExtensions; use kaspa_database::prelude::DB; use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter}; use kaspa_database::prelude::{CachePolicy, StoreError}; +use kaspa_database::prelude::{CachedDbItem, StoreResultExtensions}; use kaspa_hashes::Hash; use rocksdb::WriteBatch; + use std::{error::Error, fmt::Display, sync::Arc}; type UtxoCollectionIterator<'a> = Box<dyn Iterator<Item = Result<(TransactionOutpoint, UtxoEntry), Box<dyn Error>>> + 'a>; @@ -18,6 +19,7 @@ pub trait UtxoSetStoreReader { fn utxo(&self, outpoint: &TransactionOutpoint) -> Result<Option<Arc<UtxoEntry>>, StoreError>; fn seek_iterator(&self, from_outpoint: Option<TransactionOutpoint>, limit: usize, skip_first: bool) -> UtxoCollectionIterator; + fn num_of_entries(&self) -> Result<u64, StoreError>; } pub trait UtxoSetStore: UtxoSetStoreReader { @@ -90,17 +92,44 @@ impl From<UtxoKey> for TransactionOutpoint { #[derive(Clone)] pub struct DbUtxoSetStore { db: Arc<DB>, + // Prefixes prefix: Vec<u8>, + num_of_entries_prefix: Vec<u8>, + // Accesses access: CachedDbAccess<UtxoKey, Arc<UtxoEntry>>, + // TODO: implement a CachedAtomicDbItem store for such primitives. + // There should be no need to use a RwLock implicitly here. + num_of_entries: CachedDbItem<u64>, +} +
+// TODO: this should be removable after the next HF. +pub fn init(store: &mut DbUtxoSetStore) -> Result<bool, StoreError> { + // The returned bool indicates whether the store was initialized by this call + match store.num_of_entries.read() { + Ok(_) => Ok(false), + Err(StoreError::KeyNotFound(_)) => { + store.num_of_entries.write(DirectDbWriter::new(&store.db), &(store.access.iterator().count() as u64))?; + Ok(true) + } + Err(e) => Err(e), + } } impl DbUtxoSetStore { - pub fn new(db: Arc<DB>, cache_policy: CachePolicy, prefix: Vec<u8>) -> Self { - Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, prefix.clone()), prefix } + pub fn new(db: Arc<DB>, cache_policy: CachePolicy, prefix: Vec<u8>, num_of_entries_prefix: Vec<u8>) -> Self { + let access = CachedDbAccess::new(db.clone(), cache_policy, prefix.clone()); + + Self { + db: db.clone(), + prefix: prefix.clone(), + access: access.clone(), + num_of_entries_prefix: num_of_entries_prefix.clone(), + num_of_entries: CachedDbItem::new(db.clone(), num_of_entries_prefix.clone()), + } } pub fn clone_with_new_cache(&self, cache_policy: CachePolicy) -> Self { - Self::new(Arc::clone(&self.db), cache_policy, self.prefix.clone()) + Self::new(Arc::clone(&self.db), cache_policy, self.prefix.clone(), self.num_of_entries_prefix.clone()) } /// See comment at [`UtxoSetStore::write_diff`] @@ -108,10 +137,14 @@ impl DbUtxoSetStore { let mut writer = BatchDbWriter::new(batch); self.access.delete_many(&mut writer, &mut utxo_diff.removed().keys().map(|o| (*o).into()))?; self.access.write_many(&mut writer, &mut utxo_diff.added().iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?; + self.num_of_entries.update(&mut writer, |num_of_entries| { + (num_of_entries + utxo_diff.added().len() as u64) - utxo_diff.removed().len() as u64 + })?; Ok(()) } pub fn iterator(&self) -> impl Iterator<Item = Result<(TransactionOutpoint, Arc<UtxoEntry>), Box<dyn Error>>> + '_ { + //TODO: an exact size iterator should be implementable if we have a way to utilize the count.
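    // One possible shape for the TODO above, sketched as a comment only — the `CountedIter`
    // wrapper and its fields are illustrative names, not part of this PR:
    //
    //   struct CountedIter<I> { inner: I, remaining: usize }
    //   impl<I: Iterator> Iterator for CountedIter<I> {
    //       type Item = I::Item;
    //       fn next(&mut self) -> Option<Self::Item> {
    //           self.remaining = self.remaining.saturating_sub(1);
    //           self.inner.next()
    //       }
    //       // `ExactSizeIterator` only requires that `size_hint` is exact
    //       fn size_hint(&self) -> (usize, Option<usize>) { (self.remaining, Some(self.remaining)) }
    //   }
    //   impl<I: Iterator> ExactSizeIterator for CountedIter<I> {}
    //
    //   // `iterator()` could then seed `remaining` from `self.num_of_entries()?`.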
self.access.iterator().map(|iter_result| match iter_result { Ok((key_bytes, utxo_entry)) => match UtxoKey::try_from(key_bytes.as_ref()) { Ok(utxo_key) => { @@ -126,7 +159,12 @@ impl DbUtxoSetStore { /// Clear the store completely in DB and cache pub fn clear(&mut self) -> Result<(), StoreError> { - self.access.delete_all(DirectDbWriter::new(&self.db)) + let mut batch = WriteBatch::default(); // batch internally to keep consistency + let mut writer = BatchDbWriter::new(&mut batch); + self.access.delete_all(&mut writer)?; + self.num_of_entries.write(&mut writer, &0u64)?; + self.db.write(batch)?; + Ok(()) } /// Write directly from an iterator and do not cache any data. NOTE: this action also clears the cache @@ -134,8 +172,18 @@ impl DbUtxoSetStore { &mut self, utxos: impl IntoIterator<Item = (TransactionOutpoint, Arc<UtxoEntry>)>, ) -> Result<(), StoreError> { - let mut writer = DirectDbWriter::new(&self.db); - self.access.write_many_without_cache(&mut writer, &mut utxos.into_iter().map(|(o, e)| (o.into(), e)))?; + let mut batch = WriteBatch::default(); // batch internally to keep consistency + let mut writer = BatchDbWriter::new(&mut batch); + let mut count = 0u64; + self.access.write_many_without_cache( + &mut writer, + &mut utxos.into_iter().map(|(o, e)| { + count += 1; + (o.into(), e) + }), + )?; + self.num_of_entries.update(&mut writer, |c| c + count)?; + self.db.write(batch)?; Ok(()) } } @@ -159,27 +207,45 @@ impl UtxoSetStoreReader for DbUtxoSetStore { Ok((outpoint, UtxoEntry::clone(&entry))) })) } + + fn num_of_entries(&self) -> Result<u64, StoreError> { + self.num_of_entries.read() + } } impl UtxoSetStore for DbUtxoSetStore { fn write_diff(&mut self, utxo_diff: &UtxoDiff) -> Result<(), StoreError> { - let mut writer = DirectDbWriter::new(&self.db); + let mut batch = WriteBatch::default(); // batch internally to keep consistency + let mut writer = BatchDbWriter::new(&mut batch); self.access.delete_many(&mut writer, &mut utxo_diff.removed().keys().map(|o| (*o).into()))?; self.access.write_many(&mut writer, &mut utxo_diff.added().iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?; + self.num_of_entries.update(&mut writer, |num_of_entries| { + (num_of_entries + utxo_diff.added().len() as u64) - utxo_diff.removed().len() as u64 + })?; + self.db.write(batch)?; Ok(()) } fn write_many(&mut self, utxos: &[(TransactionOutpoint, UtxoEntry)]) -> Result<(), StoreError> { - let mut writer = DirectDbWriter::new(&self.db); + let mut batch = WriteBatch::default(); // batch internally to keep consistency + let mut writer = BatchDbWriter::new(&mut batch); self.access.write_many(&mut writer, &mut utxos.iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?; + self.num_of_entries.update(&mut writer, |num_of_entries| num_of_entries + utxos.len() as u64)?; + self.db.write(batch)?; Ok(()) } } #[cfg(test)] mod tests { + use std::collections::HashMap; + + use crate::test_helpers::{generate_random_outpoint, generate_random_utxo}; + use super::*; use itertools::Itertools; + use kaspa_database::{create_temp_db, prelude::ConnBuilder, registry::DatabaseStorePrefixes}; + use rand::{rngs::SmallRng, SeedableRng}; #[test] fn test_utxo_key_conversion() { assert_eq!(key.0.to_vec(), tx_id.as_bytes().iter().copied().chain(index.to_le_bytes().iter().copied()).collect_vec()); }); } + + #[test] + fn test_num_of_entries() { + let (_db_lt, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let mut store = DbUtxoSetStore::new( + db.clone(), + CachePolicy::Empty, + DatabaseStorePrefixes::VirtualUtxoset.into(),
DatabaseStorePrefixes::VirtualUtxosetCount.into(), + ); + init(&mut store).unwrap(); + assert_eq!(store.num_of_entries().unwrap(), 0); + + let mut rng: SmallRng = SmallRng::seed_from_u64(42u64); + // test added only + let to_add = (0..2).map(|_| (generate_random_outpoint(&mut rng), generate_random_utxo(&mut rng))).collect(); + let mut utxo_diff = UtxoDiff::new(to_add, HashMap::new()); + + // Test write_diff + // write 2 + let mut batch = WriteBatch::default(); + store.write_diff_batch(&mut batch, &utxo_diff).unwrap(); + db.write(batch).unwrap(); + assert_eq!(store.num_of_entries().unwrap(), store.iterator().count() as u64); + assert_eq!(store.num_of_entries().unwrap(), 2); + + // Write 2 & Remove 2 + utxo_diff.add.iter().take(2).for_each(|(o, v)| { + utxo_diff.remove.insert(*o, v.clone()); + }); + utxo_diff.add.clear(); + utxo_diff.add = (0..2).map(|_| (generate_random_outpoint(&mut rng), generate_random_utxo(&mut rng))).collect(); + + let mut batch = WriteBatch::default(); + store.write_diff_batch(&mut batch, &utxo_diff).unwrap(); + db.write(batch).unwrap(); + assert_eq!(store.num_of_entries().unwrap(), store.iterator().count() as u64); + assert_eq!(store.num_of_entries().unwrap(), 2); + utxo_diff.remove.clear(); + // Remove 2 + + let mut batch = WriteBatch::default(); + utxo_diff.add.iter().take(2).for_each(|(o, v)| { + utxo_diff.remove.insert(*o, v.clone()); + }); + utxo_diff.add.clear(); + store.write_diff_batch(&mut batch, &utxo_diff).unwrap(); + db.write(batch).unwrap(); + assert_eq!(store.num_of_entries().unwrap(), store.iterator().count() as u64); + assert_eq!(store.num_of_entries().unwrap(), 0); + utxo_diff.remove.clear(); + + // Test write_many + // Write 2 + utxo_diff.add = (0..2).map(|_| (generate_random_outpoint(&mut rng), generate_random_utxo(&mut rng))).collect(); + store.write_many(&utxo_diff.add.iter().map(|(o, v)| (*o, v.clone())).collect_vec()).unwrap(); + assert_eq!(store.num_of_entries().unwrap(), store.iterator().count() as u64); + assert_eq!(store.num_of_entries().unwrap(), 2); + utxo_diff.add.clear(); + + // Test write_from_iterator_without_cache + // write 2 + store + .write_from_iterator_without_cache( + (0..2).map(|_| (generate_random_outpoint(&mut rng), Arc::new(generate_random_utxo(&mut rng)))), + ) + .unwrap(); + assert_eq!(store.num_of_entries().unwrap(), store.iterator().count() as u64); + assert_eq!(store.num_of_entries().unwrap(), 4); + + // Test clear + store.clear().unwrap(); + assert_eq!(store.num_of_entries().unwrap(), store.iterator().count() as u64); + assert_eq!(store.num_of_entries().unwrap(), 0); + } } diff --git a/consensus/src/model/stores/virtual_state.rs b/consensus/src/model/stores/virtual_state.rs index 62672a913..752e8be84 100644 --- a/consensus/src/model/stores/virtual_state.rs +++ b/consensus/src/model/stores/virtual_state.rs @@ -137,7 +137,12 @@ impl VirtualStores { pub fn new(db: Arc<DB>, lkg_virtual_state: LkgVirtualState, utxoset_cache_policy: CachePolicy) -> Self { Self { state: DbVirtualStateStore::new(db.clone(), lkg_virtual_state), - utxo_set: DbUtxoSetStore::new(db, utxoset_cache_policy, DatabaseStorePrefixes::VirtualUtxoset.into()), + utxo_set: DbUtxoSetStore::new( + db, + utxoset_cache_policy, + DatabaseStorePrefixes::VirtualUtxoset.into(), + DatabaseStorePrefixes::VirtualUtxosetCount.into(), + ), } } } diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 88fee97bf..baf2c9699 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ 
b/consensus/src/pipeline/virtual_processor/processor.rs @@ -30,6 +30,7 @@ use crate::{ tips::{DbTipsStore, TipsStoreReader}, utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader}, utxo_multisets::{DbUtxoMultisetsStore, UtxoMultisetsStoreReader}, + utxo_set::UtxoSetStoreReader, virtual_state::{LkgVirtualState, VirtualState, VirtualStateStoreReader, VirtualStores}, DB, }, @@ -94,6 +95,7 @@ use std::{ collections::{BinaryHeap, HashMap, VecDeque}, ops::Deref, sync::{atomic::Ordering, Arc}, + time::Instant, }; pub struct VirtualStateProcessor { @@ -163,6 +165,8 @@ pub struct VirtualStateProcessor { } impl VirtualStateProcessor { + pub const IDENT: &'static str = "VirtualStateProcessor"; + #[allow(clippy::too_many_arguments)] pub fn new( receiver: CrossbeamReceiver, @@ -1085,8 +1089,26 @@ impl VirtualStateProcessor { let mut virtual_write = self.virtual_stores.write(); virtual_write.utxo_set.clear().unwrap(); - for chunk in &pruning_utxoset_read.utxo_set.iterator().map(|iter_result| iter_result.unwrap()).chunks(1000) { + let to_process = pruning_utxoset_read.utxo_set.num_of_entries().unwrap(); + let mut processed = 0; + let chunk_size = 1000; + let mut instant = Instant::now(); + info!("[{0}] Transferring {1} UTXOs from pruning to virtual store...", Self::IDENT, to_process); + for chunk in &pruning_utxoset_read.utxo_set.iterator().map(|iter_result| iter_result.unwrap()).chunks(chunk_size) { + if instant.elapsed().as_secs() > 5 { + // This is fast, so time-bound the logging to once every 5 secs. + info!( + "[{0}] Transferring from pruning to virtual store {1} + {2} / {3} UTXOs ({4:.0}%)", + Self::IDENT, + processed, + chunk_size, + to_process, + ((processed + chunk_size) as f64) * 100.0 / to_process as f64 + ); + instant = Instant::now(); + } virtual_write.utxo_set.write_from_iterator_without_cache(chunk).unwrap(); + processed += chunk_size; } } diff --git a/database/src/db.rs b/database/src/db.rs index b1d6bf24e..57638f5e7 100644 --- a/database/src/db.rs +++ b/database/src/db.rs @@ -5,6 +5,8 @@ use std::path::PathBuf; pub use conn_builder::ConnBuilder; use kaspa_utils::fd_budget::FDGuard; +use crate::prelude::StoreResult; + mod conn_builder; /// The DB type used for Kaspad stores @@ -17,6 +19,12 @@ impl DB { pub fn new(inner: DBWithThreadMode<MultiThreaded>, fd_guard: FDGuard) -> Self { Self { inner, _fd_guard: fd_guard } } + + // Useful for testing whether a key exists, and running initialization logic when it does not, + // such as when a new store is created for the node.
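// For illustration (hypothetical usage, not exercised elsewhere in this diff), a store
// could gate its one-off initialization on key existence roughly like:
//
//   if !db.has_key(count_key)? {          // `count_key` is an illustrative name
//       // ...populate the initial value here...
//   }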
+ pub fn has_key(&self, key: &[u8]) -> StoreResult<bool> { + Ok(self.inner.get_pinned(key)?.is_some()) + } } impl DerefMut for DB { diff --git a/database/src/registry.rs b/database/src/registry.rs index 752efb97b..600c9cfc6 100644 --- a/database/src/registry.rs +++ b/database/src/registry.rs @@ -36,11 +36,14 @@ pub enum DatabaseStorePrefixes { UtxoMultisets = 26, VirtualUtxoset = 27, VirtualState = 28, + VirtualUtxosetCount = 29, // TODO: move this beside `VirtualUtxoset` whenever we get a chance to break the DB // ---- Decomposed reachability stores ---- ReachabilityTreeChildren = 30, ReachabilityFutureCoveringSet = 31, + PruningUtxosetCount = 32, // TODO: move this beside `PruningUtxoset` whenever we get a chance to break the DB + // ---- Metadata ---- MultiConsensusMetadata = 124, ConsensusEntries = 125, diff --git a/indexes/utxoindex/src/index.rs b/indexes/utxoindex/src/index.rs index b71935afa..4ef172eca 100644 --- a/indexes/utxoindex/src/index.rs +++ b/indexes/utxoindex/src/index.rs @@ -134,40 +134,69 @@ impl UtxoIndexApi for UtxoIndex { /// 1) There is an implicit expectation that the consensus store must have [VirtualParent] tips, i.e. the consensus database must be initialized. /// 2) Resyncing while consensus notifies of utxo differences may result in a corrupted db. fn resync(&mut self) -> UtxoIndexResult<()> { - info!("Resyncing the utxoindex..."); + trace!("Resyncing the utxoindex..."); self.store.delete_all()?; let consensus = self.consensus_manager.consensus(); let session = futures::executor::block_on(consensus.session_blocking()); let consensus_tips = session.get_virtual_parents(); + let mut circulating_supply: CirculatingSupply = 0; + let mut from_outpoint = None; + + let to_process = session.get_virtual_utxoset_count(); + info!("[{0}] Resyncing {1} Utxos", IDENT, to_process); + + if to_process == 0 { + // We may return early after setting some initial values. + self.store.set_tips(consensus_tips, true)?; + return Ok(self.store.insert_circulating_supply(0u64, true)?); + } + let mut processed = (0u64, 0u64); // .0 holds the previously displayed value + let mut percent_completed = (0f64, 0f64); // .0 holds the previously displayed value + let percent_display_granularity = 1.0; // in percent + let mut instant = std::time::Instant::now(); + let mut is_start = true; + let mut is_end = false; - //Initial batch is without specified seek and none-skipping. - let mut virtual_utxo_batch = session.get_virtual_utxos(None, RESYNC_CHUNK_SIZE, false); - let mut current_chunk_size = virtual_utxo_batch.len(); - trace!("[{0}] resyncing with batch of {1} utxos from consensus db", IDENT, current_chunk_size); // While loop stops resync attempts from an empty utxo db, and unneeded processing when the utxo state size happens to be a multiple of [`RESYNC_CHUNK_SIZE`] - while current_chunk_size > 0 { + while !is_end { // Potential optimization TODO: iterating virtual utxos into an [UtxoIndexChanges] struct is a bit of overhead (i.e. a potentially unneeded loop), // but some form of pre-iteration is done to extract and commit circulating supply separately. - let mut utxoindex_changes = UtxoIndexChanges::new(); //reset changes.
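// Note on the pagination pattern used in the new loop below (a sketch mirroring the calls
// in this hunk, not additional PR code): the first request passes `from_outpoint = None`
// without skipping, and every subsequent request seeks from the last outpoint of the
// previous chunk with `skip_first = true`, so the boundary UTXO is never processed twice:
//
//   let first = session.get_virtual_utxos(None, RESYNC_CHUNK_SIZE, false);
//   let from = Some(first.last().expect("non-empty").0);
//   let next = session.get_virtual_utxos(from, RESYNC_CHUNK_SIZE, true);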
+ let chunk = session.get_virtual_utxos(from_outpoint, RESYNC_CHUNK_SIZE, !is_start); + is_start = false; + from_outpoint = Some(chunk.last().expect("expected a last outpoint").0); + processed.1 += chunk.len() as u64; + is_end = processed.1 == to_process; - let next_outpoint_from = Some(virtual_utxo_batch.last().expect("expected a last outpoint").0); - utxoindex_changes.add_utxos_from_vector(virtual_utxo_batch); + percent_completed.1 = (processed.1 as f64 / to_process as f64) * 100.0; + + let mut utxoindex_changes = UtxoIndexChanges::new(); + utxoindex_changes.add_utxos_from_vector(chunk); circulating_supply += utxoindex_changes.supply_change as CirculatingSupply; self.store.update_utxo_state(&utxoindex_changes.utxo_changes.added, &utxoindex_changes.utxo_changes.removed, true)?; - if current_chunk_size < RESYNC_CHUNK_SIZE { - break; - }; - - virtual_utxo_batch = session.get_virtual_utxos(next_outpoint_from, RESYNC_CHUNK_SIZE, true); - current_chunk_size = virtual_utxo_batch.len(); - trace!("[{0}] resyncing with batch of {1} utxos from consensus db", IDENT, current_chunk_size); + if percent_completed.0 + percent_display_granularity <= percent_completed.1 || is_end { + let processed_diff = processed.1 - processed.0; + + info!( + "[{0}] Resyncing - Utxos: {1} + {2} / {3} ({4:.0}/s); Circulating Sompi Supply: {5}; {6:.0}%", + IDENT, + processed.1, + processed_diff, + to_process, + processed_diff as f64 / instant.elapsed().as_secs_f64(), + circulating_supply, + if is_end { 100.0 } else { percent_completed.1 }, + ); + processed.0 = processed.1; + percent_completed.0 = percent_completed.1; + instant = std::time::Instant::now(); + } } // Commit to the remaining stores. @@ -279,7 +308,7 @@ mod tests { let consensus_utxos = tc.get_virtual_utxos(None, usize::MAX, false); // `usize::MAX` to ensure to get all. let mut i = 0; let mut consensus_supply: CirculatingSupply = 0; - let consensus_utxo_set_size = consensus_utxos.len(); + let consensus_utxoset_count = consensus_utxos.len(); for (tx_outpoint, utxo_entry) in consensus_utxos.into_iter() { consensus_supply += utxo_entry.amount; let indexed_utxos = utxoindex @@ -296,7 +325,7 @@ mod tests { } } - assert_eq!(i, consensus_utxo_set_size); + assert_eq!(i, consensus_utxoset_count); assert_eq!(utxoindex.read().get_circulating_supply().expect("expected circulating supply"), consensus_supply); assert_eq!(*utxoindex.read().get_utxo_index_tips().expect("expected circulating supply"), tc.get_virtual_parents()); @@ -359,7 +388,7 @@ mod tests { // these utxos correspond the initial sync test. let consensus_utxos = tc.get_virtual_utxos(None, usize::MAX, false); // `usize::MAX` to ensure to get all. let mut i = 0; - let consensus_utxo_set_size = consensus_utxos.len(); + let consensus_utxoset_count = consensus_utxos.len(); for (tx_outpoint, utxo_entry) in consensus_utxos.into_iter() { let indexed_utxos = utxoindex .read() @@ -374,7 +403,7 @@ mod tests { i += 1; } } - assert_eq!(i, consensus_utxo_set_size); + assert_eq!(i, consensus_utxoset_count); assert_eq!(*utxoindex.read().get_utxo_index_tips().expect("expected circulating supply"), tc.get_virtual_parents()); // Deconstruct diff --git a/indexes/utxoindex/src/lib.rs b/indexes/utxoindex/src/lib.rs index b34b5c2ef..752901642 100644 --- a/indexes/utxoindex/src/lib.rs +++ b/indexes/utxoindex/src/lib.rs @@ -9,4 +9,4 @@ mod testutils; pub use crate::core::*; //Expose all things intended for external usage. pub use crate::index::UtxoIndex; //we expose this separately to initiate the index. 
-const IDENT: &str = "utxoindex"; +const IDENT: &str = "UtxoIndex"; diff --git a/indexes/utxoindex/src/stores/supply.rs b/indexes/utxoindex/src/stores/supply.rs index de45864c0..af922a20b 100644 --- a/indexes/utxoindex/src/stores/supply.rs +++ b/indexes/utxoindex/src/stores/supply.rs @@ -22,6 +22,8 @@ pub trait CirculatingSupplyStore: CirculatingSupplyStoreReader { #[derive(Clone)] pub struct DbCirculatingSupplyStore { db: Arc<DB>, + // TODO: implement a CachedAtomicDbItem store for primitives. + // There should be no need to use a RwLock here. access: CachedDbItem<u64>, } diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 1e38deddd..b82f1c0f0 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -490,7 +490,7 @@ staging selected tip ({}) is too small or negative. Aborting IBD...", multiset = consensus .clone() .spawn_blocking(move |c| { - c.append_imported_pruning_point_utxos(&chunk, &mut multiset); + c.append_imported_pruning_point_utxos(&chunk.0, &mut multiset); multiset }) .await; diff --git a/protocol/flows/src/v5/ibd/streams.rs b/protocol/flows/src/v5/ibd/streams.rs index 4ddf717cd..a90ca4457 100644 --- a/protocol/flows/src/v5/ibd/streams.rs +++ b/protocol/flows/src/v5/ibd/streams.rs @@ -10,7 +10,7 @@ use kaspa_consensus_core::{ use kaspa_core::{debug, info}; use kaspa_p2p_lib::{ common::{ProtocolError, DEFAULT_TIMEOUT}, - convert::model::trusted::TrustedDataEntry, + convert::{error::ConversionError, model::trusted::TrustedDataEntry}, make_message, pb::{ kaspad_message::Payload, RequestNextHeadersMessage, RequestNextPruningPointAndItsAnticoneBlocksMessage, @@ -142,50 +142,72 @@ pub struct PruningPointUtxosetChunkStream<'a, 'b> { incoming_route: &'b mut IncomingRoute, i: usize, // Chunk index utxo_count: usize, + signaled_utxoset_size: usize, } impl<'a, 'b> PruningPointUtxosetChunkStream<'a, 'b> { + pub const IDENT: &'static str = "PruningPointUtxosetChunkStream"; + pub fn new(router: &'a Router, incoming_route: &'b mut IncomingRoute) -> Self { - Self { router, incoming_route, i: 0, utxo_count: 0 } + Self { router, incoming_route, i: 0, utxo_count: 0, signaled_utxoset_size: 0 } } - pub async fn next(&mut self) -> Result<Option<Vec<(TransactionOutpoint, UtxoEntry)>>, ProtocolError> { - let res: Result<Option<Vec<(TransactionOutpoint, UtxoEntry)>>, ProtocolError> = match timeout(DEFAULT_TIMEOUT, self.incoming_route.recv()).await { - Ok(op) => { - if let Some(msg) = op { - match msg.payload { - Some(Payload::PruningPointUtxoSetChunk(payload)) => Ok(Some(payload.try_into()?)), - Some(Payload::DonePruningPointUtxoSetChunks(_)) => { - info!("Finished receiving the UTXO set. Total UTXOs: {}", self.utxo_count); - Ok(None) - } - Some(Payload::UnexpectedPruningPoint(_)) => { - // Although this can happen also to an honest syncer (if his pruning point moves during the sync), - // we prefer erring and disconnecting to avoid possible exploits by a syncer repeating this failure - Err(ProtocolError::ConsensusError(ConsensusError::UnexpectedPruningPoint)) + pub async fn next(&mut self) -> Result<Option<(Vec<(TransactionOutpoint, UtxoEntry)>, usize)>, ProtocolError> { + let res: Result<Option<(Vec<(TransactionOutpoint, UtxoEntry)>, usize)>, ProtocolError> = + match timeout(DEFAULT_TIMEOUT, self.incoming_route.recv()).await { + Ok(op) => { + if let Some(msg) = op { + match msg.payload { + Some(Payload::PruningPointUtxoSetChunk(payload)) => { + Ok(Some(payload.try_into().map_err(|_| ConversionError::General)?)) + } + Some(Payload::DonePruningPointUtxoSetChunks(_)) => { + info!("[{0}] Finished receiving the UTXO set. 
Total UTXOs: {1}", Self::IDENT, self.utxo_count); + Ok(None) + } + Some(Payload::UnexpectedPruningPoint(_)) => { + // Although this can happen also to an honest syncer (if his pruning point moves during the sync), + // we prefer erring and disconnecting to avoid possible exploits by a syncer repeating this failure + Err(ProtocolError::ConsensusError(ConsensusError::UnexpectedPruningPoint)) + } + _ => Err(ProtocolError::UnexpectedMessage( + stringify!( + Payload::PruningPointUtxoSetChunk + | Payload::DonePruningPointUtxoSetChunks + | Payload::UnexpectedPruningPoint + ), + msg.payload.as_ref().map(|v| v.into()), + )), } - _ => Err(ProtocolError::UnexpectedMessage( - stringify!( - Payload::PruningPointUtxoSetChunk - | Payload::DonePruningPointUtxoSetChunks - | Payload::UnexpectedPruningPoint - ), - msg.payload.as_ref().map(|v| v.into()), - )), + } else { + Err(ProtocolError::ConnectionClosed) } - } else { - Err(ProtocolError::ConnectionClosed) - } - } - Err(_) => Err(ProtocolError::Timeout(DEFAULT_TIMEOUT)), - }; + } + Err(_) => Err(ProtocolError::Timeout(DEFAULT_TIMEOUT)), + }; // Request the next batch only if the stream is still live if let Ok(Some(chunk)) = res { self.i += 1; - self.utxo_count += chunk.len(); - if self.i % IBD_BATCH_SIZE == 0 { - info!("Received {} UTXO set chunks so far, totaling in {} UTXOs", self.i, self.utxo_count); + self.utxo_count += chunk.0.len(); + if self.i == 1 && chunk.1 > 0 { + // We expect a signaled set size only in the first chunk; if `chunk.1 == 0`, we are probably IBDing from a node without this feature. + info!("[{0}]: Starting streaming of the pruning point UTXO set; signaled set size: {1}", Self::IDENT, chunk.1); + self.signaled_utxoset_size = chunk.1; + } + if self.i % IBD_BATCH_SIZE == 0 || self.utxo_count == self.signaled_utxoset_size { + info!( + "[{0}]: Received {1} + {2} / {3} signaled UTXOs ({4:.0}%)", + Self::IDENT, + self.utxo_count, + chunk.0.len() * IBD_BATCH_SIZE, + if self.signaled_utxoset_size > 0 { self.signaled_utxoset_size.to_string() } else { f64::NAN.to_string() }, + if self.signaled_utxoset_size > 0 { + (self.utxo_count as f64 / self.signaled_utxoset_size as f64) * 100.0 + } else { + f64::NAN + } + ); self.router .enqueue(make_message!( Payload::RequestNextPruningPointUtxoSetChunk, diff --git a/protocol/flows/src/v5/request_pruning_point_utxo_set.rs b/protocol/flows/src/v5/request_pruning_point_utxo_set.rs index e3eb5cc15..10dfa41f7 100644 --- a/protocol/flows/src/v5/request_pruning_point_utxo_set.rs +++ b/protocol/flows/src/v5/request_pruning_point_utxo_set.rs @@ -49,7 +49,9 @@ impl RequestPruningPointUtxoSetFlow { let consensus = self.ctx.consensus(); let mut session = consensus.session().await; - + let mut is_start = true; + let to_process = session.async_get_pruning_point_utxoset_count().await; + let mut processed = 0; loop { // We avoid keeping the consensus session across the limitless dequeue call below let pruning_point_utxos = match session.async_get_pruning_point_utxos(/* args elided in source */).await { Err(ConsensusError::UnexpectedPruningPoint) => return self.send_unexpected_pruning_point_message().await, res => res, }?; - debug!("Retrieved {} UTXOs for pruning point {}", pruning_point_utxos.len(), expected_pp); + + // Update the total processed count + processed += pruning_point_utxos.len() as u64; + + debug!( + "Retrieved {0} + {1} / {2} UTXOs from pp {3} ({4:.0}%)", + processed, + pruning_point_utxos.len(), + to_process, + expected_pp, + (processed as f64 / to_process as f64) * 100.0 + ); // Send the chunk self.router @@ -67,7 +80,8 @@ impl
RequestPruningPointUtxoSetFlow { outpoint_and_utxo_entry_pairs: pruning_point_utxos .iter() .map(|(outpoint, entry)| { (outpoint, entry).into() }) - .collect_vec() + .collect_vec(), + utxoset_size: if is_start { to_process } else { 0u64 }, // Only send the size in the first chunk, see comment in p2p.proto for more information. } )) .await?; @@ -80,12 +94,14 @@ impl RequestPruningPointUtxoSetFlow { } // This indicates that there are no more entries to query - if pruning_point_utxos.len() < CHUNK_SIZE { + if to_process == processed { return self.send_done_message(expected_pp).await; } // Mark the beginning of the next chunk from_outpoint = Some(pruning_point_utxos.last().expect("not empty by prev condition").0); + + is_start = false; } } diff --git a/protocol/p2p/proto/p2p.proto b/protocol/p2p/proto/p2p.proto index f9395ed37..b1059c883 100644 --- a/protocol/p2p/proto/p2p.proto +++ b/protocol/p2p/proto/p2p.proto @@ -159,6 +159,17 @@ message RequestPruningPointUTXOSetMessage{ message PruningPointUtxoSetChunkMessage{ repeated OutpointAndUtxoEntryPair outpointAndUtxoEntryPairs = 1; + /// This represents the total number of UTXOs to process. + /// + /// Note: because gRPC + /// a) translates empty fields to default values in clients, and + /// b) default values have no bandwidth cost, + /// it is optimal to only populate this field in the first chunk, which means: + /// 1) non-updated syncers are still able to transmit the message without this data, i.e. it is a non-breaking change. + /// 2) We save on bandwidth by not sending this data in every chunk. + /// + /// The only drawback is that both the client and the syncer need to be updated to make use of this extra data. + uint64 utxosetSize = 2; } message OutpointAndUtxoEntryPair{ diff --git a/protocol/p2p/src/convert/messages.rs b/protocol/p2p/src/convert/messages.rs index fd695ec0c..c217ea02a 100644 --- a/protocol/p2p/src/convert/messages.rs +++ b/protocol/p2p/src/convert/messages.rs @@ -127,11 +127,14 @@ impl TryFrom<…> for Vec<…> { } } -impl TryFrom<protowire::PruningPointUtxoSetChunkMessage> for Vec<(TransactionOutpoint, UtxoEntry)> { +impl TryFrom<protowire::PruningPointUtxoSetChunkMessage> for (Vec<(TransactionOutpoint, UtxoEntry)>, usize) { type Error = ConversionError; fn try_from(msg: protowire::PruningPointUtxoSetChunkMessage) -> Result<Self, Self::Error> { - msg.outpoint_and_utxo_entry_pairs.into_iter().map(|p| p.try_into()).collect() + Ok(( + msg.outpoint_and_utxo_entry_pairs.into_iter().map(|p| p.try_into()).collect::<Result<Vec<_>, ConversionError>>()?, + msg.utxoset_size as usize, + )) } } diff --git a/rpc/core/src/model/message.rs b/rpc/core/src/model/message.rs index ba8d6abf7..84a497317 100644 --- a/rpc/core/src/model/message.rs +++ b/rpc/core/src/model/message.rs @@ -1047,9 +1047,12 @@ pub struct GetBlockDagInfoResponse { pub pruning_point_hash: RpcHash, pub virtual_daa_score: u64, pub sink: RpcHash, + pub virtual_utxo_count: u64, + pub pruning_point_utxo_count: u64, } impl GetBlockDagInfoResponse { + #[allow(clippy::too_many_arguments)] pub fn new( network: RpcNetworkId, block_count: u64, @@ -1061,6 +1064,8 @@ impl GetBlockDagInfoResponse { pruning_point_hash: RpcHash, virtual_daa_score: u64, sink: RpcHash, + virtual_utxo_count: u64, + pruning_point_utxo_count: u64, ) -> Self { Self { network, @@ -1073,6 +1078,8 @@ impl GetBlockDagInfoResponse { pruning_point_hash, virtual_daa_score, sink, + virtual_utxo_count, + pruning_point_utxo_count, } } } @@ -1090,7 +1097,8 @@ impl Serializer for GetBlockDagInfoResponse { store!(RpcHash, &self.pruning_point_hash, writer)?; store!(u64, &self.virtual_daa_score, writer)?; store!(RpcHash, &self.sink, writer)?; - + store!(u64, 
&self.virtual_utxo_count, writer)?; + store!(u64, &self.pruning_point_utxo_count, writer)?; Ok(()) } } @@ -1108,6 +1116,8 @@ impl Deserializer for GetBlockDagInfoResponse { let pruning_point_hash = load!(RpcHash, reader)?; let virtual_daa_score = load!(u64, reader)?; let sink = load!(RpcHash, reader)?; + let virtual_utxo_count = load!(u64, reader)?; + let pruning_point_utxo_count = load!(u64, reader)?; Ok(Self { network, @@ -1120,6 +1130,8 @@ impl Deserializer for GetBlockDagInfoResponse { pruning_point_hash, virtual_daa_score, sink, + virtual_utxo_count, + pruning_point_utxo_count, }) } } diff --git a/rpc/core/src/model/tests.rs b/rpc/core/src/model/tests.rs index d931f5ac2..665c5a926 100644 --- a/rpc/core/src/model/tests.rs +++ b/rpc/core/src/model/tests.rs @@ -717,6 +717,8 @@ mod mockery { pruning_point_hash: mock(), virtual_daa_score: mock(), sink: mock(), + virtual_utxo_count: mock(), + pruning_point_utxo_count: mock(), } } } diff --git a/rpc/grpc/core/proto/rpc.proto b/rpc/grpc/core/proto/rpc.proto index e218681b6..0e17f0261 100644 --- a/rpc/grpc/core/proto/rpc.proto +++ b/rpc/grpc/core/proto/rpc.proto @@ -439,6 +439,8 @@ message GetBlockDagInfoResponseMessage{ string networkName = 1; uint64 blockCount = 2; uint64 headerCount = 3; + uint64 virtualUtxoCount = 11; + uint64 pruningPointUtxoCount = 12; repeated string tipHashes = 4; double difficulty = 5; int64 pastMedianTime = 6; diff --git a/rpc/grpc/core/src/convert/message.rs b/rpc/grpc/core/src/convert/message.rs index c0e75cf03..e3c54a8fc 100644 --- a/rpc/grpc/core/src/convert/message.rs +++ b/rpc/grpc/core/src/convert/message.rs @@ -310,6 +310,8 @@ from!(item: RpcResult<&kaspa_rpc_core::GetBlockDagInfoResponse>, protowire::GetBlockDagInfoResponseMessage, { pruning_point_hash: item.pruning_point_hash.to_string(), virtual_daa_score: item.virtual_daa_score, sink: item.sink.to_string(), + virtual_utxo_count: item.virtual_utxo_count, + pruning_point_utxo_count: item.pruning_point_utxo_count, error: None, } }); @@ -792,6 +794,8 @@ try_from!(item: &protowire::GetBlockDagInfoResponseMessage, RpcResult<kaspa_rpc_core::GetBlockDagInfoResponse>, { ... + virtual_utxo_count: item.virtual_utxo_count, + pruning_point_utxo_count: item.pruning_point_utxo_count, ... }); diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs ... async fn get_block_dag_info_call(&self, ...) -> RpcResult<GetBlockDagInfoResponse> { let session = self.consensus_manager.consensus().unguarded_session(); - let (consensus_stats, tips, pruning_point, sink) = - join!(session.async_get_stats(), session.async_get_tips(), session.async_pruning_point(), session.async_get_sink()); + let (consensus_stats, tips, pruning_point, sink, virtual_utxo_count, pruning_point_utxo_count) = join!( + session.async_get_stats(), + session.async_get_tips(), + session.async_pruning_point(), + session.async_get_sink(), + session.async_get_virtual_utxoset_count(), + session.async_get_pruning_point_utxoset_count() + ); Ok(GetBlockDagInfoResponse::new( self.config.net, consensus_stats.block_counts.block_count, @@ -825,6 +831,8 @@ NOTE: This error usually indicates an RPC conversion error between the node and pruning_point, consensus_stats.virtual_stats.daa_score, sink, + virtual_utxo_count, + pruning_point_utxo_count, )) } diff --git a/testing/integration/src/consensus_integration_tests.rs b/testing/integration/src/consensus_integration_tests.rs index e66baaf69..123d18080 100644 --- a/testing/integration/src/consensus_integration_tests.rs +++ b/testing/integration/src/consensus_integration_tests.rs @@ -1080,6 +1080,11 @@ async fn json_test(file_path: &str, concurrency: bool) { assert_eq!(virtual_utxos.len(), utxoindex_utxos.len()); assert!(virtual_utxos.is_subset(&utxoindex_utxos)); assert!(utxoindex_utxos.is_subset(&virtual_utxos)); + + // check that the [`DbUtxoSetStore::num_of_entries`] cache is correct: + 
assert_eq!(virtual_utxos.len() as u64, tc.get_virtual_utxoset_count()); + let pruning_point_utxoset_counted_size = tc.pruning_utxoset_stores.read().utxo_set.iterator().count() as u64; + assert_eq!(pruning_point_utxoset_counted_size, tc.get_pruning_point_utxoset_count()); } fn submit_header_chunk( diff --git a/wallet/core/src/events.rs b/wallet/core/src/events.rs index 63d7d5bca..1745d9e50 100644 --- a/wallet/core/src/events.rs +++ b/wallet/core/src/events.rs @@ -25,15 +25,25 @@ pub enum SyncState { blocks: u64, progress: u64, }, - UtxoSync { - chunks: u64, + PruningPointUTXOs { + // This refers to the initial download of the pruning point UTXO set. + processed: u64, total: u64, }, TrustSync { processed: u64, total: u64, }, - UtxoResync, + UtxoIndexUTXOs { + // This refers to the UTXO index sync with the virtual UTXO set. + processed: u64, + total: u64, + }, + VirtualUTXOs { + // This refers to the virtual UTXO store sync with the initial pruning point UTXO set download. + processed: u64, + total: u64, + }, /// General cases when the node is waiting /// for information from peers or waiting to /// connect to peers. diff --git a/wallet/core/src/utxo/sync.rs b/wallet/core/src/utxo/sync.rs index e9eece4f0..515ffd7b7 100644 --- a/wallet/core/src/utxo/sync.rs +++ b/wallet/core/src/utxo/sync.rs @@ -199,8 +199,9 @@ pub struct StateObserver { proof: Regex, ibd_headers: Regex, ibd_blocks: Regex, - utxo_resync: Regex, - utxo_sync: Regex, + utxoindex_utxos: Regex, // this refers to the UTXO index sync with the virtual UTXO set. + pruning_point_utxos: Regex, // this refers to the initial download of the pruning point UTXO set. + virtual_utxos: Regex, // this refers to the virtual UTXO store sync with the initial pruning point UTXO set download. trust_blocks: Regex, // accepted_block: Regex, } @@ -211,8 +212,13 @@ impl Default for StateObserver { proof: Regex::new(r"Validating level (\d+) from the pruning point proof").unwrap(), ibd_headers: Regex::new(r"IBD: Processed (\d+) block headers \((\d+)%\)").unwrap(), ibd_blocks: Regex::new(r"IBD: Processed (\d+) blocks \((\d+)%\)").unwrap(), - utxo_resync: Regex::new(r"Resyncing the utxoindex...").unwrap(), - utxo_sync: Regex::new(r"Received (\d+) UTXO set chunks so far, totaling in (\d+) UTXOs").unwrap(), + utxoindex_utxos: Regex::new( + r"\[UtxoIndex\] Resyncing - Utxos: (\d+) \+ (\d+) / (\d+) \((\d+)/s\); Circulating Sompi Supply: (\d+); (\d+)%", + ) + .unwrap(), + // For review: the last two captured numbers may be displayed as `NaN`; might this need an edit? 
+ pruning_point_utxos: Regex::new(r"Received (\d+) \+ (\d+) / (\d+) signaled UTXOs \((\d+)%\)").unwrap(), + virtual_utxos: Regex::new(r"Transferring from pruning to virtual store (\d+) \+ (\d+) / (\d+) UTXOs \((\d+)%\)").unwrap(), trust_blocks: Regex::new(r"Processed (\d) trusted blocks in the last .* (total (\d))").unwrap(), // accepted_block: Regex::new(r"Accepted block .* via").unwrap(), } @@ -235,10 +241,10 @@ impl StateObserver { state = Some(SyncState::Blocks { blocks, progress }); } } - } else if let Some(captures) = self.utxo_sync.captures(line) { - if let (Some(chunks), Some(total)) = (captures.get(1), captures.get(2)) { - if let (Ok(chunks), Ok(total)) = (chunks.as_str().parse::<u64>(), total.as_str().parse::<u64>()) { - state = Some(SyncState::UtxoSync { chunks, total }); + } else if let Some(captures) = self.pruning_point_utxos.captures(line) { + if let (Some(processed), Some(total)) = (captures.get(1), captures.get(3)) { + if let (Ok(processed), Ok(total)) = (processed.as_str().parse::<u64>(), total.as_str().parse::<u64>()) { + state = Some(SyncState::PruningPointUTXOs { processed, total }); } } } else if let Some(captures) = self.trust_blocks.captures(line) { @@ -253,8 +259,18 @@ impl StateObserver { state = Some(SyncState::Proof { level }); } } - } else if self.utxo_resync.is_match(line) { - state = Some(SyncState::UtxoResync); + } else if let Some(captures) = self.utxoindex_utxos.captures(line) { + if let (Some(processed), Some(total)) = (captures.get(1), captures.get(3)) { + if let (Ok(processed), Ok(total)) = (processed.as_str().parse::<u64>(), total.as_str().parse::<u64>()) { + state = Some(SyncState::UtxoIndexUTXOs { processed, total }); + } + } + } else if let Some(captures) = self.virtual_utxos.captures(line) { + if let (Some(processed), Some(total)) = (captures.get(1), captures.get(3)) { + if let (Ok(processed), Ok(total)) = (processed.as_str().parse::<u64>(), total.as_str().parse::<u64>()) { + state = Some(SyncState::VirtualUTXOs { processed, total }); + } + } } state
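A minimal sanity check for the corrected patterns above (a sketch, not part of this diff; it assumes only the `regex` crate that sync.rs already uses, and the test module name is illustrative):

    #[cfg(test)]
    mod regex_sanity {
        use regex::Regex;

        #[test]
        fn pruning_point_utxos_pattern_matches_emitted_log() {
            // Mirrors the `info!` line emitted by PruningPointUtxosetChunkStream above
            let re = Regex::new(r"Received (\d+) \+ (\d+) / (\d+) signaled UTXOs \((\d+)%\)").unwrap();
            let line = "[PruningPointUtxosetChunkStream]: Received 3000 + 1000 / 80000 signaled UTXOs (4%)";
            let caps = re.captures(line).expect("pattern should match the log line");
            assert_eq!(&caps[1], "3000"); // processed
            assert_eq!(&caps[3], "80000"); // total
        }
    }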