Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust/cardano-chain-follower): add thread and mmap file stats #150

Merged
merged 25 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7f2b8f0
feat(cardano-chain-follower): add thread and mmap file stats
bkioshn Jan 15, 2025
8fcaacb
fix(cardano-chain-follower): remove log
bkioshn Jan 15, 2025
c9d478e
fix(cardano-chain-follower): fix mmapfile name
bkioshn Jan 15, 2025
1bf8f75
fix(cardano-chain-follower): thread stat call
bkioshn Jan 15, 2025
3cc0d46
fix(cardano-chain-follower): thread logic
bkioshn Jan 15, 2025
8edb23f
fix(cardano-chain-follower): thread stat call
bkioshn Jan 15, 2025
476e131
fix(cardano-chain-follower): update forever task thread stat
bkioshn Jan 15, 2025
329455f
fix(cardano-chain-follower): linter and format
bkioshn Jan 15, 2025
2edc0e3
fix(cardano-chain-follower): thread stat name constants to its own file
bkioshn Jan 16, 2025
7a06a97
Merge branch 'main' into feat/thread_mmap_stats
stevenj Jan 16, 2025
6b68b3c
fix(cardano-chain-follower): thread stat calling
bkioshn Jan 17, 2025
d105c6c
fix(cardano-chain-follower): move mmap file to cat-types
bkioshn Jan 17, 2025
5276b16
fix(cardano-blockchain-types): format
bkioshn Jan 19, 2025
68dfa90
fix(cardano-chain-follower): remove unused dep
bkioshn Jan 19, 2025
4a6ab32
fix(cardano-chain-follower): thread stat calling
bkioshn Jan 19, 2025
7968e86
fix: mmapfile
bkioshn Jan 19, 2025
454c37e
Merge branch 'main' into feat/thread_mmap_stats
bkioshn Jan 19, 2025
d156158
Merge branch 'main' into feat/thread_mmap_stats
stevenj Jan 20, 2025
60960df
fix(cardano-blockchain-types): use RwLock for mmap stat
bkioshn Jan 20, 2025
56bb35a
fix(cardano-blockchain-types): add size function
bkioshn Jan 20, 2025
60c347e
fix(catalyst-types): mmap file error and structure
bkioshn Jan 21, 2025
6b1f1fb
fix(cardano-chain-follower): stat visibility
bkioshn Jan 21, 2025
8ba010f
fix(cardano-chain-follower): thread stat naming
bkioshn Jan 21, 2025
69479f4
fix(cardano-chain-follower): thread stat worker
bkioshn Jan 21, 2025
4aa61cf
Merge branch 'main' into feat/thread_mmap_stats
stevenj Jan 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ maindbname
mapref
mdlint
mdns
MEMMAP
memx
Metadatum
mgrybyk
Expand Down
6 changes: 3 additions & 3 deletions rust/cardano-chain-follower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mithril-client = { version = "0.10.4", default-features = false, features = [
"num-integer-backend",
] }
cardano-blockchain-types = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250114-00" }
catalyst-types = { version = "0.0.1", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250108-00" }
catalyst-types = { version = "0.0.1", path = "../catalyst-types" }

thiserror = "1.0.69"
tokio = { version = "1.42.0", features = [
Expand All @@ -31,7 +31,7 @@ tokio = { version = "1.42.0", features = [
] }
tracing = "0.1.41"
tracing-log = "0.2.0"
dashmap = "6.1.0"
dashmap = { version = "6.1.0", features = ["serde"] }
url = "2.5.4"
anyhow = "1.0.95"
chrono = "0.4.39"
Expand All @@ -48,14 +48,14 @@ serde = "1.0.217"
serde_json = "1.0.134"
mimalloc = { version = "0.1.43", optional = true }
memx = "0.1.32"
fmmap = { version = "0.3.3", features = ["sync", "tokio-async"] }
zstd = "0.13.2"
logcall = "0.1.11"
tar = "0.4.43"
ureq = { version = "2.12.1", features = ["native-certs"] }
http = "1.2.0"
hickory-resolver = { version = "0.24.2", features = ["dns-over-rustls"] }
moka = { version = "0.12.9", features = ["sync"] }
cpu-time = "1.0.0"

[dev-dependencies]
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
Expand Down
7 changes: 6 additions & 1 deletion rust/cardano-chain-follower/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,13 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU

// Start the Live chain backfill task.
let _backfill_join_handle = spawn(async move {
stats::start_thread(
cfg.chain,
stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE,
true,
);
live_sync_backfill_and_purge(backfill_cfg.clone(), rx, sync_waiter).await;
stats::stop_thread(cfg.chain, stats::thread::name::LIVE_SYNC_BACKFILL_AND_PURGE);
});

// Live Fill data starts at fork 1.
Expand All @@ -539,7 +545,6 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
loop {
// We never have a connection if we end up around the loop, so make a new one.
let mut peer = persistent_reconnect(&cfg.relay_address, cfg.chain).await;

match resync_live_tip(&mut peer, cfg.chain).await {
Ok(tip) => debug!("Tip Resynchronized to {tip}"),
Err(error) => {
Expand Down
7 changes: 6 additions & 1 deletion rust/cardano-chain-follower/src/chain_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,13 @@ impl ChainSyncConfig {
// Start the Mithril Snapshot Follower
let rx = self.mithril_cfg.run().await?;

let config = self.clone();
// Start Chain Sync
*locked_handle = Some(tokio::spawn(chain_sync(self.clone(), rx)));
*locked_handle = Some(tokio::spawn(async move {
stats::start_thread(config.chain, stats::thread::name::CHAIN_SYNC, true);
chain_sync(config.clone(), rx).await;
stats::stop_thread(config.chain, stats::thread::name::CHAIN_SYNC);
}));

// sync_map.insert(chain, handle);
debug!("Chain Sync for {} : Started", self.chain);
Expand Down
5 changes: 3 additions & 2 deletions rust/cardano-chain-follower/src/chain_sync_ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
};
use tracing::error;

use crate::chain_update;
use crate::{chain_update, stats};

/// Data we hold related to sync being ready or not.
struct SyncReady {
Expand Down Expand Up @@ -88,6 +88,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
let (tx, rx) = oneshot::channel::<()>();

tokio::spawn(async move {
stats::start_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY, true);
// We are safe to use `expect` here because the SYNC_READY list is exhaustively
// initialized. It's a serious BUG if that is not true, so panic is OK.
#[allow(clippy::expect_used)]
Expand All @@ -101,7 +102,7 @@ pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
if let Ok(()) = rx.await {
status.ready = true;
}

stats::stop_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY);
// If the channel closes early, we can NEVER use the Blockchain data.
});

Expand Down
17 changes: 12 additions & 5 deletions rust/cardano-chain-follower/src/mithril_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,32 @@

use std::path::Path;

use cardano_blockchain_types::Point;
use cardano_blockchain_types::{Network, Point};
use pallas_hardano::storage::immutable::FallibleBlock;
use tokio::task;

use crate::error::{Error, Result};
use crate::{
error::{Error, Result},
stats,
};

/// Synchronous Immutable block iterator.
pub(crate) type ImmutableBlockIterator = Box<dyn Iterator<Item = FallibleBlock> + Send + Sync>;

/// Get a mithril snapshot iterator.
pub(crate) async fn make_mithril_iterator(
path: &Path, start: &Point,
path: &Path, start: &Point, chain: Network,
) -> Result<ImmutableBlockIterator> {
let path = path.to_path_buf();
let start = start.clone();
// Initial input
let res = task::spawn_blocking(move || {
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
.map_err(|error| Error::MithrilSnapshot(Some(error)))
stats::start_thread(chain, stats::thread::name::MITHRIL_ITERATOR, false);
let result =
pallas_hardano::storage::immutable::read_blocks_from_point(&path, start.clone().into())
.map_err(|error| Error::MithrilSnapshot(Some(error)));
stats::stop_thread(chain, stats::thread::name::MITHRIL_ITERATOR);
result
})
.await;

Expand Down
12 changes: 11 additions & 1 deletion rust/cardano-chain-follower/src/mithril_snapshot_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
mithril_snapshot_data::{latest_mithril_snapshot_id, SnapshotData},
mithril_snapshot_sync::background_mithril_update,
snapshot_id::SnapshotId,
stats,
turbo_downloader::DlConfig,
};

Expand Down Expand Up @@ -413,7 +414,16 @@ impl MithrilSnapshotConfig {
let (tx, rx) = mpsc::channel::<MithrilUpdateMessage>(2);

// let handle = tokio::spawn(background_mithril_update(chain, self.clone(), tx));
*locked_handle = Some(tokio::spawn(background_mithril_update(self.clone(), tx)));
let config = self.clone();
*locked_handle = Some(tokio::spawn(async move {
stats::start_thread(
config.chain,
stats::thread::name::MITHRIL_SNAPSHOT_UPDATER,
true,
);
background_mithril_update(config.clone(), tx).await;
stats::stop_thread(config.chain, stats::thread::name::MITHRIL_SNAPSHOT_UPDATER);
}));

// sync_map.insert(chain, handle);
debug!(
Expand Down
6 changes: 3 additions & 3 deletions rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl MithrilSnapshotIterator {
chain: Network, path: &Path, from: &Point, search_interval: u64,
) -> Option<MithrilSnapshotIterator> {
let point = probe_point(from, search_interval);
let Ok(mut iterator) = make_mithril_iterator(path, &point).await else {
let Ok(mut iterator) = make_mithril_iterator(path, &point, chain).await else {
return None;
};

Expand Down Expand Up @@ -116,7 +116,7 @@ impl MithrilSnapshotIterator {
let this = this?;

// Remake the iterator, based on the new known point.
let Ok(iterator) = make_mithril_iterator(path, &this).await else {
let Ok(iterator) = make_mithril_iterator(path, &this, chain).await else {
return None;
};

Expand Down Expand Up @@ -176,7 +176,7 @@ impl MithrilSnapshotIterator {

debug!("Actual Mithril Iterator Start: {}", from);

let iterator = make_mithril_iterator(path, from).await?;
let iterator = make_mithril_iterator(path, from, chain).await?;

Ok(MithrilSnapshotIterator {
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
Expand Down
9 changes: 7 additions & 2 deletions rust/cardano-chain-follower/src/mithril_snapshot_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,12 @@ async fn validate_mithril_snapshot(
match tokio::spawn(async move {
// This can be long running and CPU Intensive.
// So we spawn it off to a background task.
MessageBuilder::new()
stats::start_thread(chain, stats::thread::name::COMPUTE_SNAPSHOT_MSG, true);
let result = MessageBuilder::new()
.compute_snapshot_message(&cert, &mithril_path)
.await
.await;
stats::stop_thread(chain, stats::thread::name::COMPUTE_SNAPSHOT_MSG);
result
})
.await
{
Expand Down Expand Up @@ -517,6 +520,7 @@ fn background_validate_mithril_snapshot(
chain: Network, certificate: MithrilCertificate, tmp_path: PathBuf,
) -> tokio::task::JoinHandle<bool> {
tokio::spawn(async move {
stats::start_thread(chain, stats::thread::name::VALIDATE_MITHRIL_SNAPSHOT, true);
debug!(
"Mithril Snapshot background updater for: {} : Check Certificate.",
chain
Expand All @@ -540,6 +544,7 @@ fn background_validate_mithril_snapshot(
chain
);

stats::stop_thread(chain, stats::thread::name::VALIDATE_MITHRIL_SNAPSHOT);
true
})
}
Expand Down
53 changes: 29 additions & 24 deletions rust/cardano-chain-follower/src/mithril_turbo_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ use std::{

use anyhow::{anyhow, bail};
use async_trait::async_trait;
use catalyst_types::conversion::from_saturating;
use catalyst_types::{conversion::from_saturating, mmap_file::MemoryMapFile};
use dashmap::DashSet;
use fmmap::MmapFileExt;
use memx::memcmp;
use mithril_client::{
common::CompressionAlgorithm, snapshot_downloader::SnapshotDownloader, MithrilResult,
Expand Down Expand Up @@ -134,8 +133,7 @@ impl Inner {
self.ext_size.fetch_add(entry_size, Ordering::SeqCst);

// Try and deduplicate the file if we can, otherwise just extract it.
if let Ok((prev_mmap, _)) =
Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref())
if let Ok(prev_mmap) = Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref())
{
let expected_file_size = from_saturating(entry_size);
let mut buf: Vec<u8> = Vec::with_capacity(expected_file_size);
Expand Down Expand Up @@ -225,7 +223,7 @@ impl Inner {
/// Check if a given path from the archive is able to be deduplicated.
fn can_deduplicate(
rel_file: &Path, file_size: u64, prev_file: Option<&PathBuf>,
) -> MithrilResult<(fmmap::MmapFile, u64)> {
) -> MithrilResult<MemoryMapFile> {
// Can't dedup if the current file is not de-dupable (must be immutable)
if rel_file.starts_with("immutable") {
// Can't dedup if we don't have a previous file to dedup against.
Expand All @@ -234,8 +232,8 @@ impl Inner {
// If the current file size is not exactly the same as the previous file size, we
// can't dedup.
if file_size == current_size {
if let Ok(pref_file_loaded) = mmap_open_sync(prev_file) {
if pref_file_loaded.1 == file_size {
if let Ok(pref_file_loaded) = Self::mmap_open_sync(prev_file) {
if pref_file_loaded.size() == file_size {
return Ok(pref_file_loaded);
}
}
Expand All @@ -245,6 +243,17 @@ impl Inner {
}
bail!("Can not deduplicate.");
}

/// Open a file using mmap for performance.
fn mmap_open_sync(path: &Path) -> MithrilResult<MemoryMapFile> {
match MemoryMapFile::try_from(path) {
Ok(mmap_file) => Ok(mmap_file),
Err(error) => {
error!(error=%error, file=%path.to_string_lossy(), "Failed to open file");
Err(error.into())
},
}
}
}

/// A snapshot downloader that accelerates Download using `aria2`.
Expand Down Expand Up @@ -302,7 +311,17 @@ impl MithrilTurboDownloader {
let target_dir = target_dir.to_owned();

// This is fully synchronous IO, so do it on a sync thread.
let result = spawn_blocking(move || inner.dl_and_dedup(&location, &target_dir)).await;
let result = spawn_blocking(move || {
stats::start_thread(
inner.cfg.chain,
stats::thread::name::MITHRIL_DL_DEDUP,
false,
);
let result = inner.dl_and_dedup(&location, &target_dir);
stats::stop_thread(inner.cfg.chain, stats::thread::name::MITHRIL_DL_DEDUP);
result
})
.await;

if let Ok(result) = result {
return result;
Expand All @@ -321,20 +340,6 @@ fn get_file_size_sync(file: &Path) -> Option<u64> {
Some(metadata.len())
}

/// Open a file using mmap for performance.
fn mmap_open_sync(path: &Path) -> MithrilResult<(fmmap::MmapFile, u64)> {
match fmmap::MmapFile::open_with_options(path, fmmap::Options::new().read(true).populate()) {
Ok(file) => {
let len = file.len() as u64;
Ok((file, len))
},
Err(error) => {
error!(error=%error, file=%path.to_string_lossy(), "Failed to open file");
Err(error.into())
},
}
}

#[async_trait]
impl SnapshotDownloader for MithrilTurboDownloader {
async fn download_unpack(
Expand Down Expand Up @@ -366,9 +371,9 @@ impl SnapshotDownloader for MithrilTurboDownloader {

async fn probe(&self, location: &str) -> MithrilResult<()> {
debug!("Probe Snapshot location='{location}'.");

let dl_config = self.inner.cfg.dl_config.clone().unwrap_or_default();
let dl_processor = ParallelDownloadProcessor::new(location, dl_config).await?;
let dl_processor =
ParallelDownloadProcessor::new(location, dl_config, self.inner.cfg.chain).await?;

// Decompress and extract and de-dupe each file in the archive.
stats::mithril_extract_started(self.inner.cfg.chain);
Expand Down
Loading
Loading