Skip to content

Commit

Permalink
feat(rust/cardano-chain-follower): add thread and mmap file stats (#150)
Browse files Browse the repository at this point in the history
* feat(cardano-chain-follower): add thread and mmap file stats

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): remove log

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): fix mmapfile name

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat call

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread logic

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat call

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): update forever task thread stat

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): linter and format

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat name constants to its own file

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat calling

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): move mmap file to cat-types

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-blockchain-types): format

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): remove unused dep

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat calling

Signed-off-by: bkioshn <[email protected]>

* fix: mmapfile

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-blockchain-types): use RwLock for mmap stat

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-blockchain-types): add size function

Signed-off-by: bkioshn <[email protected]>

* fix(catalyst-types): mmap file error and structure

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): stat visibility

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat naming

Signed-off-by: bkioshn <[email protected]>

* fix(cardano-chain-follower): thread stat worker

Signed-off-by: bkioshn <[email protected]>

---------

Signed-off-by: bkioshn <[email protected]>
Co-authored-by: Steven Johnson <[email protected]>
  • Loading branch information
bkioshn and stevenj authored Jan 21, 2025
1 parent 288a5b6 commit 74e0fb9
Show file tree
Hide file tree
Showing 17 changed files with 634 additions and 53 deletions.
1 change: 1 addition & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ maindbname
mapref
mdlint
mdns
MEMMAP
memx
Metadatum
mgrybyk
Expand Down
6 changes: 3 additions & 3 deletions rust/cardano-chain-follower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mithril-client = { version = "0.10.4", default-features = false, features = [
"num-integer-backend",
] }
cardano-blockchain-types = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250114-00" }
catalyst-types = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250108-00" }
catalyst-types = { version = "0.0.1", path = "../catalyst-types" }

thiserror = "1.0.69"
tokio = { version = "1.42.0", features = [
Expand All @@ -31,7 +31,7 @@ tokio = { version = "1.42.0", features = [
] }
tracing = "0.1.41"
tracing-log = "0.2.0"
dashmap = "6.1.0"
dashmap = { version = "6.1.0", features = ["serde"] }
url = "2.5.4"
anyhow = "1.0.95"
chrono = "0.4.39"
Expand All @@ -48,14 +48,14 @@ serde = "1.0.217"
serde_json = "1.0.134"
mimalloc = { version = "0.1.43", optional = true }
memx = "0.1.32"
fmmap = { version = "0.3.3", features = ["sync", "tokio-async"] }
zstd = "0.13.2"
logcall = "0.1.11"
tar = "0.4.43"
ureq = { version = "2.12.1", features = ["native-certs"] }
http = "1.2.0"
hickory-resolver = { version = "0.24.2", features = ["dns-over-rustls"] }
moka = { version = "0.12.9", features = ["sync"] }
cpu-time = "1.0.0"

[dev-dependencies]
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
Expand Down
7 changes: 6 additions & 1 deletion rust/cardano-chain-follower/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,13 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU

// Start the Live chain backfill task.
let _backfill_join_handle = spawn(async move {
stats::start_thread(
cfg.chain,
stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE,
true,
);
live_sync_backfill_and_purge(backfill_cfg.clone(), rx, sync_waiter).await;
stats::stop_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
});

// Live Fill data starts at fork 1.
Expand All @@ -539,7 +545,6 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
loop {
// We never have a connection if we end up around the loop, so make a new one.
let mut peer = persistent_reconnect(&cfg.relay_address, cfg.chain).await;

match resync_live_tip(&mut peer, cfg.chain).await {
Ok(tip) => debug!("Tip Resynchronized to {tip}"),
Err(error) => {
Expand Down
7 changes: 6 additions & 1 deletion rust/cardano-chain-follower/src/chain_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,13 @@ impl ChainSyncConfig {
// Start the Mithril Snapshot Follower
let rx = self.mithril_cfg.run().await?;

let config = self.clone();
// Start Chain Sync
*locked_handle = Some(tokio::spawn(chain_sync(self.clone(), rx)));
*locked_handle = Some(tokio::spawn(async move {
stats::start_thread(config.chain, stats::thread::name::CHAIN_SYNC, true);
chain_sync(config.clone(), rx).await;
stats::stop_thread(config.chain, stats::thread::name::CHAIN_SYNC);
}));

// sync_map.insert(chain, handle);
debug!("Chain Sync for {} : Started", self.chain);
Expand Down
5 changes: 3 additions & 2 deletions rust/cardano-chain-follower/src/chain_sync_ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
};
use tracing::error;

use crate::chain_update;
use crate::{chain_update, stats};

/// Data we hold related to sync being ready or not.
struct SyncReady {
Expand Down Expand Up @@ -88,6 +88,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
let (tx, rx) = oneshot::channel::<()>();

tokio::spawn(async move {
stats::start_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY, true);
// We are safe to use `expect` here because the SYNC_READY list is exhaustively
// initialized. It's a serious bug if that is not true, so a panic is OK.
#[allow(clippy::expect_used)]
Expand All @@ -101,7 +102,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
if let Ok(()) = rx.await {
status.ready = true;
}

stats::stop_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY);
// If the channel closes early, we can NEVER use the Blockchain data.
});

Expand Down
17 changes: 12 additions & 5 deletions rust/cardano-chain-follower/src/mithril_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,32 @@
use std::path::Path;

use cardano_blockchain_types::Point;
use cardano_blockchain_types::{Network, Point};
use pallas_hardano::storage::immutable::FallibleBlock;
use tokio::task;

use crate::error::{Error, Result};
use crate::{
error::{Error, Result},
stats,
};

/// Synchronous Immutable block iterator.
pub(crate) type ImmutableBlockIterator = Box<dyn Iterator<Item = FallibleBlock> + Send + Sync>;

/// Get a mithril snapshot iterator.
pub(crate) async fn make_mithril_iterator(
path: &Path, start: &Point,
path: &Path, start: &Point, chain: Network,
) -> Result<ImmutableBlockIterator> {
let path = path.to_path_buf();
let start = start.clone();
// Initial input
let res = task::spawn_blocking(move || {
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
.map_err(|error| Error::MithrilSnapshot(Some(error)))
stats::start_thread(chain, stats::thread::name::MITHRIL_ITERATOR, false);
let result =
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
.map_err(|error| Error::MithrilSnapshot(Some(error)));
stats::stop_thread(chain, stats::thread::name::MITHRIL_ITERATOR);
result
})
.await;

Expand Down
12 changes: 11 additions & 1 deletion rust/cardano-chain-follower/src/mithril_snapshot_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
mithril_snapshot_data::{latest_mithril_snapshot_id, SnapshotData},
mithril_snapshot_sync::background_mithril_update,
snapshot_id::SnapshotId,
stats,
turbo_downloader::DlConfig,
};

Expand Down Expand Up @@ -413,7 +414,16 @@ impl MithrilSnapshotConfig {
let (tx, rx) = mpsc::channel::<MithrilUpdateMessage>(2);

// let handle = tokio::spawn(background_mithril_update(chain, self.clone(), tx));
*locked_handle = Some(tokio::spawn(background_mithril_update(self.clone(), tx)));
let config = self.clone();
*locked_handle = Some(tokio::spawn(async move {
stats::start_thread(
config.chain,
stats::thread::name::MITHRIL_SNAPSHOT_UPDATER,
true,
);
background_mithril_update(config.clone(), tx).await;
stats::stop_thread(config.chain, stats::thread::name::MITHRIL_SNAPSHOT_UPDATER);
}));

// sync_map.insert(chain, handle);
debug!(
Expand Down
6 changes: 3 additions & 3 deletions rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl MithrilSnapshotIterator {
chain: Network, path: &Path, from: &Point, search_interval: u64,
) -> Option<MithrilSnapshotIterator> {
let point = probe_point(from, search_interval);
let Ok(mut iterator) = make_mithril_iterator(path, &point).await else {
let Ok(mut iterator) = make_mithril_iterator(path, &point, chain).await else {
return None;
};

Expand Down Expand Up @@ -116,7 +116,7 @@ impl MithrilSnapshotIterator {
let this = this?;

// Remake the iterator, based on the new known point.
let Ok(iterator) = make_mithril_iterator(path, &this).await else {
let Ok(iterator) = make_mithril_iterator(path, &this, chain).await else {
return None;
};

Expand Down Expand Up @@ -176,7 +176,7 @@ impl MithrilSnapshotIterator {

debug!("Actual Mithril Iterator Start: {}", from);

let iterator = make_mithril_iterator(path, from).await?;
let iterator = make_mithril_iterator(path, from, chain).await?;

Ok(MithrilSnapshotIterator {
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
Expand Down
9 changes: 7 additions & 2 deletions rust/cardano-chain-follower/src/mithril_snapshot_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,12 @@ async fn validate_mithril_snapshot(
match tokio::spawn(async move {
// This can be long running and CPU Intensive.
// So we spawn it off to a background task.
MessageBuilder::new()
stats::start_thread(chain, stats::thread::name::COMPUTE_SNAPSHOT_MSG, true);
let result = MessageBuilder::new()
.compute_snapshot_message(&cert, &mithril_path)
.await
.await;
stats::stop_thread(chain, stats::thread::name::COMPUTE_SNAPSHOT_MSG);
result
})
.await
{
Expand Down Expand Up @@ -517,6 +520,7 @@ fn background_validate_mithril_snapshot(
chain: Network, certificate: MithrilCertificate, tmp_path: PathBuf,
) -> tokio::task::JoinHandle<bool> {
tokio::spawn(async move {
stats::start_thread(chain, stats::thread::name::VALIDATE_MITHRIL_SNAPSHOT, true);
debug!(
"Mithril Snapshot background updater for: {} : Check Certificate.",
chain
Expand All @@ -540,6 +544,7 @@ fn background_validate_mithril_snapshot(
chain
);

stats::stop_thread(chain, stats::thread::name::VALIDATE_MITHRIL_SNAPSHOT);
true
})
}
Expand Down
53 changes: 29 additions & 24 deletions rust/cardano-chain-follower/src/mithril_turbo_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ use std::{

use anyhow::{anyhow, bail};
use async_trait::async_trait;
use catalyst_types::conversion::from_saturating;
use catalyst_types::{conversion::from_saturating, mmap_file::MemoryMapFile};
use dashmap::DashSet;
use fmmap::MmapFileExt;
use memx::memcmp;
use mithril_client::{
common::CompressionAlgorithm, snapshot_downloader::SnapshotDownloader, MithrilResult,
Expand Down Expand Up @@ -134,8 +133,7 @@ impl Inner {
self.ext_size.fetch_add(entry_size, Ordering::SeqCst);

// Try and deduplicate the file if we can, otherwise just extract it.
if let Ok((prev_mmap, _)) =
Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref())
if let Ok(prev_mmap) = Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref())
{
let expected_file_size = from_saturating(entry_size);
let mut buf: Vec<u8> = Vec::with_capacity(expected_file_size);
Expand Down Expand Up @@ -225,7 +223,7 @@ impl Inner {
/// Check if a given path from the archive is able to be deduplicated.
fn can_deduplicate(
rel_file: &Path, file_size: u64, prev_file: Option<&PathBuf>,
) -> MithrilResult<(fmmap::MmapFile, u64)> {
) -> MithrilResult<MemoryMapFile> {
// Can't dedup if the current file is not de-dupable (must be immutable)
if rel_file.starts_with("immutable") {
// Can't dedup if we don't have a previous file to dedup against.
Expand All @@ -234,8 +232,8 @@ impl Inner {
// If the current file size is not exactly the same as the previous file size,
// we can't dedup.
if file_size == current_size {
if let Ok(pref_file_loaded) = mmap_open_sync(prev_file) {
if pref_file_loaded.1 == file_size {
if let Ok(pref_file_loaded) = Self::mmap_open_sync(prev_file) {
if pref_file_loaded.size() == file_size {
return Ok(pref_file_loaded);
}
}
Expand All @@ -245,6 +243,17 @@ impl Inner {
}
bail!("Can not deduplicate.");
}

/// Open a file using mmap for performance.
fn mmap_open_sync(path: &Path) -> MithrilResult<MemoryMapFile> {
match MemoryMapFile::try_from(path) {
Ok(mmap_file) => Ok(mmap_file),
Err(error) => {
error!(error=%error, file=%path.to_string_lossy(), "Failed to open file");
Err(error.into())
},
}
}
}

/// A snapshot downloader that accelerates Download using `aria2`.
Expand Down Expand Up @@ -302,7 +311,17 @@ impl MithrilTurboDownloader {
let target_dir = target_dir.to_owned();

// This is fully synchronous IO, so do it on a sync thread.
let result = spawn_blocking(move || inner.dl_and_dedup(&location, &target_dir)).await;
let result = spawn_blocking(move || {
stats::start_thread(
inner.cfg.chain,
stats::thread::name::MITHRIL_DL_DEDUP,
false,
);
let result = inner.dl_and_dedup(&location, &target_dir);
stats::stop_thread(inner.cfg.chain, stats::thread::name::MITHRIL_DL_DEDUP);
result
})
.await;

if let Ok(result) = result {
return result;
Expand All @@ -321,20 +340,6 @@ fn get_file_size_sync(file: &Path) -> Option<u64> {
Some(metadata.len())
}

/// Open a file using mmap for performance.
///
/// The file is opened through `fmmap` with the `read(true)` and `populate()` options.
/// On success, returns the mapped file together with its length as a `u64`.
///
/// # Errors
///
/// If the file cannot be opened, the failure is logged with the file path and the
/// error is returned, converted into the `MithrilResult` error type.
fn mmap_open_sync(path: &Path) -> MithrilResult<(fmmap::MmapFile, u64)> {
match fmmap::MmapFile::open_with_options(path, fmmap::Options::new().read(true).populate()) {
Ok(file) => {
// Capture the mapped length once so callers can compare sizes without
// querying the mapping again.
let len = file.len() as u64;
Ok((file, len))
},
Err(error) => {
// Record which path failed before handing the error back to the caller.
error!(error=%error, file=%path.to_string_lossy(), "Failed to open file");
Err(error.into())
},
}
}

#[async_trait]
impl SnapshotDownloader for MithrilTurboDownloader {
async fn download_unpack(
Expand Down Expand Up @@ -366,9 +371,9 @@ impl SnapshotDownloader for MithrilTurboDownloader {

async fn probe(&self, location: &str) -> MithrilResult<()> {
debug!("Probe Snapshot location='{location}'.");

let dl_config = self.inner.cfg.dl_config.clone().unwrap_or_default();
let dl_processor = ParallelDownloadProcessor::new(location, dl_config).await?;
let dl_processor =
ParallelDownloadProcessor::new(location, dl_config, self.inner.cfg.chain).await?;

// Decompress and extract and de-dupe each file in the archive.
stats::mithril_extract_started(self.inner.cfg.chain);
Expand Down
Loading

0 comments on commit 74e0fb9

Please sign in to comment.