diff --git a/crates/bin/pd/src/migrate/testnet78.rs b/crates/bin/pd/src/migrate/testnet78.rs index 0074b4b034..468e4e677a 100644 --- a/crates/bin/pd/src/migrate/testnet78.rs +++ b/crates/bin/pd/src/migrate/testnet78.rs @@ -169,9 +169,9 @@ async fn delete_empty_deleted_packet_commitments( pin_mut!(stream); while let Some(entry) = stream.next().await { - let (substore_key, value) = entry?; + let (key, value) = entry?; if value.is_empty() { - delta.delete(format!("ibc-data/{substore_key}")); + delta.delete(key); } } diff --git a/crates/cnidarium/src/snapshot.rs b/crates/cnidarium/src/snapshot.rs index e36eef64d8..eb3518805a 100644 --- a/crates/cnidarium/src/snapshot.rs +++ b/crates/cnidarium/src/snapshot.rs @@ -1,3 +1,4 @@ +use std::iter; use std::{any::Any, sync::Arc}; use anyhow::Result; @@ -271,6 +272,7 @@ impl StateRead for Snapshot { let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix); tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore"); + let substore_prefix = config.prefix.clone(); let version = self .substore_version(&config) @@ -302,18 +304,21 @@ impl StateRead for Snapshot { for tuple in jmt_keys_iterator { // For each key that matches the prefix, fetch the value from the JMT column family. let (key_preimage, _) = tuple?; + let substore_key = std::str::from_utf8(key_preimage.as_ref()) + .expect("saved jmt keys are utf-8 strings"); + let key_hash = jmt::KeyHash::with::(substore_key.as_bytes()); - let k = std::str::from_utf8(key_preimage.as_ref()) - .expect("saved jmt keys are utf-8 strings") - .to_string(); - - let key_hash = jmt::KeyHash::with::(k.as_bytes()); + let full_key = if substore_prefix.is_empty() { + substore_key.to_string() + } else { + format!("{substore_prefix}/{substore_key}").to_string() + }; let v = substore .get_jmt(key_hash)? .expect("keys in jmt_keys should have a corresponding value in jmt"); - tx_prefix_item.blocking_send(Ok((k, v)))?; + tx_prefix_item.blocking_send(Ok((full_key, v)))?; } anyhow::Ok(()) }) @@ -332,7 +337,6 @@ impl StateRead for Snapshot { let db = self.0.db.clone(); let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix); - tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore"); let version = self .substore_version(&config) @@ -357,12 +361,20 @@ impl StateRead for Snapshot { .rocksdb_snapshot .iterator_cf_opt(cf_jmt_keys, options, mode); + let substore_prefix = &substore.config.prefix; + for key_and_keyhash in iter { let (raw_preimage, _) = key_and_keyhash?; let preimage = std::str::from_utf8(raw_preimage.as_ref()) - .expect("saved jmt keys are utf-8 strings") - .to_string(); - tx_prefix_keys.blocking_send(Ok(preimage))?; + .expect("saved jmt keys are utf-8 strings"); + + let full_key = if substore_prefix.is_empty() { + preimage.to_string() + } else { + format!("{substore_prefix}/{preimage}").to_string() + }; + + tx_prefix_keys.blocking_send(Ok(full_key))?; } anyhow::Ok(()) }) @@ -403,9 +415,24 @@ impl StateRead for Snapshot { substore .rocksdb_snapshot .iterator_cf_opt(cf_nonverifiable, options, mode); + let substore_prefix = substore.config.prefix.as_bytes().to_vec(); for i in iter { - let (key, value) = i?; - tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?; + let (boxed_key, boxed_value) = i?; + let key: Vec = boxed_key.into(); + let value: Vec = boxed_value.into(); + + // Costly to do on every iteration, but this should be dwarfed by the + // context switch to the tokio runtime. + let mut full_key: Vec = vec![]; + if substore_prefix.is_empty() { + full_key.extend(key); + } else { + full_key.extend(substore_prefix.clone()); + full_key.extend(iter::once(b'/')); + full_key.extend(key); + } + + tx_prefix_query.blocking_send(Ok((full_key, value)))?; } anyhow::Ok(()) }) @@ -416,7 +443,7 @@ impl StateRead for Snapshot { /// Returns a stream of all key-value pairs with the given prefix, and range /// from nonverifiable storage. - /// TODO(erwan): For now this method only supports range queries over the main store. + /// **Important**: Only supports range queries over the main store. fn nonverifiable_range_raw( &self, prefix: Option<&[u8]>, diff --git a/crates/cnidarium/tests/substore_tests.rs b/crates/cnidarium/tests/substore_tests.rs index 1d44992146..f3ee5e4f83 100644 --- a/crates/cnidarium/tests/substore_tests.rs +++ b/crates/cnidarium/tests/substore_tests.rs @@ -266,7 +266,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_a/{key}"); if counter >= kv_a.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); panic!("prefix_keys query returned too many entries") @@ -287,7 +286,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_b/{key}"); if counter >= kv_b.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); @@ -380,7 +378,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_a/{key}"); if counter >= kv_a.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); panic!("prefix_keys query returned too many entries") @@ -401,7 +398,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_b/{key}"); if counter >= kv_b.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); @@ -497,7 +493,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> { let (raw_key, raw_value) = res?; let key = String::from_utf8(raw_key)?; let value = String::from_utf8(raw_value)?; - let key = format!("prefix_a/{key}"); if counter >= kv_a.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); panic!("prefix_keys query returned too many entries") @@ -523,7 +518,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> { let (raw_key, raw_value) = res?; let key = String::from_utf8(raw_key)?; let value = String::from_utf8(raw_value)?; - let key = format!("prefix_b/{key}"); if counter >= kv_b.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); @@ -663,3 +657,95 @@ async fn test_substore_nv_range_queries_main_store() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +/// Minimal reproduction of the prefix range cache bug. +/// +/// Context: +/// `cnidarium`, our storage layer, supports prefix storage. +/// This allows users to configure independent storage units, each with +/// their own merkle tree, nonverifiable sidecar, and separate namespace. +/// Routing is done transparently without the user having to worry about +/// the details. +/// +/// Overview: +/// Prefix queries return tuples of (key, value)s, but instead of +/// returning the full key, they return the substore key. This is a layering +/// violation, and indeed causes a bug in the cache interleaving logic. +/// +/// Terminology: +/// - a `full_key`: a key that contains a substore prefix, a delimiter, and a substore key. +/// - a `substore_key`: a key with a stripped prefix. +/// +/// Walkthrough: +/// `StateDelta` index changes using full keys, as it is not aware of the +/// particular substore configuration that it is working against, by design. +/// As part of the cache interleaving logic, the `StateDetla` will try look for +/// new writes or covering deletions. However, since the base prefix implementation +/// returns substore keys, the cache will build an incoherence range and panic (or miss data). +async fn reproduction_bad_substore_cache_range() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let tmpdir = tempfile::tempdir()?; + let db_path = tmpdir.into_path(); + // We pick a friendly prefix with high lexicographic order to help + // with reproducing a "bad range" where the lower boundn is greater than + // the upper bound. + let substore_prefix = "zest".to_string(); + let substore_prefixes = vec![substore_prefix.clone()]; + let storage = Storage::load(db_path, substore_prefixes).await?; + + // Write some keys in the substore so that we can prefix range over something + let mut delta = StateDelta::new(storage.latest_snapshot()); + + let mut substore_kvs = vec![]; + + for i in 0..100 { + let k = format!("{}/key_{i:020}", substore_prefix); + let v = format!("value_{i}").as_bytes().to_vec(); + delta.put_raw(k.clone(), v.clone()); + substore_kvs.push(k) + } + + let _ = storage.commit(delta).await?; + let snapshot = storage.latest_snapshot(); + + // We can prefix range fine on a static snapshot. + let mut naive_prefix = snapshot.prefix_raw("zest/"); + // Track the number of prefix entries returned as a basic check. + let mut visited = vec![]; + while let Some(entry) = naive_prefix.next().await { + let (k, _) = entry?; + visited.push(k); + } + assert_eq!(visited, substore_kvs, "prefix query is missing keys"); + + // We established that we can do prefix range on a static snapshot. + // Now let's try on a no-op `StateDelta` + let mut delta = StateDelta::new(snapshot); + let mut clean_delta_prefix = delta.prefix_raw("zest/"); + let mut visited = vec![]; + while let Some(entry) = clean_delta_prefix.next().await { + let (k, _) = entry?; + visited.push(k); + } + assert_eq!(visited, substore_kvs, "prefix query is missing keys"); + + // It worked, finally let's try on a dirty delta. + delta.put_raw( + "zest/normal_key".to_string(), + "normal_value".as_bytes().to_vec(), + ); + let mut dirty_delta_prefix = delta.prefix_raw("zest/"); + let mut visited = vec![]; + // Cache interleaving logic will build a bad range and cause a panic. + // Check out `v0.77.3` or prior to see the panic. + while let Some(entry) = dirty_delta_prefix.next().await { + let (k, _) = entry?; + visited.push(k); + } + // Add the key we wrote to the substore. + substore_kvs.push("zest/normal_key".to_string()); + assert_eq!(visited, substore_kvs, "prefix query is missing keys"); + + Ok(()) +}