diff --git a/.config/dictionaries/project.dic b/.config/dictionaries/project.dic index 4e80f2a81b..bbabf092e0 100644 --- a/.config/dictionaries/project.dic +++ b/.config/dictionaries/project.dic @@ -143,6 +143,7 @@ maindbname mapref mdlint mdns +MEMMAP memx Metadatum mgrybyk diff --git a/rust/cardano-chain-follower/Cargo.toml b/rust/cardano-chain-follower/Cargo.toml index 3461387741..753e24f349 100644 --- a/rust/cardano-chain-follower/Cargo.toml +++ b/rust/cardano-chain-follower/Cargo.toml @@ -20,7 +20,7 @@ mithril-client = { version = "0.10.4", default-features = false, features = [ "num-integer-backend", ] } cardano-blockchain-types = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250114-00" } -catalyst-types = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250108-00" } +catalyst-types = { version = "0.0.1", path = "../catalyst-types" } thiserror = "1.0.69" tokio = { version = "1.42.0", features = [ @@ -31,7 +31,7 @@ tokio = { version = "1.42.0", features = [ ] } tracing = "0.1.41" tracing-log = "0.2.0" -dashmap = "6.1.0" +dashmap = { version = "6.1.0", features = ["serde"] } url = "2.5.4" anyhow = "1.0.95" chrono = "0.4.39" @@ -48,7 +48,6 @@ serde = "1.0.217" serde_json = "1.0.134" mimalloc = { version = "0.1.43", optional = true } memx = "0.1.32" -fmmap = { version = "0.3.3", features = ["sync", "tokio-async"] } zstd = "0.13.2" logcall = "0.1.11" tar = "0.4.43" @@ -56,6 +55,7 @@ ureq = { version = "2.12.1", features = ["native-certs"] } http = "1.2.0" hickory-resolver = { version = "0.24.2", features = ["dns-over-rustls"] } moka = { version = "0.12.9", features = ["sync"] } +cpu-time = "1.0.0" [dev-dependencies] tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } diff --git a/rust/cardano-chain-follower/src/chain_sync.rs b/rust/cardano-chain-follower/src/chain_sync.rs index 4bc624c2f4..ec5bb1e558 100644 --- 
a/rust/cardano-chain-follower/src/chain_sync.rs +++ b/rust/cardano-chain-follower/src/chain_sync.rs @@ -528,7 +528,13 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver debug!("Tip Resynchronized to {tip}"), Err(error) => { diff --git a/rust/cardano-chain-follower/src/chain_sync_config.rs b/rust/cardano-chain-follower/src/chain_sync_config.rs index db6699979f..f38587720e 100644 --- a/rust/cardano-chain-follower/src/chain_sync_config.rs +++ b/rust/cardano-chain-follower/src/chain_sync_config.rs @@ -150,8 +150,13 @@ impl ChainSyncConfig { // Start the Mithril Snapshot Follower let rx = self.mithril_cfg.run().await?; + let config = self.clone(); // Start Chain Sync - *locked_handle = Some(tokio::spawn(chain_sync(self.clone(), rx))); + *locked_handle = Some(tokio::spawn(async move { + stats::start_thread(config.chain, stats::thread::name::CHAIN_SYNC, true); + chain_sync(config.clone(), rx).await; + stats::stop_thread(config.chain, stats::thread::name::CHAIN_SYNC); + })); // sync_map.insert(chain, handle); debug!("Chain Sync for {} : Started", self.chain); diff --git a/rust/cardano-chain-follower/src/chain_sync_ready.rs b/rust/cardano-chain-follower/src/chain_sync_ready.rs index 05dc61e8c3..51c33f0e51 100644 --- a/rust/cardano-chain-follower/src/chain_sync_ready.rs +++ b/rust/cardano-chain-follower/src/chain_sync_ready.rs @@ -12,7 +12,7 @@ use tokio::{ }; use tracing::error; -use crate::chain_update; +use crate::{chain_update, stats}; /// Data we hold related to sync being ready or not. struct SyncReady { @@ -88,6 +88,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter { let (tx, rx) = oneshot::channel::<()>(); tokio::spawn(async move { + stats::start_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY, true); // We are safe to use `expect` here because the SYNC_READY list is exhaustively // initialized. Its a Serious BUG if that not True, so panic is OK. 
#[allow(clippy::expect_used)] @@ -101,7 +102,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter { if let Ok(()) = rx.await { status.ready = true; } - + stats::stop_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY); // If the channel closes early, we can NEVER use the Blockchain data. }); diff --git a/rust/cardano-chain-follower/src/mithril_query.rs b/rust/cardano-chain-follower/src/mithril_query.rs index b67cf380b2..20093924da 100644 --- a/rust/cardano-chain-follower/src/mithril_query.rs +++ b/rust/cardano-chain-follower/src/mithril_query.rs @@ -2,25 +2,32 @@ use std::path::Path; -use cardano_blockchain_types::Point; +use cardano_blockchain_types::{Network, Point}; use pallas_hardano::storage::immutable::FallibleBlock; use tokio::task; -use crate::error::{Error, Result}; +use crate::{ + error::{Error, Result}, + stats, +}; /// Synchronous Immutable block iterator. pub(crate) type ImmutableBlockIterator = Box + Send + Sync>; /// Get a mithril snapshot iterator. 
pub(crate) async fn make_mithril_iterator( - path: &Path, start: &Point, + path: &Path, start: &Point, chain: Network, ) -> Result { let path = path.to_path_buf(); let start = start.clone(); // Initial input let res = task::spawn_blocking(move || { - pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into()) - .map_err(|error| Error::MithrilSnapshot(Some(error))) + stats::start_thread(chain, stats::thread::name::MITHRIL_ITERATOR, false); + let result = + pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into()) + .map_err(|error| Error::MithrilSnapshot(Some(error))); + stats::stop_thread(chain, stats::thread::name::MITHRIL_ITERATOR); + result }) .await; diff --git a/rust/cardano-chain-follower/src/mithril_snapshot_config.rs b/rust/cardano-chain-follower/src/mithril_snapshot_config.rs index e4c95a0911..5de240287f 100644 --- a/rust/cardano-chain-follower/src/mithril_snapshot_config.rs +++ b/rust/cardano-chain-follower/src/mithril_snapshot_config.rs @@ -24,6 +24,7 @@ use crate::{ mithril_snapshot_data::{latest_mithril_snapshot_id, SnapshotData}, mithril_snapshot_sync::background_mithril_update, snapshot_id::SnapshotId, + stats, turbo_downloader::DlConfig, }; @@ -413,7 +414,16 @@ impl MithrilSnapshotConfig { let (tx, rx) = mpsc::channel::(2); // let handle = tokio::spawn(background_mithril_update(chain, self.clone(), tx)); - *locked_handle = Some(tokio::spawn(background_mithril_update(self.clone(), tx))); + let config = self.clone(); + *locked_handle = Some(tokio::spawn(async move { + stats::start_thread( + config.chain, + stats::thread::name::MITHRIL_SNAPSHOT_UPDATER, + true, + ); + background_mithril_update(config.clone(), tx).await; + stats::stop_thread(config.chain, stats::thread::name::MITHRIL_SNAPSHOT_UPDATER); + })); // sync_map.insert(chain, handle); debug!( diff --git a/rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs b/rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs index 
6e62507d0e..9877a34dab 100644 --- a/rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs +++ b/rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs @@ -73,7 +73,7 @@ impl MithrilSnapshotIterator { chain: Network, path: &Path, from: &Point, search_interval: u64, ) -> Option { let point = probe_point(from, search_interval); - let Ok(mut iterator) = make_mithril_iterator(path, &point).await else { + let Ok(mut iterator) = make_mithril_iterator(path, &point, chain).await else { return None; }; @@ -116,7 +116,7 @@ impl MithrilSnapshotIterator { let this = this?; // Remake the iterator, based on the new known point. - let Ok(iterator) = make_mithril_iterator(path, &this).await else { + let Ok(iterator) = make_mithril_iterator(path, &this, chain).await else { return None; }; @@ -176,7 +176,7 @@ impl MithrilSnapshotIterator { debug!("Actual Mithril Iterator Start: {}", from); - let iterator = make_mithril_iterator(path, from).await?; + let iterator = make_mithril_iterator(path, from, chain).await?; Ok(MithrilSnapshotIterator { inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner { diff --git a/rust/cardano-chain-follower/src/mithril_snapshot_sync.rs b/rust/cardano-chain-follower/src/mithril_snapshot_sync.rs index 624d1ce744..e77d88b5d7 100644 --- a/rust/cardano-chain-follower/src/mithril_snapshot_sync.rs +++ b/rust/cardano-chain-follower/src/mithril_snapshot_sync.rs @@ -300,9 +300,12 @@ async fn validate_mithril_snapshot( match tokio::spawn(async move { // This can be long running and CPU Intensive. // So we spawn it off to a background task. 
- MessageBuilder::new() + stats::start_thread(chain, stats::thread::name::COMPUTE_SNAPSHOT_MSG, true); + let result = MessageBuilder::new() .compute_snapshot_message(&cert, &mithril_path) - .await + .await; + stats::stop_thread(chain, stats::thread::name::COMPUTE_SNAPSHOT_MSG); + result }) .await { @@ -517,6 +520,7 @@ fn background_validate_mithril_snapshot( chain: Network, certificate: MithrilCertificate, tmp_path: PathBuf, ) -> tokio::task::JoinHandle { tokio::spawn(async move { + stats::start_thread(chain, stats::thread::name::VALIDATE_MITHRIL_SNAPSHOT, true); debug!( "Mithril Snapshot background updater for: {} : Check Certificate.", chain @@ -540,6 +544,7 @@ fn background_validate_mithril_snapshot( chain ); + stats::stop_thread(chain, stats::thread::name::VALIDATE_MITHRIL_SNAPSHOT); true }) } diff --git a/rust/cardano-chain-follower/src/mithril_turbo_downloader.rs b/rust/cardano-chain-follower/src/mithril_turbo_downloader.rs index f9b380b986..0b1671c386 100644 --- a/rust/cardano-chain-follower/src/mithril_turbo_downloader.rs +++ b/rust/cardano-chain-follower/src/mithril_turbo_downloader.rs @@ -14,9 +14,8 @@ use std::{ use anyhow::{anyhow, bail}; use async_trait::async_trait; -use catalyst_types::conversion::from_saturating; +use catalyst_types::{conversion::from_saturating, mmap_file::MemoryMapFile}; use dashmap::DashSet; -use fmmap::MmapFileExt; use memx::memcmp; use mithril_client::{ common::CompressionAlgorithm, snapshot_downloader::SnapshotDownloader, MithrilResult, @@ -134,8 +133,7 @@ impl Inner { self.ext_size.fetch_add(entry_size, Ordering::SeqCst); // Try and deduplicate the file if we can, otherwise just extract it. 
- if let Ok((prev_mmap, _)) = - Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref()) + if let Ok(prev_mmap) = Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref()) { let expected_file_size = from_saturating(entry_size); let mut buf: Vec = Vec::with_capacity(expected_file_size); @@ -225,7 +223,7 @@ impl Inner { /// Check if a given path from the archive is able to be deduplicated. fn can_deduplicate( rel_file: &Path, file_size: u64, prev_file: Option<&PathBuf>, - ) -> MithrilResult<(fmmap::MmapFile, u64)> { + ) -> MithrilResult { // Can't dedup if the current file is not de-dupable (must be immutable) if rel_file.starts_with("immutable") { // Can't dedup if we don't have a previous file to dedup against. @@ -234,8 +232,8 @@ impl Inner { // If the current file is not exactly the same as the previous file size, we // can't dedup. if file_size == current_size { - if let Ok(pref_file_loaded) = mmap_open_sync(prev_file) { - if pref_file_loaded.1 == file_size { + if let Ok(pref_file_loaded) = Self::mmap_open_sync(prev_file) { + if pref_file_loaded.size() == file_size { return Ok(pref_file_loaded); } } @@ -245,6 +243,17 @@ impl Inner { } bail!("Can not deduplicate."); } + + /// Open a file using mmap for performance. + fn mmap_open_sync(path: &Path) -> MithrilResult { + match MemoryMapFile::try_from(path) { + Ok(mmap_file) => Ok(mmap_file), + Err(error) => { + error!(error=%error, file=%path.to_string_lossy(), "Failed to open file"); + Err(error.into()) + }, + } + } } /// A snapshot downloader that accelerates Download using `aria2`. @@ -302,7 +311,17 @@ impl MithrilTurboDownloader { let target_dir = target_dir.to_owned(); // This is fully synchronous IO, so do it on a sync thread. 
- let result = spawn_blocking(move || inner.dl_and_dedup(&location, &target_dir)).await; + let result = spawn_blocking(move || { + stats::start_thread( + inner.cfg.chain, + stats::thread::name::MITHRIL_DL_DEDUP, + false, + ); + let result = inner.dl_and_dedup(&location, &target_dir); + stats::stop_thread(inner.cfg.chain, stats::thread::name::MITHRIL_DL_DEDUP); + result + }) + .await; if let Ok(result) = result { return result; @@ -321,20 +340,6 @@ fn get_file_size_sync(file: &Path) -> Option { Some(metadata.len()) } -/// Open a file using mmap for performance. -fn mmap_open_sync(path: &Path) -> MithrilResult<(fmmap::MmapFile, u64)> { - match fmmap::MmapFile::open_with_options(path, fmmap::Options::new().read(true).populate()) { - Ok(file) => { - let len = file.len() as u64; - Ok((file, len)) - }, - Err(error) => { - error!(error=%error, file=%path.to_string_lossy(), "Failed to open file"); - Err(error.into()) - }, - } -} - #[async_trait] impl SnapshotDownloader for MithrilTurboDownloader { async fn download_unpack( @@ -366,9 +371,9 @@ impl SnapshotDownloader for MithrilTurboDownloader { async fn probe(&self, location: &str) -> MithrilResult<()> { debug!("Probe Snapshot location='{location}'."); - let dl_config = self.inner.cfg.dl_config.clone().unwrap_or_default(); - let dl_processor = ParallelDownloadProcessor::new(location, dl_config).await?; + let dl_processor = + ParallelDownloadProcessor::new(location, dl_config, self.inner.cfg.chain).await?; // Decompress and extract and de-dupe each file in the archive. stats::mithril_extract_started(self.inner.cfg.chain); diff --git a/rust/cardano-chain-follower/src/stats/mod.rs b/rust/cardano-chain-follower/src/stats/mod.rs index fa21e6381e..75d9d08457 100644 --- a/rust/cardano-chain-follower/src/stats/mod.rs +++ b/rust/cardano-chain-follower/src/stats/mod.rs @@ -1,9 +1,10 @@ //! 
Cardano Chain Follower Statistics -pub(crate) mod follower; -pub(crate) mod live_chain; -pub(crate) mod mithril; -pub(crate) mod rollback; +pub mod follower; +pub mod live_chain; +pub mod mithril; +pub mod rollback; +pub mod thread; use std::sync::{Arc, LazyLock, RwLock}; @@ -13,6 +14,7 @@ use dashmap::DashMap; use rollback::{rollbacks, rollbacks_reset, RollbackType}; use serde::Serialize; use strum::IntoEnumIterator; +use thread::ThreadStat; use tracing::error; use crate::stats::{live_chain::Live, mithril::Mithril}; @@ -26,6 +28,8 @@ pub struct Statistics { pub live: Live, /// Statistics related to the mithril certified blockchain archive. pub mithril: Mithril, + /// Statistics related to the threads. + pub thread_stats: DashMap, } /// Type we use to manage the Sync Task handle map. @@ -458,6 +462,115 @@ pub(crate) fn mithril_sync_failure(chain: Network, failure: MithrilSyncFailures) } } +// ----------------- THREAD STATISTICS------------------- + +/// Initialize a thread statistic with the given name. +/// If it is service thread, mark it as such. +pub(crate) fn start_thread(chain: Network, name: &str, is_service: bool) { + // This will actually always succeed. + let Some(stats) = lookup_stats(chain) else { + return; + }; + + let Ok(chain_stats) = stats.write() else { + // Worst case if this fails (it never should) is we stop updating stats. + error!("Stats RwLock should never be able to error."); + return; + }; + + chain_stats + .thread_stats + .insert(name.to_string(), ThreadStat::start_thread(is_service)); +} + +/// Stop the thread with the given name. +pub(crate) fn stop_thread(chain: Network, name: &str) { + // This will actually always succeed. + let Some(stats) = lookup_stats(chain) else { + return; + }; + + let Ok(chain_stats) = stats.write() else { + // Worst case if this fails (it never should) is we stop updating stats. 
+ error!("Stats RwLock should never be able to error."); + return; + }; + + if let Some(thread_stat) = chain_stats.thread_stats.get(name) { + thread_stat.stop_thread(); + }; +} + +/// Resume the thread with the given name. +pub(crate) fn resume_thread(chain: Network, name: &str) { + // This will actually always succeed. + let Some(stats) = lookup_stats(chain) else { + return; + }; + + let Ok(chain_stats) = stats.write() else { + // Worst case if this fails (it never should) is we stop updating stats. + error!("Stats RwLock should never be able to error."); + return; + }; + + if let Some(thread_stat) = chain_stats.thread_stats.get(name) { + thread_stat.resume_thread(); + }; +} + +/// Pause the thread with the given name. +pub(crate) fn pause_thread(chain: Network, name: &str) { + // This will actually always succeed. + let Some(stats) = lookup_stats(chain) else { + return; + }; + + let Ok(chain_stats) = stats.write() else { + // Worst case if this fails (it never should) is we stop updating stats. + error!("Stats RwLock should never be able to error."); + return; + }; + + if let Some(thread_stat) = chain_stats.thread_stats.get(name) { + thread_stat.pause_thread(); + }; +} + +/// Get the thread statistic with the given name. +#[allow(dead_code)] +pub fn thread_stat(chain: Network, name: &str) -> Option { + // This will actually always succeed. + let stats = lookup_stats(chain)?; + + let Ok(chain_stats) = stats.write() else { + // Worst case if this fails (it never should) is we stop updating stats. + error!("Stats RwLock should never be able to error."); + return None; + }; + + chain_stats.thread_stats.get(name).map(|stat| stat.clone()) +} + +/// Get the names of all the thread statistics. 
+#[allow(dead_code)] +pub fn thread_stat_names(chain: Network) -> Vec { + let Some(stats) = lookup_stats(chain) else { + return Vec::new(); + }; + + let Ok(chain_stats) = stats.write() else { + error!("Stats RwLock should never be able to error."); + return Vec::new(); + }; + + chain_stats + .thread_stats + .iter() + .map(|entry| entry.key().clone()) + .collect() +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { diff --git a/rust/cardano-chain-follower/src/stats/thread/mod.rs b/rust/cardano-chain-follower/src/stats/thread/mod.rs new file mode 100644 index 0000000000..5892e47abb --- /dev/null +++ b/rust/cardano-chain-follower/src/stats/thread/mod.rs @@ -0,0 +1,234 @@ +//! Thread statistics. + +pub(crate) mod name; + +use std::{ + fmt, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, Mutex, + }, + time::Duration, +}; + +use cpu_time::ThreadTime; +use serde::{ + ser::{SerializeStruct, Serializer}, + Serialize, +}; + +/// Thread statistics. +#[derive(Debug, Default, Clone, Serialize)] +#[allow(clippy::module_name_repetitions)] +pub struct ThreadStat(Arc); + +/// Inner thread statistics. +struct InnerThreadStat { + /// A counter for the number of times the thread has been resumed. + counter: AtomicU64, + /// A boolean value indicating whether the thread is running. + is_running: AtomicBool, + /// A boolean value indicating whether the thread is a service. + is_service: AtomicBool, + /// The latest CPU time. + latest_cpu_time: Mutex, + /// The total CPU time. 
+ total_cpu_time: Mutex, +} + +impl Serialize for InnerThreadStat { + fn serialize(&self, serializer: S) -> Result + where S: Serializer { + let mut state = serializer.serialize_struct("ThreadStat", 6)?; + + state.serialize_field("counter", &self.counter.load(Ordering::SeqCst))?; + state.serialize_field("is_running", &self.is_running.load(Ordering::SeqCst))?; + state.serialize_field("is_service", &self.is_service.load(Ordering::SeqCst))?; + if let Ok(total_cpu_time) = self.total_cpu_time.lock() { + state.serialize_field("total_cpu_time", &*total_cpu_time)?; + } + if let Ok(latest_cpu_time) = self.latest_cpu_time.lock() { + state.serialize_field("latest_cpu_time", &*latest_cpu_time)?; + } + state.end() + } +} + +impl fmt::Debug for InnerThreadStat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let latest_cpu_time = self.latest_cpu_time.lock().map_err(|_| fmt::Error)?; + let total_cpu_time = self.total_cpu_time.lock().map_err(|_| fmt::Error)?; + + f.debug_struct("InnerThreadStat") + .field("counter", &self.counter.load(Ordering::SeqCst)) + .field("is_running", &self.is_running.load(Ordering::SeqCst)) + .field("is_service", &self.is_service.load(Ordering::SeqCst)) + .field("latest_cpu_time", &*latest_cpu_time) + .field("total_cpu_time", &*total_cpu_time) + .finish() + } +} + +impl Default for InnerThreadStat { + fn default() -> Self { + InnerThreadStat { + counter: AtomicU64::new(0), + is_running: AtomicBool::new(false), + is_service: AtomicBool::new(false), + latest_cpu_time: Mutex::new(Duration::ZERO), + total_cpu_time: Mutex::new(Duration::ZERO), + } + } +} + +impl InnerThreadStat { + /// Update the total time of the CPU used. 
+ fn update_total_time(&self) { + // Get the current CPU time as a Duration + let current_time = ThreadTime::now().as_duration(); + + if let Ok(latest_cpu_time) = self.latest_cpu_time.lock() { + // Calculate elapsed time (current - previous) + let elapsed = if current_time > *latest_cpu_time { + current_time - *latest_cpu_time + } else { + Duration::ZERO + }; + // If the elapsed time is non-negative, update total_cpu_time + if elapsed > Duration::ZERO { + if let Ok(mut total_cpu_time) = self.total_cpu_time.lock() { + *total_cpu_time += elapsed; + } + } + } + } + + /// Update the latest time of the CPU used. + fn update_latest_time(&self) { + if let Ok(mut latest_cpu_time) = self.latest_cpu_time.lock() { + *latest_cpu_time = ThreadTime::now().as_duration(); + } + } + + /// Increase the counter by 1. + fn increment_counter(&self) { + self.counter.fetch_add(1, Ordering::SeqCst); + } +} + +impl ThreadStat { + /// Initialize a thread statistic. + pub(crate) fn start_thread(is_service: bool) -> Self { + Self(Arc::new(InnerThreadStat { + counter: 0.into(), + is_running: true.into(), + is_service: is_service.into(), + total_cpu_time: Duration::ZERO.into(), + latest_cpu_time: ThreadTime::now().as_duration().into(), + })) + } + + /// Stop the thread. + pub(crate) fn stop_thread(&self) { + self.0.is_running.store(false, Ordering::SeqCst); + self.0.update_latest_time(); + self.0.update_total_time(); + } + + /// Resume the thread. + pub(crate) fn resume_thread(&self) { + self.0.increment_counter(); + self.0.update_latest_time(); + } + + /// Pause the thread. + pub(crate) fn pause_thread(&self) { + self.0.update_latest_time(); + self.0.update_total_time(); + } + + /// Is the thread running? + pub fn is_running(&self) -> bool { + self.0.is_running.load(Ordering::SeqCst) + } + + /// The number of times the thread has been resumed. + pub fn counter(&self) -> u64 { + self.0.counter.load(Ordering::SeqCst) + } + + /// Get the total CPU time for a thread. 
+ pub fn total_cpu_time(&self) -> Option { + self.0 + .total_cpu_time + .lock() + .ok() + .map(|total_cpu_time| *total_cpu_time) + } + + /// Get the latest CPU time for a thread. + pub fn latest_cpu_time(&self) -> Option { + self.0 + .latest_cpu_time + .lock() + .ok() + .map(|latest_cpu_time| *latest_cpu_time) + } +} + +#[cfg(test)] +mod tests { + use std::{thread, time::Duration}; + + use super::*; + + #[test] + fn test_thread_stat_initialization() { + let stat = ThreadStat::start_thread(true); + assert!(stat.is_running()); + assert_eq!(stat.counter(), 0); + assert!(stat.total_cpu_time().is_some()); + assert!(stat.latest_cpu_time().is_some()); + } + + #[test] + fn test_thread_stat_stop() { + let stat = ThreadStat::start_thread(false); + stat.stop_thread(); + assert!(!stat.is_running()); + } + + #[test] + fn test_thread_stat_resume() { + let stat = ThreadStat::start_thread(false); + stat.resume_thread(); + assert!(stat.is_running()); + assert_eq!(stat.counter(), 1); + } + + #[test] + fn test_thread_stat_pause() { + let stat = ThreadStat::start_thread(false); + stat.pause_thread(); + assert!(stat.is_running()); + } + + #[test] + fn test_thread_stat_update_cpu_time() { + let stat = ThreadStat::start_thread(false); + stat.pause_thread(); + thread::sleep(Duration::from_millis(10)); + stat.resume_thread(); + let total_cpu_time = stat.total_cpu_time().unwrap(); + assert!(total_cpu_time > Duration::ZERO); + } + + #[test] + fn test_thread_stat_multiple_resumes() { + let stat = ThreadStat::start_thread(false); + stat.resume_thread(); + stat.resume_thread(); + stat.resume_thread(); + assert_eq!(stat.counter(), 3); + } +} diff --git a/rust/cardano-chain-follower/src/stats/thread/name.rs b/rust/cardano-chain-follower/src/stats/thread/name.rs new file mode 100644 index 0000000000..2db970ce69 --- /dev/null +++ b/rust/cardano-chain-follower/src/stats/thread/name.rs @@ -0,0 +1,20 @@ +//! Thread names + +/// Chain Sync. 
+pub(crate) const CHAIN_SYNC: &str = "Async:ChainSync"; +/// Wait for Sync Ready. +pub(crate) const WAIT_FOR_SYNC_READY: &str = "Async:WaitForSyncReady"; +/// Live Sync Backfill and Purge. +pub(crate) const LIVE_SYNC_BACKFILL_AND_PURGE: &str = "Async:LiveSyncBackfillAndPurge"; +/// Mithril Iterator. +pub(crate) const MITHRIL_ITERATOR: &str = "MithrilIterator"; +/// Background Mithril Snapshot Updater. +pub(crate) const MITHRIL_SNAPSHOT_UPDATER: &str = "Async:MithrilSnapshotUpdater"; +/// Mithril compute snapshot. +pub(crate) const COMPUTE_SNAPSHOT_MSG: &str = "Async:ComputeSnapshotMsg"; +/// Background Mithril Snapshot Validator. +pub(crate) const VALIDATE_MITHRIL_SNAPSHOT: &str = "Async:ValidateMithrilSnapshot"; +/// Mithril Downloader, Dl and Dedup. +pub(crate) const MITHRIL_DL_DEDUP: &str = "MithrilDlDedup"; +/// Parallel Download Processor Worker. +pub(crate) const PARALLEL_DL_WORKER: &str = "ParallelDlWorker"; diff --git a/rust/cardano-chain-follower/src/turbo_downloader/mod.rs b/rust/cardano-chain-follower/src/turbo_downloader/mod.rs index 2424159bf9..4adab16d90 100644 --- a/rust/cardano-chain-follower/src/turbo_downloader/mod.rs +++ b/rust/cardano-chain-follower/src/turbo_downloader/mod.rs @@ -19,7 +19,9 @@ use std::{ }; use anyhow::{bail, Context, Result}; +use cardano_blockchain_types::Network; use catalyst_types::conversion::from_saturating; +use crossbeam_channel::{Receiver, RecvError}; use dashmap::DashMap; use http::{ header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE}, @@ -27,6 +29,8 @@ use http::{ }; use tracing::{debug, error}; +use crate::stats; + /// A Simple DNS Balancing Resolver struct BalancingResolver { /// The actual resolver @@ -364,7 +368,7 @@ impl ParallelDownloadProcessor { /// /// Can Fail IF there is no HTTP client provided or the URL does not support getting /// the content length. 
- pub(crate) async fn new(url: &str, mut cfg: DlConfig) -> anyhow::Result { + pub(crate) async fn new(url: &str, mut cfg: DlConfig, chain: Network) -> anyhow::Result { if cfg.chunk_size < MIN_CHUNK_SIZE { bail!( "Download chunk size must be at least {} bytes", @@ -402,14 +406,14 @@ impl ParallelDownloadProcessor { next_requested_chunk: AtomicUsize::new(0), })); - processor.start_workers()?; + processor.start_workers(chain)?; Ok(processor) } /// Starts the worker tasks, they will not start doing any work until `download` is /// called, which happens immediately after they are started. - fn start_workers(&self) -> anyhow::Result<()> { + fn start_workers(&self, chain: Network) -> anyhow::Result<()> { for worker in 0..self.0.cfg.workers { // The channel is unbounded, because work distribution is controlled to be at most // `work_queue` deep per worker. And we don't want anything unexpected to @@ -417,7 +421,11 @@ impl ParallelDownloadProcessor { let (work_queue_tx, work_queue_rx) = crossbeam_channel::unbounded::(); let params = self.0.clone(); thread::spawn(move || { - Self::worker(¶ms, worker, &work_queue_rx); + let worker_name = &format!("{}::{worker}", stats::thread::name::PARALLEL_DL_WORKER); + + stats::start_thread(chain, worker_name, false); + Self::worker(¶ms, worker, worker_name, &work_queue_rx, chain); + stats::stop_thread(chain, worker_name); }); let _unused = self.0.work_queue.insert(worker, work_queue_tx); @@ -426,11 +434,22 @@ impl ParallelDownloadProcessor { self.download() } + /// Call the work queue receiver. + /// This is a helper function to pause and resume the stats thread. + fn call_work_queue_receiver( + chain: Network, worker_name: &str, work_queue: &Receiver, + ) -> Result { + stats::pause_thread(chain, worker_name); + let recv = work_queue.recv(); + stats::resume_thread(chain, worker_name); + recv + } + /// The worker task - It is running in parallel and downloads chunks of the file as /// requested. 
fn worker( - params: &Arc, worker_id: usize, - work_queue: &crossbeam_channel::Receiver, + params: &Arc, worker_id: usize, worker_name: &str, + work_queue: &crossbeam_channel::Receiver, chain: Network, ) { debug!("Worker {worker_id} started"); @@ -444,7 +463,7 @@ impl ParallelDownloadProcessor { } let http_agent = params.cfg.make_http_agent(worker_id); - while let Ok(next_chunk) = work_queue.recv() { + while let Ok(next_chunk) = Self::call_work_queue_receiver(chain, worker_name, work_queue) { // Add a small delay to the first chunks for each worker. // So that the leading chunks are more likely to finish downloading first. if next_chunk > 0 && next_chunk < params.cfg.workers { diff --git a/rust/catalyst-types/Cargo.toml b/rust/catalyst-types/Cargo.toml index 52e3045125..1367bc1252 100644 --- a/rust/catalyst-types/Cargo.toml +++ b/rust/catalyst-types/Cargo.toml @@ -29,6 +29,9 @@ serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.9" base64-url = "3.0.0" uuid = { version = "1.11.0", features = ["v4", "v7", "serde"] } +fmmap = { version = "0.3.3", features = ["sync", "tokio-async"] } +once_cell = "1.20.2" +tracing = "0.1.41" [dev-dependencies] ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } diff --git a/rust/catalyst-types/src/lib.rs b/rust/catalyst-types/src/lib.rs index 57cdd356a1..324257bd5d 100644 --- a/rust/catalyst-types/src/lib.rs +++ b/rust/catalyst-types/src/lib.rs @@ -3,5 +3,6 @@ pub mod conversion; pub mod hashes; pub mod kid_uri; +pub mod mmap_file; pub mod problem_report; pub mod uuid; diff --git a/rust/catalyst-types/src/mmap_file.rs b/rust/catalyst-types/src/mmap_file.rs new file mode 100644 index 0000000000..b36a10bfd0 --- /dev/null +++ b/rust/catalyst-types/src/mmap_file.rs @@ -0,0 +1,152 @@ +//! Memory-mapped file. + +use std::{ + path::Path, + sync::{Arc, RwLock}, +}; + +use fmmap::{MmapFile, MmapFileExt}; +use once_cell::sync::Lazy; +use serde::Serialize; +use tracing::error; + +/// Memory-mapped file. 
+pub struct MemoryMapFile { + /// The memory-mapped file. + file: MmapFile, + /// The size of the memory-mapped file. + size: u64, +} + +/// Global statistic for memory-mapped files. +static MEMMAP_FILE_STATS: Lazy<Arc<RwLock<MemMapFileStat>>> = + Lazy::new(|| Arc::new(RwLock::new(MemMapFileStat::default()))); + +/// Memory-mapped file statistic. +#[derive(Debug, Default, Clone, Serialize)] +pub struct MemMapFileStat { + /// A counter for the number of memory-mapped files. + file_count: u64, + /// The total size of memory-mapped files. + total_size: u64, + /// The number of times that memory-mapped files have been dropped. + drop_count: u64, + /// The total size of memory-mapped files that have been dropped. + drop_size: u64, + /// A count of errors encountered. + error_count: u64, +} + +impl MemMapFileStat { + /// Get the global memory-mapped file statistics. + /// Return default statistics if the lock is poisoned. + pub fn current() -> MemMapFileStat { + if let Ok(stat) = MEMMAP_FILE_STATS.read() { + stat.clone() + } else { + error!("RwLock read poisoned, failed to read memory-mapped file statistics."); + MemMapFileStat::default() + } + } + + /// Get the statistic file count. + #[must_use] + pub fn file_count(&self) -> u64 { + self.file_count + } + + /// Get the statistic total size. + #[must_use] + pub fn total_size(&self) -> u64 { + self.total_size + } + + /// Get the statistic drop count. + #[must_use] + pub fn drop_count(&self) -> u64 { + self.drop_count + } + + /// Get the statistic drop size. + #[must_use] + pub fn drop_size(&self) -> u64 { + self.drop_size + } + + /// Get the statistic error count. + #[must_use] + pub fn error_count(&self) -> u64 { + self.error_count + } + + /// Update the global stats when a file is created. + fn update_create_stat(size: u64) { + if let Ok(mut stat) = MEMMAP_FILE_STATS.write() { + stat.file_count += 1; + stat.total_size += size; + } else { + error!( + "RwLock write poisoned, failed to update created memory-mapped file statistics."
+ ); + } + } + + /// Update the global stats when a file is dropped. + fn update_drop_stat(size: u64) { + if let Ok(mut stat) = MEMMAP_FILE_STATS.write() { + stat.drop_count += 1; + stat.drop_size += size; + } else { + error!( + "RwLock write poisoned, failed to update dropped memory-mapped file statistics." + ); + } + } + + /// Update the global error count when an error occurs. + fn update_err_stat() { + if let Ok(mut stat) = MEMMAP_FILE_STATS.write() { + stat.error_count += 1; + } else { + error!("RwLock write poisoned, failed to update error memory-mapped file statistics."); + } + } +} + +impl MemoryMapFile { + /// Get the size of the memory-mapped file. + pub fn size(&self) -> u64 { + self.size + } + + /// Get the memory-mapped file as a slice. + pub fn as_slice(&self) -> &[u8] { + self.file.as_slice() + } +} + +impl Drop for MemoryMapFile { + fn drop(&mut self) { + MemMapFileStat::update_drop_stat(self.size); + } +} + +impl TryFrom<&Path> for MemoryMapFile { + type Error = fmmap::error::Error; + + fn try_from(path: &Path) -> Result { + // Attempt to open the file with memory mapping options + match MmapFile::open_with_options(path, fmmap::Options::new().read(true).populate()) { + Ok(file) => { + let len = file.len() as u64; + let memory_map_file = MemoryMapFile { file, size: len }; + MemMapFileStat::update_create_stat(len); + Ok(memory_map_file) + }, + Err(error) => { + MemMapFileStat::update_err_stat(); + Err(error) + }, + } + } +}