cnidarium: use incremental jmt migration (#4053)
Closes #3506. This PR:
- uses `jmt@0.10`, which includes the `migration` feature
- bumps `borsh` to `1.3.0` to be able to serialize jmt nodes
- revamps the substore migration tests to include basic tests and a proptest strategy (see the sketch after this list)
- notably, the proptest integration tests lay the groundwork for deeper, more targeted strategies
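As a rough illustration of the proptest shape referenced above: a minimal sketch, not the PR's actual test suite. The strategy bounds, names, and the `BTreeMap` standing in for a substore are all illustrative; the real tests exercise cnidarium's storage and migration paths.

```rust
use proptest::prelude::*;

proptest! {
    #[test]
    fn writes_round_trip(entries in proptest::collection::btree_map(
        "[a-z]{1,8}",                                   // arbitrary short keys
        proptest::collection::vec(any::<u8>(), 0..32),  // arbitrary small values
        1..16,                                          // 1 to 15 entries per case
    )) {
        // Apply the generated writes to a stand-in store...
        let mut store = std::collections::BTreeMap::new();
        for (k, v) in &entries {
            store.insert(k.clone(), v.clone());
        }
        // ...and check that every write reads back unchanged.
        for (k, v) in &entries {
            prop_assert_eq!(store.get(k), Some(v));
        }
    }
}
```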
erwanor authored Mar 23, 2024
1 parent 77665ba commit c7cadd5
Showing 7 changed files with 1,146 additions and 125 deletions.
121 changes: 83 additions & 38 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -150,7 +150,7 @@ ibig = { version = "0.3" }
 ics23 = { version = "0.11.0" }
 im = { version = "^15.1.0" }
 indicatif = { version = "0.16" }
-jmt = { version = "0.9" }
+jmt = { version = "0.10", features = ["migration"] }
 metrics = { version = "0.22" }
 metrics-tracing-context = { version = "0.15" }
 num-bigint = { version = "0.4" }
11 changes: 7 additions & 4 deletions crates/cnidarium/Cargo.toml
@@ -5,13 +5,14 @@ edition = {workspace = true}
 
 [features]
 migration = []
+migration-proptests = ["migration"]
 default = ["metrics"]
 rpc = ["dep:tonic", "dep:prost", "dep:serde", "dep:pbjson", "dep:ibc-proto"]
 
 [dependencies]
 anyhow = {workspace = true}
 async-trait = {workspace = true}
-borsh = "0.10.3"
+borsh = { version = "1.3.0" , features = ["derive", "de_strict_order"]}
 futures = {workspace = true}
 hex = {workspace = true}
 ibc-proto = {workspace = true, default-features = false, features = ["serde"], optional = true}
@@ -37,6 +38,8 @@ tonic = {workspace = true, optional = true}
 tracing = {workspace = true}
 
 [dev-dependencies]
-tempfile = {workspace = true}
-tracing-subscriber = {workspace = true}
-tokio = {workspace = true}
+tempfile = { workspace = true }
+tracing-subscriber = { workspace = true }
+tokio = { workspace = true, features = ["full", "rt-multi-thread"] }
+proptest = "1.3.1"
+test-strategy = "0.3.1"
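Presumably the new property-based tests are gated behind `migration-proptests` (which enables `migration` in turn) and run with something like `cargo test -p cnidarium --features migration-proptests`; the exact invocation isn't part of this diff.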
29 changes: 16 additions & 13 deletions crates/cnidarium/src/storage.rs
@@ -323,24 +323,27 @@ impl Storage {
         // its own changes to the batch, and we will commit it at the end.
         let mut write_batch = rocksdb::WriteBatch::default();
 
-        // Note(erwan): Here, we iterate over each substore, and spawn a task to
-        // commit it. Since we know that the substore keyspace is disjoint, we
-        // could consider rewriting this loop into a [`tokio::task::JoinSet`],
-        // however consider that `rocksdb::WriteBatch` is _not_ thread-safe.
+        // Note(erwan): Here, we spawn a commit task for each substore.
+        // The substore keyspaces are disjoint, so conceptually it is
+        // fine to rewrite it using a [`tokio::task::JoinSet`].
+        // The reason this isn't done is that `rocksdb::WriteBatch`
+        // is _not_ thread-safe.
         //
         // This means that to spin up N tasks, we would need to use a
         // single batch wrapped in a mutex, or use N batches, and find
-        // a way to commit to them atomically. Since that is not supported
-        // by RocksDB, we would have to iterate over each entry in each
-        // batch, and merge them together.
+        // a way to commit to them atomically. This isn't supported by
+        // RocksDB, which leaves one option: to iterate over each entry
+        // in each batch, and merge them together. At this point, this
+        // is probably not worth it.
         //
         // Another option is to trade atomicity for parallelism by producing
         // N batches, and committing them in distinct atomic writes. This is
-        // dangerous because it could leave the node in an inconsistent state.
+        // potentially faster, but it is also more dangerous, because if one
+        // of the writes fails, we are left with a partially committed state.
         //
-        // Instead of doing that, we lean on the fact that the number of substores
-        // is small, and that the synchronization overhead of a joinset would exceed
-        // its benefits.
+        // The current implementation leans on the fact that the number of
+        // substores is small, and that the synchronization overhead of a joinset
+        // would exceed its benefits. This works well for now.
         for config in self.0.multistore_config.iter() {
             tracing::debug!(substore_prefix = ?config.prefix, "processing substore");
             // If the substore is empty, we need to fetch its initialized version from the cache.
@@ -376,7 +379,7 @@ impl Storage {
 
             // Commit the substore and collect the root hash
             let (root_hash, substore_batch) = substore_storage
-                .commit(changeset, write_batch, version)
+                .commit(changeset, write_batch, version, perform_migration)
                 .await?;
             write_batch = substore_batch;
 
@@ -417,7 +420,7 @@ impl Storage {
         };
 
         let (global_root_hash, write_batch) = main_store_storage
-            .commit(main_store_changes, write_batch, version)
+            .commit(main_store_changes, write_batch, version, perform_migration)
             .await?;
         tracing::debug!(
             ?global_root_hash,
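The rewritten comment above explains why the commit loop stays sequential: a single `rocksdb::WriteBatch` is threaded through every substore commit and flushed once, atomically. A condensed sketch of that hand-off pattern, assuming the `rocksdb` crate and illustrative names (not cnidarium's actual API):

```rust
use rocksdb::{WriteBatch, DB};

// Each "substore" stages its writes into the shared batch and hands it
// back, mirroring how `SubstoreStorage::commit` takes and returns the batch.
fn stage_substore(prefix: &str, mut batch: WriteBatch) -> WriteBatch {
    batch.put(format!("{prefix}/key"), b"value");
    batch
}

fn main() -> Result<(), rocksdb::Error> {
    let db = DB::open_default("/tmp/cnidarium-batch-demo")?;
    let mut batch = WriteBatch::default();
    for prefix in ["substore_a", "substore_b", "main"] {
        // Sequential hand-off: no mutex, no merging of per-task batches.
        batch = stage_substore(prefix, batch);
    }
    // One atomic write covers every substore's changes.
    db.write(batch)
}
```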
5 changes: 4 additions & 1 deletion crates/cnidarium/src/store/multistore.rs
@@ -90,10 +90,13 @@ impl MultistoreConfig {
     /// # Examples
     /// `prefix_a/key` -> `key` in `substore_a`
     /// `prefix_a` -> `prefix_a` in `main_store`
-    /// `preifx_a/` -> `prefix_a/` in `main_store`
+    /// `prefix_a/` -> `prefix_a/` in `main_store`
     /// `nonexistent_prefix` -> `nonexistent_prefix` in `main_store`
     pub fn route_key_bytes<'a>(&self, key: &'a [u8]) -> (&'a [u8], Arc<SubstoreConfig>) {
         let config = self.find_substore(key);
+
+        // If the key is a total match for the prefix, we return the original key
+        // routed to the main store. This is where subtree root hashes are stored.
         if key == config.prefix.as_bytes() {
             return (key, self.main_store.clone());
         }
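To make the routing rules in the doc comment concrete, here is a toy string-based model (a single assumed substore, simplified types) that reproduces the four documented examples, including the exact-prefix case that routes to the main store:

```rust
// Toy model of `route_key_bytes`: `&str` instead of `&[u8]`, a string label
// instead of `Arc<SubstoreConfig>`. Only the routing rule is faithful.
fn route_key<'a>(key: &'a str, prefix: &str) -> (&'a str, &'static str) {
    if key == prefix {
        // Exact prefix match: route to the main store, where the
        // substore's root hash is stored.
        (key, "main_store")
    } else if let Some(truncated) = key.strip_prefix(&format!("{prefix}/")) {
        if truncated.is_empty() {
            (key, "main_store") // "prefix_a/" keeps its full key
        } else {
            (truncated, "substore_a") // "prefix_a/key" -> "key"
        }
    } else {
        (key, "main_store") // unknown prefixes route to the main store as-is
    }
}

fn main() {
    assert_eq!(route_key("prefix_a/key", "prefix_a"), ("key", "substore_a"));
    assert_eq!(route_key("prefix_a", "prefix_a"), ("prefix_a", "main_store"));
    assert_eq!(route_key("prefix_a/", "prefix_a"), ("prefix_a/", "main_store"));
    assert_eq!(
        route_key("nonexistent_prefix", "prefix_a"),
        ("nonexistent_prefix", "main_store")
    );
}
```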
30 changes: 16 additions & 14 deletions crates/cnidarium/src/store/substore.rs
@@ -360,7 +360,8 @@ impl SubstoreStorage {
         self,
         cache: Cache,
         mut write_batch: rocksdb::WriteBatch,
-        new_version: jmt::Version,
+        write_version: jmt::Version,
+        perform_migration: bool,
     ) -> Result<(RootHash, rocksdb::WriteBatch)> {
         let span = Span::current();
 
@@ -369,8 +370,6 @@ impl SubstoreStorage {
             .spawn_blocking(move || {
                 span.in_scope(|| {
                     let jmt = jmt::Sha256Jmt::new(&self.substore_snapshot);
-
-                    // TODO(erwan): this could be folded with sharding the changesets.
                     let unwritten_changes: Vec<_> = cache
                         .unwritten_changes
                         .into_iter()
@@ -394,10 +393,15 @@ impl SubstoreStorage {
                         };
                     }
 
-                    let (root_hash, batch) = jmt.put_value_set(
-                        unwritten_changes.into_iter().map(|(keyhash, _key, some_value)| (keyhash, some_value)),
-                        new_version,
-                    )?;
+                    // We only track the keyhash and possible values; at the time of writing,
+                    // `rustfmt` panics on inlining the closure, so we use a helper function to skip the key.
+                    let skip_key = |(keyhash, _key, some_value)| (keyhash, some_value);
+
+                    let (root_hash, batch) = if perform_migration {
+                        jmt.append_value_set(unwritten_changes.into_iter().map(skip_key), write_version)?
+                    } else {
+                        jmt.put_value_set(unwritten_changes.into_iter().map(skip_key), write_version)?
+                    };
 
                     self.write_node_batch(&batch.node_batch)?;
                     tracing::trace!(?root_hash, "wrote node batch to backing store");
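This branch is the heart of the change: when `perform_migration` is set, the substore writes go through the `migration`-feature `append_value_set` from `jmt@0.10` instead of a regular `put_value_set` commit. A standalone sketch of the same selection, factored into a free function (the trait bounds mirror what cnidarium's snapshot types implement and may be stricter than strictly required):

```rust
// A sketch of the write-path selection introduced above, assuming jmt 0.10
// with the `migration` feature enabled.
use anyhow::Result;
use jmt::storage::{HasPreimage, TreeReader, TreeUpdateBatch};
use jmt::{KeyHash, OwnedValue, RootHash, Sha256Jmt, Version};

fn write_changes<R: TreeReader + HasPreimage>(
    reader: &R,
    changes: Vec<(KeyHash, Option<OwnedValue>)>,
    version: Version,
    perform_migration: bool,
) -> Result<(RootHash, TreeUpdateBatch)> {
    let jmt = Sha256Jmt::new(reader);
    if perform_migration {
        // Migration path: append the value set at `version` (incremental
        // migration) instead of performing a fresh versioned commit.
        jmt.append_value_set(changes, version)
    } else {
        // Regular path: a normal versioned commit.
        jmt.put_value_set(changes, version)
    }
}
```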
Expand Down Expand Up @@ -426,8 +430,6 @@ impl TreeWriter for SubstoreStorage {
/// nodes (`DbNodeKey` -> `Node`) and the JMT values,
/// (`VersionedKeyHash` -> `Option<Vec<u8>>`).
fn write_node_batch(&self, node_batch: &jmt::storage::NodeBatch) -> Result<()> {
use borsh::BorshSerialize;

let node_batch = node_batch.clone();
let cf_jmt = self
.substore_snapshot
Expand All @@ -437,8 +439,8 @@ impl TreeWriter for SubstoreStorage {
for (node_key, node) in node_batch.nodes() {
let db_node_key = DbNodeKey::from(node_key.clone());
let db_node_key_bytes = db_node_key.encode()?;
let value_bytes = &node.try_to_vec()?;
tracing::trace!(?db_node_key_bytes, value_bytes = ?hex::encode(value_bytes));
let value_bytes = borsh::to_vec(node)?;
tracing::trace!(?db_node_key_bytes, value_bytes = ?hex::encode(&value_bytes));
self.substore_snapshot
.db
.put_cf(cf_jmt, db_node_key_bytes, value_bytes)?;
Expand All @@ -451,8 +453,8 @@ impl TreeWriter for SubstoreStorage {
for ((version, key_hash), some_value) in node_batch.values() {
let versioned_key = VersionedKeyHash::new(*version, *key_hash);
let key_bytes = &versioned_key.encode();
let value_bytes = &some_value.try_to_vec()?;
tracing::trace!(?key_bytes, value_bytes = ?hex::encode(value_bytes));
let value_bytes = borsh::to_vec(some_value)?;
tracing::trace!(?key_bytes, value_bytes = ?hex::encode(&value_bytes));

self.substore_snapshot
.db
Expand All @@ -479,7 +481,7 @@ impl DbNodeKey {
pub fn encode(&self) -> Result<Vec<u8>> {
let mut bytes = Vec::new();
bytes.extend_from_slice(&self.0.version().to_be_bytes()); // encode version as big-endian
let rest = borsh::BorshSerialize::try_to_vec(&self.0)?;
let rest = borsh::to_vec(&self.0)?;
bytes.extend_from_slice(&rest);
Ok(bytes)
}
Expand Down
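Two serialization details recur in these hunks: borsh 1.x replaces the `try_to_vec` method with the free function `borsh::to_vec`, and `DbNodeKey::encode` prefixes a big-endian version so keys sort lexicographically in version order under RocksDB iteration. A self-contained sketch of both, with a hypothetical `DemoKey` standing in for the actual node key type:

```rust
// Assumes borsh 1.x with the `derive` feature, as enabled in this PR.
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq)]
struct DemoKey {
    nibble_path: Vec<u8>,
}

fn encode(version: u64, key: &DemoKey) -> anyhow::Result<Vec<u8>> {
    let mut bytes = Vec::new();
    bytes.extend_from_slice(&version.to_be_bytes()); // big-endian: sorts by version
    bytes.extend_from_slice(&borsh::to_vec(key)?);   // was `key.try_to_vec()?` in borsh 0.10
    Ok(bytes)
}

fn main() -> anyhow::Result<()> {
    let key = DemoKey { nibble_path: vec![0xab, 0xcd] };

    // Round-trip through the borsh 1.x free functions.
    let round_trip: DemoKey = borsh::from_slice(&borsh::to_vec(&key)?)?;
    assert_eq!(round_trip, key);

    // Big-endian version prefixes compare in numeric order;
    // little-endian would put version 256 before version 1.
    assert!(encode(1, &key)? < encode(256, &key)?);
    Ok(())
}
```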