Skip to content

Commit

Permalink
cnidarium: sound prefix queries on substores (#4653)
Browse files Browse the repository at this point in the history
## Describe your changes

This PR contains a minimal reproduction for a bug in cnidarium's prefix
query handling. It also contains a sketch for a fix that we can
workshop. The bug was introduced in the original substore implementation
PR (#3131).

## Minimal reproduction of the prefix range cache bug.

**Context**
`cnidarium`, our storage layer, supports prefix storage.
This allows users to configure independent storage units, each with
their own merkle tree, nonverifiable sidecar, and separate namespace.
Routing is done transparently without the user having to worry about
the details.

**Overview**
Prefix queries return tuples of (key, value)s, but instead of
returning the full key, they return the substore key. This is a layering
violation, and indeed causes a bug in the cache interleaving logic.

**Terminology**
- a `full_key`: a key that contains a substore prefix, a delimiter, and
a substore key.
- a `substore_key`: a key with the substore prefix stripped.

**Walkthrough**
`StateDelta` indexes changes using full keys, as it is not aware of the
particular substore configuration that it is working against, by design.
As part of the cache interleaving logic, the `StateDelta` will try to
look for new writes or covering deletions. However, since the base
prefix implementation returns substore keys, the cache will build an
incoherent range and panic (or miss data).

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

> Consensus breaking in the sense that the chain won't halt if we hit
this.
  • Loading branch information
erwanor committed Jun 24, 2024
1 parent fa8535c commit 67511d8
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 21 deletions.
4 changes: 2 additions & 2 deletions crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ async fn delete_empty_deleted_packet_commitments(
pin_mut!(stream);

while let Some(entry) = stream.next().await {
let (substore_key, value) = entry?;
let (key, value) = entry?;
if value.is_empty() {
delta.delete(format!("ibc-data/{substore_key}"));
delta.delete(key);
}
}

Expand Down
53 changes: 40 additions & 13 deletions crates/cnidarium/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::iter;
use std::{any::Any, sync::Arc};

use anyhow::Result;
Expand Down Expand Up @@ -271,6 +272,7 @@ impl StateRead for Snapshot {

let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");
let substore_prefix = config.prefix.clone();

let version = self
.substore_version(&config)
Expand Down Expand Up @@ -302,18 +304,21 @@ impl StateRead for Snapshot {
for tuple in jmt_keys_iterator {
// For each key that matches the prefix, fetch the value from the JMT column family.
let (key_preimage, _) = tuple?;
let substore_key = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings");
let key_hash = jmt::KeyHash::with::<sha2::Sha256>(substore_key.as_bytes());

let k = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();

let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());
let full_key = if substore_prefix.is_empty() {
substore_key.to_string()
} else {
format!("{substore_prefix}/{substore_key}").to_string()
};

let v = substore
.get_jmt(key_hash)?
.expect("keys in jmt_keys should have a corresponding value in jmt");

tx_prefix_item.blocking_send(Ok((k, v)))?;
tx_prefix_item.blocking_send(Ok((full_key, v)))?;
}
anyhow::Ok(())
})
Expand All @@ -332,7 +337,6 @@ impl StateRead for Snapshot {
let db = self.0.db.clone();

let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");

let version = self
.substore_version(&config)
Expand All @@ -357,12 +361,20 @@ impl StateRead for Snapshot {
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

let substore_prefix = &substore.config.prefix;

for key_and_keyhash in iter {
let (raw_preimage, _) = key_and_keyhash?;
let preimage = std::str::from_utf8(raw_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();
tx_prefix_keys.blocking_send(Ok(preimage))?;
.expect("saved jmt keys are utf-8 strings");

let full_key = if substore_prefix.is_empty() {
preimage.to_string()
} else {
format!("{substore_prefix}/{preimage}").to_string()
};

tx_prefix_keys.blocking_send(Ok(full_key))?;
}
anyhow::Ok(())
})
Expand Down Expand Up @@ -403,9 +415,24 @@ impl StateRead for Snapshot {
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);
let substore_prefix = substore.config.prefix.as_bytes().to_vec();
for i in iter {
let (key, value) = i?;
tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
let (boxed_key, boxed_value) = i?;
let key: Vec<u8> = boxed_key.into();
let value: Vec<u8> = boxed_value.into();

// Costly to do on every iteration, but this should be dwarfed by the
// context switch to the tokio runtime.
let mut full_key: Vec<u8> = vec![];
if substore_prefix.is_empty() {
full_key.extend(key);
} else {
full_key.extend(substore_prefix.clone());
full_key.extend(iter::once(b'/'));
full_key.extend(key);
}

tx_prefix_query.blocking_send(Ok((full_key, value)))?;
}
anyhow::Ok(())
})
Expand All @@ -416,7 +443,7 @@ impl StateRead for Snapshot {

/// Returns a stream of all key-value pairs with the given prefix, and range
/// from nonverifiable storage.
/// TODO(erwan): For now this method only supports range queries over the main store.
/// **Important**: Only supports range queries over the main store.
fn nonverifiable_range_raw(
&self,
prefix: Option<&[u8]>,
Expand Down
98 changes: 92 additions & 6 deletions crates/cnidarium/tests/substore_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_a/{key}");
if counter >= kv_a.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
panic!("prefix_keys query returned too many entries")
Expand All @@ -287,7 +286,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_b/{key}");

if counter >= kv_b.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
Expand Down Expand Up @@ -380,7 +378,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_a/{key}");
if counter >= kv_a.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
panic!("prefix_keys query returned too many entries")
Expand All @@ -401,7 +398,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_b/{key}");

if counter >= kv_b.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
Expand Down Expand Up @@ -497,7 +493,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> {
let (raw_key, raw_value) = res?;
let key = String::from_utf8(raw_key)?;
let value = String::from_utf8(raw_value)?;
let key = format!("prefix_a/{key}");
if counter >= kv_a.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
panic!("prefix_keys query returned too many entries")
Expand All @@ -523,7 +518,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> {
let (raw_key, raw_value) = res?;
let key = String::from_utf8(raw_key)?;
let value = String::from_utf8(raw_value)?;
let key = format!("prefix_b/{key}");

if counter >= kv_b.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
Expand Down Expand Up @@ -663,3 +657,95 @@ async fn test_substore_nv_range_queries_main_store() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
/// Minimal reproduction of the prefix range cache bug.
///
/// Context:
/// `cnidarium`, our storage layer, supports prefix storage.
/// This allows users to configure independent storage units, each with
/// their own merkle tree, nonverifiable sidecar, and separate namespace.
/// Routing is done transparently without the user having to worry about
/// the details.
///
/// Overview:
/// Prefix queries return tuples of (key, value)s, but instead of
/// returning the full key, they return the substore key. This is a layering
/// violation, and indeed causes a bug in the cache interleaving logic.
///
/// Terminology:
/// - a `full_key`: a key that contains a substore prefix, a delimiter, and a substore key.
/// - a `substore_key`: a key with the substore prefix stripped.
///
/// Walkthrough:
/// `StateDelta` indexes changes using full keys, as it is not aware of the
/// particular substore configuration that it is working against, by design.
/// As part of the cache interleaving logic, the `StateDelta` will try to look
/// for new writes or covering deletions. However, since the base prefix
/// implementation returns substore keys, the cache will build an incoherent
/// range and panic (or miss data).
async fn reproduction_bad_substore_cache_range() -> anyhow::Result<()> {
    let _ = tracing_subscriber::fmt::try_init();

    // Stand up a storage instance backed by a throwaway directory.
    let tmpdir = tempfile::tempdir()?;
    let db_path = tmpdir.into_path();

    // A prefix that sorts high lexicographically makes it easier to
    // reproduce a "bad range" whose lower bound is greater than its
    // upper bound.
    let substore_prefix = "zest".to_string();
    let substore_prefixes = vec![substore_prefix.clone()];
    let storage = Storage::load(db_path, substore_prefixes).await?;

    // Seed the substore with a batch of keys so there is something to
    // prefix-range over.
    let mut delta = StateDelta::new(storage.latest_snapshot());

    let mut expected_keys = vec![];

    for idx in 0..100 {
        let full_key = format!("{}/key_{idx:020}", substore_prefix);
        let value = format!("value_{idx}").into_bytes();
        delta.put_raw(full_key.clone(), value);
        expected_keys.push(full_key)
    }

    let _ = storage.commit(delta).await?;
    let snapshot = storage.latest_snapshot();

    // Sanity check: prefix ranging over a static snapshot works fine.
    let mut snapshot_stream = snapshot.prefix_raw("zest/");
    // Collect every key the stream yields so we can compare exactly.
    let mut seen = vec![];
    while let Some(entry) = snapshot_stream.next().await {
        let (key, _) = entry?;
        seen.push(key);
    }
    assert_eq!(seen, expected_keys, "prefix query is missing keys");

    // The snapshot case works; next, the same query through a no-op
    // `StateDelta`.
    let mut delta = StateDelta::new(snapshot);
    let mut clean_stream = delta.prefix_raw("zest/");
    let mut seen = vec![];
    while let Some(entry) = clean_stream.next().await {
        let (key, _) = entry?;
        seen.push(key);
    }
    assert_eq!(seen, expected_keys, "prefix query is missing keys");

    // That worked too — finally, the interesting case: a delta with a
    // pending write.
    delta.put_raw(
        "zest/normal_key".to_string(),
        "normal_value".as_bytes().to_vec(),
    );
    let mut dirty_stream = delta.prefix_raw("zest/");
    let mut seen = vec![];
    // On broken versions (`v0.77.3` and earlier) the cache interleaving
    // logic builds an invalid range here and panics.
    while let Some(entry) = dirty_stream.next().await {
        let (key, _) = entry?;
        seen.push(key);
    }
    // The freshly written key should now appear alongside the batch.
    expected_keys.push("zest/normal_key".to_string());
    assert_eq!(seen, expected_keys, "prefix query is missing keys");

    Ok(())
}

0 comments on commit 67511d8

Please sign in to comment.