From 67511d8f9a65d337822e014dd0332fe8a39feef4 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 24 Jun 2024 17:56:45 -0400 Subject: [PATCH] cnidarium: sound prefix queries on substores (#4653) ## Describe your changes This PR contains a minimal reproduction for a bug in cnidarium's prefix query handling. It also contains a sketch for a fix that we can workshop. The bug was introduced in the original substore implementation PR (#3131). ## Minimal reproduction of the prefix range cache bug. **Context** `cnidarium`, our storage layer, supports prefix storage. This allows users to configure independent storage units, each with their own merkle tree, nonverifiable sidecar, and separate namespace. Routing is done transparently without the user having to worry about the details. **Overview** Prefix queries return tuples of (key, value)s, but instead of returning the full key, they return the substore key. This is a layering violation, and indeed causes a bug in the cache interleaving logic. **Terminology** - a `full_key`: a key that contains a substore prefix, a delimiter, and a substore key. - a `substore_key`: a key with a stripped prefix. **Walkthrough** `StateDelta` index changes using full keys, as it is not aware of the particular substore configuration that it is working against, by design. As part of the cache interleaving logic, the `StateDetla` will try look for new writes or covering deletions. However, since the base prefix implementation returns substore keys, the cache will build an incoherence range and panic (or miss data). ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > Consensus breaking in the sense that the chain won't halt if we hit this. --- crates/bin/pd/src/migrate/testnet78.rs | 4 +- crates/cnidarium/src/snapshot.rs | 53 +++++++++---- crates/cnidarium/tests/substore_tests.rs | 98 ++++++++++++++++++++++-- 3 files changed, 134 insertions(+), 21 deletions(-) diff --git a/crates/bin/pd/src/migrate/testnet78.rs b/crates/bin/pd/src/migrate/testnet78.rs index 0074b4b034..468e4e677a 100644 --- a/crates/bin/pd/src/migrate/testnet78.rs +++ b/crates/bin/pd/src/migrate/testnet78.rs @@ -169,9 +169,9 @@ async fn delete_empty_deleted_packet_commitments( pin_mut!(stream); while let Some(entry) = stream.next().await { - let (substore_key, value) = entry?; + let (key, value) = entry?; if value.is_empty() { - delta.delete(format!("ibc-data/{substore_key}")); + delta.delete(key); } } diff --git a/crates/cnidarium/src/snapshot.rs b/crates/cnidarium/src/snapshot.rs index e36eef64d8..eb3518805a 100644 --- a/crates/cnidarium/src/snapshot.rs +++ b/crates/cnidarium/src/snapshot.rs @@ -1,3 +1,4 @@ +use std::iter; use std::{any::Any, sync::Arc}; use anyhow::Result; @@ -271,6 +272,7 @@ impl StateRead for Snapshot { let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix); tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore"); + let substore_prefix = config.prefix.clone(); let version = self .substore_version(&config) @@ -302,18 +304,21 @@ impl StateRead for Snapshot { for tuple in jmt_keys_iterator { // For each key that matches the prefix, fetch the value from the JMT column family. let (key_preimage, _) = tuple?; + let substore_key = std::str::from_utf8(key_preimage.as_ref()) + .expect("saved jmt keys are utf-8 strings"); + let key_hash = jmt::KeyHash::with::(substore_key.as_bytes()); - let k = std::str::from_utf8(key_preimage.as_ref()) - .expect("saved jmt keys are utf-8 strings") - .to_string(); - - let key_hash = jmt::KeyHash::with::(k.as_bytes()); + let full_key = if substore_prefix.is_empty() { + substore_key.to_string() + } else { + format!("{substore_prefix}/{substore_key}").to_string() + }; let v = substore .get_jmt(key_hash)? .expect("keys in jmt_keys should have a corresponding value in jmt"); - tx_prefix_item.blocking_send(Ok((k, v)))?; + tx_prefix_item.blocking_send(Ok((full_key, v)))?; } anyhow::Ok(()) }) @@ -332,7 +337,6 @@ impl StateRead for Snapshot { let db = self.0.db.clone(); let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix); - tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore"); let version = self .substore_version(&config) @@ -357,12 +361,20 @@ impl StateRead for Snapshot { .rocksdb_snapshot .iterator_cf_opt(cf_jmt_keys, options, mode); + let substore_prefix = &substore.config.prefix; + for key_and_keyhash in iter { let (raw_preimage, _) = key_and_keyhash?; let preimage = std::str::from_utf8(raw_preimage.as_ref()) - .expect("saved jmt keys are utf-8 strings") - .to_string(); - tx_prefix_keys.blocking_send(Ok(preimage))?; + .expect("saved jmt keys are utf-8 strings"); + + let full_key = if substore_prefix.is_empty() { + preimage.to_string() + } else { + format!("{substore_prefix}/{preimage}").to_string() + }; + + tx_prefix_keys.blocking_send(Ok(full_key))?; } anyhow::Ok(()) }) @@ -403,9 +415,24 @@ impl StateRead for Snapshot { substore .rocksdb_snapshot .iterator_cf_opt(cf_nonverifiable, options, mode); + let substore_prefix = substore.config.prefix.as_bytes().to_vec(); for i in iter { - let (key, value) = i?; - tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?; + let (boxed_key, boxed_value) = i?; + let key: Vec = boxed_key.into(); + let value: Vec = boxed_value.into(); + + // Costly to do on every iteration, but this should be dwarfed by the + // context switch to the tokio runtime. + let mut full_key: Vec = vec![]; + if substore_prefix.is_empty() { + full_key.extend(key); + } else { + full_key.extend(substore_prefix.clone()); + full_key.extend(iter::once(b'/')); + full_key.extend(key); + } + + tx_prefix_query.blocking_send(Ok((full_key, value)))?; } anyhow::Ok(()) }) @@ -416,7 +443,7 @@ impl StateRead for Snapshot { /// Returns a stream of all key-value pairs with the given prefix, and range /// from nonverifiable storage. - /// TODO(erwan): For now this method only supports range queries over the main store. + /// **Important**: Only supports range queries over the main store. fn nonverifiable_range_raw( &self, prefix: Option<&[u8]>, diff --git a/crates/cnidarium/tests/substore_tests.rs b/crates/cnidarium/tests/substore_tests.rs index 1d44992146..f3ee5e4f83 100644 --- a/crates/cnidarium/tests/substore_tests.rs +++ b/crates/cnidarium/tests/substore_tests.rs @@ -266,7 +266,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_a/{key}"); if counter >= kv_a.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); panic!("prefix_keys query returned too many entries") @@ -287,7 +286,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_b/{key}"); if counter >= kv_b.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); @@ -380,7 +378,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_a/{key}"); if counter >= kv_a.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); panic!("prefix_keys query returned too many entries") @@ -401,7 +398,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> { let mut range = snapshot.prefix_keys(query_prefix); while let Some(res) = range.next().await { let key = res?; - let key = format!("prefix_b/{key}"); if counter >= kv_b.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); @@ -497,7 +493,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> { let (raw_key, raw_value) = res?; let key = String::from_utf8(raw_key)?; let value = String::from_utf8(raw_value)?; - let key = format!("prefix_a/{key}"); if counter >= kv_a.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); panic!("prefix_keys query returned too many entries") @@ -523,7 +518,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> { let (raw_key, raw_value) = res?; let key = String::from_utf8(raw_key)?; let value = String::from_utf8(raw_value)?; - let key = format!("prefix_b/{key}"); if counter >= kv_b.len() { tracing::debug!(?key, ?query_prefix, "unexpected key"); @@ -663,3 +657,95 @@ async fn test_substore_nv_range_queries_main_store() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +/// Minimal reproduction of the prefix range cache bug. +/// +/// Context: +/// `cnidarium`, our storage layer, supports prefix storage. +/// This allows users to configure independent storage units, each with +/// their own merkle tree, nonverifiable sidecar, and separate namespace. +/// Routing is done transparently without the user having to worry about +/// the details. +/// +/// Overview: +/// Prefix queries return tuples of (key, value)s, but instead of +/// returning the full key, they return the substore key. This is a layering +/// violation, and indeed causes a bug in the cache interleaving logic. +/// +/// Terminology: +/// - a `full_key`: a key that contains a substore prefix, a delimiter, and a substore key. +/// - a `substore_key`: a key with a stripped prefix. +/// +/// Walkthrough: +/// `StateDelta` index changes using full keys, as it is not aware of the +/// particular substore configuration that it is working against, by design. +/// As part of the cache interleaving logic, the `StateDetla` will try look for +/// new writes or covering deletions. However, since the base prefix implementation +/// returns substore keys, the cache will build an incoherence range and panic (or miss data). +async fn reproduction_bad_substore_cache_range() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let tmpdir = tempfile::tempdir()?; + let db_path = tmpdir.into_path(); + // We pick a friendly prefix with high lexicographic order to help + // with reproducing a "bad range" where the lower boundn is greater than + // the upper bound. + let substore_prefix = "zest".to_string(); + let substore_prefixes = vec![substore_prefix.clone()]; + let storage = Storage::load(db_path, substore_prefixes).await?; + + // Write some keys in the substore so that we can prefix range over something + let mut delta = StateDelta::new(storage.latest_snapshot()); + + let mut substore_kvs = vec![]; + + for i in 0..100 { + let k = format!("{}/key_{i:020}", substore_prefix); + let v = format!("value_{i}").as_bytes().to_vec(); + delta.put_raw(k.clone(), v.clone()); + substore_kvs.push(k) + } + + let _ = storage.commit(delta).await?; + let snapshot = storage.latest_snapshot(); + + // We can prefix range fine on a static snapshot. + let mut naive_prefix = snapshot.prefix_raw("zest/"); + // Track the number of prefix entries returned as a basic check. + let mut visited = vec![]; + while let Some(entry) = naive_prefix.next().await { + let (k, _) = entry?; + visited.push(k); + } + assert_eq!(visited, substore_kvs, "prefix query is missing keys"); + + // We established that we can do prefix range on a static snapshot. + // Now let's try on a no-op `StateDelta` + let mut delta = StateDelta::new(snapshot); + let mut clean_delta_prefix = delta.prefix_raw("zest/"); + let mut visited = vec![]; + while let Some(entry) = clean_delta_prefix.next().await { + let (k, _) = entry?; + visited.push(k); + } + assert_eq!(visited, substore_kvs, "prefix query is missing keys"); + + // It worked, finally let's try on a dirty delta. + delta.put_raw( + "zest/normal_key".to_string(), + "normal_value".as_bytes().to_vec(), + ); + let mut dirty_delta_prefix = delta.prefix_raw("zest/"); + let mut visited = vec![]; + // Cache interleaving logic will build a bad range and cause a panic. + // Check out `v0.77.3` or prior to see the panic. + while let Some(entry) = dirty_delta_prefix.next().await { + let (k, _) = entry?; + visited.push(k); + } + // Add the key we wrote to the substore. + substore_kvs.push("zest/normal_key".to_string()); + assert_eq!(visited, substore_kvs, "prefix query is missing keys"); + + Ok(()) +}