Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cnidarium: prefix queries over substores are hazardous #4653

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ async fn delete_empty_deleted_packet_commitments(
pin_mut!(stream);

while let Some(entry) = stream.next().await {
let (substore_key, value) = entry?;
let (key, value) = entry?;
if value.is_empty() {
delta.delete(format!("ibc-data/{substore_key}"));
delta.delete(key);
}
}

Expand Down
53 changes: 40 additions & 13 deletions crates/cnidarium/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::iter;
use std::{any::Any, sync::Arc};

use anyhow::Result;
Expand Down Expand Up @@ -271,6 +272,7 @@ impl StateRead for Snapshot {

let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");
let substore_prefix = config.prefix.clone();

let version = self
.substore_version(&config)
Expand Down Expand Up @@ -302,18 +304,21 @@ impl StateRead for Snapshot {
for tuple in jmt_keys_iterator {
// For each key that matches the prefix, fetch the value from the JMT column family.
let (key_preimage, _) = tuple?;
let substore_key = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings");
let key_hash = jmt::KeyHash::with::<sha2::Sha256>(substore_key.as_bytes());

let k = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();

let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());
let full_key = if substore_prefix.is_empty() {
substore_key.to_string()
} else {
format!("{substore_prefix}/{substore_key}").to_string()
};

let v = substore
.get_jmt(key_hash)?
.expect("keys in jmt_keys should have a corresponding value in jmt");

tx_prefix_item.blocking_send(Ok((k, v)))?;
tx_prefix_item.blocking_send(Ok((full_key, v)))?;
}
anyhow::Ok(())
})
Expand All @@ -332,7 +337,6 @@ impl StateRead for Snapshot {
let db = self.0.db.clone();

let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");

let version = self
.substore_version(&config)
Expand All @@ -357,12 +361,20 @@ impl StateRead for Snapshot {
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

let substore_prefix = &substore.config.prefix;

for key_and_keyhash in iter {
let (raw_preimage, _) = key_and_keyhash?;
let preimage = std::str::from_utf8(raw_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();
tx_prefix_keys.blocking_send(Ok(preimage))?;
.expect("saved jmt keys are utf-8 strings");

let full_key = if substore_prefix.is_empty() {
preimage.to_string()
} else {
format!("{substore_prefix}/{preimage}").to_string()
};

tx_prefix_keys.blocking_send(Ok(full_key))?;
}
anyhow::Ok(())
})
Expand Down Expand Up @@ -403,9 +415,24 @@ impl StateRead for Snapshot {
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);
let substore_prefix = substore.config.prefix.as_bytes().to_vec();
for i in iter {
let (key, value) = i?;
tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
let (boxed_key, boxed_value) = i?;
let key: Vec<u8> = boxed_key.into();
let value: Vec<u8> = boxed_value.into();

// Costly to do on every iteration, but this should be dwarfed by the
// context switch to the tokio runtime.
let mut full_key: Vec<u8> = vec![];
if substore_prefix.is_empty() {
full_key.extend(key);
} else {
full_key.extend(substore_prefix.clone());
full_key.extend(iter::once(b'/'));
full_key.extend(key);
}

tx_prefix_query.blocking_send(Ok((full_key, value)))?;
}
anyhow::Ok(())
})
Expand All @@ -416,7 +443,7 @@ impl StateRead for Snapshot {

/// Returns a stream of all key-value pairs with the given prefix, and range
/// from nonverifiable storage.
/// TODO(erwan): For now this method only supports range queries over the main store.
/// **Important**: Only supports range queries over the main store.
fn nonverifiable_range_raw(
&self,
prefix: Option<&[u8]>,
Expand Down
98 changes: 92 additions & 6 deletions crates/cnidarium/tests/substore_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_a/{key}");
if counter >= kv_a.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
panic!("prefix_keys query returned too many entries")
Expand All @@ -287,7 +286,6 @@ async fn test_substore_prefix_queries() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_b/{key}");

if counter >= kv_b.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
Expand Down Expand Up @@ -380,7 +378,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_a/{key}");
if counter >= kv_a.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
panic!("prefix_keys query returned too many entries")
Expand All @@ -401,7 +398,6 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> {
let mut range = snapshot.prefix_keys(query_prefix);
while let Some(res) = range.next().await {
let key = res?;
let key = format!("prefix_b/{key}");

if counter >= kv_b.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
Expand Down Expand Up @@ -497,7 +493,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> {
let (raw_key, raw_value) = res?;
let key = String::from_utf8(raw_key)?;
let value = String::from_utf8(raw_value)?;
let key = format!("prefix_a/{key}");
if counter >= kv_a.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
panic!("prefix_keys query returned too many entries")
Expand All @@ -523,7 +518,6 @@ async fn test_substore_nv_prefix() -> anyhow::Result<()> {
let (raw_key, raw_value) = res?;
let key = String::from_utf8(raw_key)?;
let value = String::from_utf8(raw_value)?;
let key = format!("prefix_b/{key}");

if counter >= kv_b.len() {
tracing::debug!(?key, ?query_prefix, "unexpected key");
Expand Down Expand Up @@ -663,3 +657,95 @@ async fn test_substore_nv_range_queries_main_store() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
/// Minimal reproduction of the prefix range cache bug.
///
/// Context:
/// `cnidarium`, our storage layer, supports prefix storage.
/// This allows users to configure independent storage units, each with
/// their own merkle tree, nonverifiable sidecar, and separate namespace.
/// Routing is done transparently without the user having to worry about
/// the details.
///
/// Overview:
/// Prefix queries return tuples of (key, value)s, but instead of
/// returning the full key, they return the substore key. This is a layering
/// violation, and indeed causes a bug in the cache interleaving logic.
///
/// Terminology:
/// - a `full_key`: a key that contains a substore prefix, a delimiter, and a substore key.
/// - a `substore_key`: a key with a stripped prefix.
///
/// Walkthrough:
/// `StateDelta` index changes using full keys, as it is not aware of the
/// particular substore configuration that it is working against, by design.
/// As part of the cache interleaving logic, the `StateDetla` will try look for
/// new writes or covering deletions. However, since the base prefix implementation
/// returns substore keys, the cache will build an incoherence range and panic (or miss data).
async fn reproduction_bad_substore_cache_range() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmpdir = tempfile::tempdir()?;
let db_path = tmpdir.into_path();
// We pick a friendly prefix with high lexicographic order to help
// with reproducing a "bad range" where the lower boundn is greater than
// the upper bound.
let substore_prefix = "zest".to_string();
let substore_prefixes = vec![substore_prefix.clone()];
let storage = Storage::load(db_path, substore_prefixes).await?;

// Write some keys in the substore so that we can prefix range over something
let mut delta = StateDelta::new(storage.latest_snapshot());

let mut substore_kvs = vec![];

for i in 0..100 {
let k = format!("{}/key_{i:020}", substore_prefix);
let v = format!("value_{i}").as_bytes().to_vec();
delta.put_raw(k.clone(), v.clone());
substore_kvs.push(k)
}

let _ = storage.commit(delta).await?;
let snapshot = storage.latest_snapshot();

// We can prefix range fine on a static snapshot.
let mut naive_prefix = snapshot.prefix_raw("zest/");
// Track the number of prefix entries returned as a basic check.
let mut visited = vec![];
while let Some(entry) = naive_prefix.next().await {
let (k, _) = entry?;
visited.push(k);
}
assert_eq!(visited, substore_kvs, "prefix query is missing keys");

// We established that we can do prefix range on a static snapshot.
// Now let's try on a no-op `StateDelta`
let mut delta = StateDelta::new(snapshot);
let mut clean_delta_prefix = delta.prefix_raw("zest/");
let mut visited = vec![];
while let Some(entry) = clean_delta_prefix.next().await {
let (k, _) = entry?;
visited.push(k);
}
assert_eq!(visited, substore_kvs, "prefix query is missing keys");

// It worked, finally let's try on a dirty delta.
delta.put_raw(
"zest/normal_key".to_string(),
"normal_value".as_bytes().to_vec(),
);
let mut dirty_delta_prefix = delta.prefix_raw("zest/");
let mut visited = vec![];
// Cache interleaving logic will build a bad range and cause a panic.
// Check out `v0.77.3` or prior to see the panic.
while let Some(entry) = dirty_delta_prefix.next().await {
let (k, _) = entry?;
visited.push(k);
}
// Add the key we wrote to the substore.
substore_kvs.push("zest/normal_key".to_string());
assert_eq!(visited, substore_kvs, "prefix query is missing keys");

Ok(())
}
Loading