cnidarium: implement deferred commits via StagedWriteBatch #4122

Merged · 9 commits · Apr 3, 2024
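This change splits `Storage::commit` into two phases: `prepare_commit` flattens a `StateDelta<Snapshot>` into a `StagedWriteBatch` holding the RocksDB write batch, the new version, and the resulting root hashes, and `commit_batch` later persists that batch. A minimal usage sketch of the staged flow, not taken from this diff, assuming a `Storage` handle and a `StateDelta<Snapshot>` with pending writes:

use anyhow::Result;
use cnidarium::{Snapshot, StateDelta, Storage};

// Hypothetical helper illustrating the two-phase flow; not part of the PR itself.
async fn stage_then_commit(storage: &Storage, delta: StateDelta<Snapshot>) -> Result<()> {
    // Phase 1: shard the changeset by substore, compute the substore roots and the
    // global root hash, and assemble a single RocksDB write batch. Nothing is persisted.
    let batch = storage.prepare_commit(delta).await?;

    // The staged batch already knows the resulting chain-state version and jmt root
    // hash, so callers can report an app hash before the write actually lands.
    println!("staged version {}, root hash {:?}", batch.version(), batch.root_hash());

    // Phase 2 (deferred): persist the batch. In this diff the write happens in the
    // crate-internal `commit_batch`, which `Storage::commit` calls after preparing.
    Ok(())
}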
198 changes: 158 additions & 40 deletions crates/cnidarium/src/storage.rs
@@ -1,9 +1,9 @@
use std::{path::PathBuf, sync::Arc};
// use tokio_stream::wrappers::WatchStream;

use anyhow::{bail, ensure, Result};
use parking_lot::RwLock;
use rocksdb::{Options, DB};
use std::collections::HashMap;
use tokio::sync::watch;
use tracing::Span;

@@ -15,7 +15,7 @@ use crate::{
substore::{SubstoreConfig, SubstoreSnapshot, SubstoreStorage},
},
};
use crate::{snapshot_cache::SnapshotCache, StateDelta};
use crate::{snapshot_cache::SnapshotCache, RootHash, StateDelta};

mod temp;
pub use temp::TempStorage;
@@ -46,6 +40,40 @@ struct Inner {
db: Arc<DB>,
}

/// A staged write batch that can be committed to RocksDB.
///
/// This allows write batches to be prepared ahead of time and committed later.
pub struct StagedWriteBatch {
/// The write batch to commit to RocksDB.
pub(crate) write_batch: rocksdb::WriteBatch,
/// The new version of the chain state.
pub(crate) version: jmt::Version,
/// The new versions of each substore.
pub(crate) multistore_versions: multistore::MultistoreCache,
/// The root hash of the chain state corresponding to this set of changes.
pub(crate) root_hash: RootHash,
/// The configs, root hashes, and new versions of each substore
/// that was updated in this batch.
pub(crate) substore_roots: HashMap<Arc<SubstoreConfig>, (RootHash, u64)>,
/// Whether or not to perform a migration.
pub(crate) perform_migration: bool,
/// A lightweight copy of the changeset; this is useful to provide
/// a stream of changes to subscribers.
pub(crate) changes: Arc<Cache>,
}

impl StagedWriteBatch {
/// Returns the new version of the chain state corresponding to this set of changes.
pub fn version(&self) -> jmt::Version {
self.version
}

/// Returns the root hash of the jmt corresponding to this set of changes.
pub fn root_hash(&self) -> &RootHash {
&self.root_hash
}
}

impl Storage {
/// Loads a storage instance from the given path, initializing it if necessary.
pub async fn load(path: PathBuf, default_prefixes: Vec<String>) -> Result<Self> {
@@ -271,9 +305,9 @@ impl Storage {
self.0.snapshots.read().get(version)
}

/// Commits the provided [`StateDelta`] to persistent storage as the latest
/// version of the chain state.
pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
/// Prepares a commit for the provided [`StateDelta`], returning a [`StagedWriteBatch`].
/// The batch can be committed to the database using the [`Storage::commit_batch`] method.
pub async fn prepare_commit(&self, delta: StateDelta<Snapshot>) -> Result<StagedWriteBatch> {
// Extract the snapshot and the changes from the state delta
let (snapshot, changes) = delta.flatten();
let prev_snapshot_version = snapshot.version();
@@ -286,35 +320,28 @@

ensure!(
prev_storage_version == prev_snapshot_version,
"trying to commit a delta forked from version {}, but the latest version is {}",
"trying to prepare a commit for a delta forked from version {}, but the latest version is {}",
prev_snapshot_version,
prev_storage_version
);

self.commit_inner(snapshot, changes, next_storage_version, false)
self.prepare_commit_inner(snapshot, changes, next_storage_version, false)
.await
}

/// Commits the supplied [`Cache`] to persistent storage.
///
/// # Migrations
/// In the case of chain state migrations we need to commit the new state
/// without incrementing the version. If `perform_migration` is `true` the
/// snapshot will _not_ be written to the snapshot cache, and no subscribers
/// will be notified. Substore versions will not be updated.
async fn commit_inner(
async fn prepare_commit_inner(
&self,
snapshot: Snapshot,
cache: Cache,
version: jmt::Version,
perform_migration: bool,
) -> Result<crate::RootHash> {
tracing::debug!(new_jmt_version = ?version, "committing state delta");
) -> Result<StagedWriteBatch> {
tracing::debug!(new_jmt_version = ?version, "preparing to commit state delta");
// Save a copy of the changes to send to subscribers later.
let changes = Arc::new(cache.clone_changes());

let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config);
let mut substore_roots = Vec::new();
let mut substore_roots = HashMap::new();
let mut multistore_versions =
multistore::MultistoreCache::from_config(self.0.multistore_config.clone());

@@ -366,24 +393,24 @@ impl Storage {
continue;
};

let version = if perform_migration {
let new_version = if perform_migration {
old_substore_version
} else {
old_substore_version.wrapping_add(1)
};
new_versions.push(version);
new_versions.push(new_version);
let substore_snapshot = SubstoreSnapshot {
config: config.clone(),
rocksdb_snapshot: rocksdb_snapshot.clone(),
version,
version: new_version,
db: db.clone(),
};

let substore_storage = SubstoreStorage { substore_snapshot };

// Commit the substore and collect its root hash
let (root_hash, substore_batch) = substore_storage
.commit(changeset, write_batch, version, perform_migration)
.commit(changeset, write_batch, new_version, perform_migration)
.await?;
write_batch = substore_batch;

@@ -393,7 +420,15 @@ impl Storage {
?version,
"added substore to write batch"
);
substore_roots.push((config.clone(), root_hash, version));
substore_roots.insert(config.clone(), (root_hash, new_version));

tracing::debug!(
?root_hash,
prefix = ?config.prefix,
?new_version,
"updating substore version"
);
multistore_versions.set_version(config.clone(), new_version);
}

// Add substore roots to the main store changeset
@@ -405,7 +440,7 @@ impl Storage {
Cache::default()
});

for (config, root_hash, _) in substore_roots.iter() {
for (config, (root_hash, _)) in substore_roots.iter() {
main_store_changes
.unwritten_changes
.insert(config.prefix.to_string(), Some(root_hash.0.to_vec()));
@@ -432,27 +467,108 @@ impl Storage {
"added main store to write batch"
);

tracing::debug!(?global_root_hash, version = ?version, "updating main store version");
let main_store_config = self.0.multistore_config.main_store.clone();
multistore_versions.set_version(main_store_config, version);

Ok(StagedWriteBatch {
write_batch,
version,
multistore_versions,
root_hash: global_root_hash,
substore_roots,
perform_migration,
changes,
})
}

/// Commits the provided [`StateDelta`] to persistent storage as the latest
/// version of the chain state.
pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
let batch = self.prepare_commit(delta).await?;
self.commit_batch(batch).await
}

/// Commits the supplied [`StagedWriteBatch`] to persistent storage.
///
/// # Migrations
/// In the case of chain state migrations we need to commit the new state
/// without incrementing the version. If `perform_migration` is `true` the
/// snapshot will _not_ be written to the snapshot cache, and no subscribers
/// will be notified. Substore versions will not be updated.
async fn commit_batch(&self, batch: StagedWriteBatch) -> Result<crate::RootHash> {
let StagedWriteBatch {
write_batch,
version,
multistore_versions,
root_hash: global_root_hash,
substore_roots,
perform_migration,
changes,
} = batch;

let db = self.0.db.clone();

// check that the version of the batch being committed is the correct next version
let old_version = self.latest_version();
let expected_new_version = if perform_migration {
old_version
} else {
old_version.wrapping_add(1)
};

ensure!(
expected_new_version == version,
"new version mismatch: expected {} but got {}",
expected_new_version,
version
);

// also check that each of the substore versions is the correct next version
let snapshot = self.latest_snapshot();
for (config, new_version) in &multistore_versions.substores {
if config.prefix.is_empty() {
// this is the main store, ignore
continue;
}

let old_substore_version = config
.latest_version_from_snapshot(&db, &snapshot.0.snapshot)?
.unwrap_or_else(|| {
tracing::debug!("substore is empty, fetching initialized version from cache");
snapshot
.substore_version(&config)
.expect("prefix should be initialized")
});

// if the substore exists in `substore_roots`, there have been updates to the substore.
// if `perform_migration` is false and there are updates, the next version should be previous + 1.
// otherwise, the version should remain the same.
let expected_new_version = if substore_roots.get(config).is_some() && !perform_migration
{
old_substore_version.wrapping_add(1)
} else {
old_substore_version
};

ensure!(
expected_new_version == *new_version,
"substore new version mismatch for substore with prefix {}: expected {} but got {}",
config.prefix,
expected_new_version,
new_version
);
}

tracing::debug!(new_jmt_version = ?version, "committing batch to db");

db.write(write_batch).expect("can write to db");
tracing::debug!(
?global_root_hash,
?version,
"committed main store and substores to db"
);

// Update the tracked versions for each substore.
for (config, root_hash, new_version) in substore_roots {
tracing::debug!(
?root_hash,
prefix = ?config.prefix,
?new_version,
"updating substore version"
);
multistore_versions.set_version(config, new_version);
}

tracing::debug!(?global_root_hash, ?version, "updating main store version");
multistore_versions.set_version(main_store_config, version);

// If we're not performing a migration, we should update the snapshot cache
if !perform_migration {
tracing::debug!("updating snapshot cache");
@@ -487,8 +603,10 @@ impl Storage {
pub async fn commit_in_place(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
let (snapshot, changes) = delta.flatten();
let old_version = self.latest_version();
self.commit_inner(snapshot, changes, old_version, true)
.await
let batch = self
.prepare_commit_inner(snapshot, changes, old_version, true)
.await?;
self.commit_batch(batch).await
}

/// Returns the internal handle to RocksDB, this is useful to test adjacent storage crates.
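Because a batch can now be staged well before it is written, `commit_batch` re-checks that the batch was prepared against the current state: the main store version must be exactly the next version (or unchanged when migrating), and each substore version must satisfy the same rule. A reduced sketch of that invariant, assuming plain `u64` versions (the helper name is hypothetical, not from the diff):

use anyhow::{ensure, Result};

// Mirrors the check that `commit_batch` applies to the main store and to every
// substore before writing the staged batch.
fn check_next_version(latest: u64, staged: u64, perform_migration: bool) -> Result<()> {
    // A migration rewrites state in place, so the version must stay put; a normal
    // commit must advance it by exactly one.
    let expected = if perform_migration {
        latest
    } else {
        latest.wrapping_add(1)
    };
    ensure!(
        expected == staged,
        "new version mismatch: expected {} but got {}",
        expected,
        staged
    );
    Ok(())
}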
2 changes: 1 addition & 1 deletion crates/cnidarium/src/store/substore.rs
@@ -18,7 +18,7 @@ use jmt::storage::TreeWriter;

/// Specifies the configuration of a substore, which is a prefixed subset of
/// the main store with its own merkle tree, nonverifiable data, preimage index, etc.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct SubstoreConfig {
/// The prefix of the substore. If empty, it is the root-level store config.
pub prefix: String,
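The new `Hash` derive is needed because `substore_roots` changed from a `Vec<(Arc<SubstoreConfig>, RootHash, u64)>` to a `HashMap<Arc<SubstoreConfig>, (RootHash, u64)>` in `StagedWriteBatch`, and `Arc<T>` only implements `Hash` when `T` does. A stripped-down illustration with stand-in types (not the real definitions):

use std::{collections::HashMap, sync::Arc};

// Stand-ins for SubstoreConfig and RootHash, reduced to what the trait bound requires.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SubstoreConfig {
    prefix: String,
}

type RootHash = [u8; 32];

fn main() {
    // `Arc<SubstoreConfig>` is hashable only because `SubstoreConfig` derives `Hash`,
    // which is what lets it key the staged batch's `substore_roots` map.
    let mut substore_roots: HashMap<Arc<SubstoreConfig>, (RootHash, u64)> = HashMap::new();
    let cfg = Arc::new(SubstoreConfig { prefix: "prefix_a".to_string() });
    substore_roots.insert(cfg.clone(), ([0u8; 32], 1));
    assert!(substore_roots.contains_key(&cfg));
}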