headers-cache: Store state root of changes and check it against header
kvinwang committed Oct 12, 2023
1 parent 9c609bc commit afa0bbd
Showing 6 changed files with 167 additions and 39 deletions.
15 changes: 14 additions & 1 deletion crates/phaxt/src/rpc.rs
@@ -1,4 +1,4 @@
-use phala_node_rpc_ext_types::GetStorageChangesResponse;
+use phala_node_rpc_ext_types::{GetStorageChangesResponse, GetStorageChangesResponseWithRoot};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use serde_json::to_value as to_json_value;
 use subxt::{
@@ -42,6 +42,19 @@ impl<'a, T: Config> ExtraRpcClient<'a, T> {
             .map_err(Into::into)
     }
 
+    /// Query storage changes with root
+    pub async fn get_storage_changes_with_root(
+        &self,
+        from: &T::Hash,
+        to: &T::Hash,
+    ) -> Result<GetStorageChangesResponseWithRoot, Error> {
+        let params = rpc_params![to_json_value(from)?, to_json_value(to)?];
+        self.client
+            .request("pha_getStorageChangesWithRoot", params)
+            .await
+            .map_err(Into::into)
+    }
+
     /// Returns the keys with prefix, leave empty to get all the keys
     pub async fn storage_pairs(
         &self,
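
The new wrapper mirrors the existing `get_storage_changes`, but targets the `pha_getStorageChangesWithRoot` RPC, so each returned entry carries the state root the node computed for its block. A minimal caller sketch, assuming an already-connected phaxt client, two resolved block hashes, and the `hex` crate for display:

    // Sketch only: `client`, `from_hash`, and `to_hash` are assumed to exist.
    let entries = client
        .extra_rpc()
        .get_storage_changes_with_root(&from_hash, &to_hash)
        .await?;
    for entry in entries {
        // Each entry pairs the storage diff (`entry.changes`) with the raw
        // state-root bytes (`entry.state_root`) reported by the node.
        println!("state root: 0x{}", hex::encode(&entry.state_root));
    }
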
106 changes: 84 additions & 22 deletions standalone/headers-cache/src/grab.rs
@@ -213,30 +213,53 @@ impl<'c> Crawler<'c> {
         let config = self.config;
         let metadata = &mut *self.metadata;
 
-        let relay_start = metadata.checked.header.unwrap_or(config.genesis_block);
-        let relay_end = metadata
-            .recent_imported
-            .header
-            .unwrap_or(0)
-            .min(relay_start + config.check_batch);
-        if relay_start < relay_end {
-            check_and_fix_headers(db, config, "relay", relay_start, Some(relay_end), None).await?;
-            metadata.checked.header = Some(relay_end);
-            db.put_metadata(metadata)
-                .context("Failed to update metadata")?;
+        {
+            let relay_start = metadata.checked.header.unwrap_or(config.genesis_block);
+            let relay_end = metadata
+                .recent_imported
+                .header
+                .unwrap_or(0)
+                .min(relay_start + config.check_batch);
+            if relay_start < relay_end {
+                check_and_fix_headers(db, config, "relay", relay_start, Some(relay_end), None)
+                    .await?;
+                metadata.checked.header = Some(relay_end);
+                db.put_metadata(metadata)
+                    .context("Failed to update metadata")?;
+            }
         }
 
-        let para_start = metadata.checked.para_header.unwrap_or(0);
-        let para_end = metadata
-            .recent_imported
-            .para_header
-            .unwrap_or(0)
-            .min(para_start + config.check_batch);
-        if para_start < para_end {
-            check_and_fix_headers(db, config, "para", para_start, Some(para_end), None).await?;
-            metadata.checked.para_header = Some(para_end);
-            db.put_metadata(metadata)
-                .context("Failed to update metadata")?;
+        {
+            let para_start = metadata.checked.para_header.unwrap_or(0);
+            let para_end = metadata
+                .recent_imported
+                .para_header
+                .unwrap_or(0)
+                .min(para_start + config.check_batch);
+            if para_start < para_end {
+                check_and_fix_headers(db, config, "para", para_start, Some(para_end), None).await?;
+                metadata.checked.para_header = Some(para_end);
+                db.put_metadata(metadata)
+                    .context("Failed to update metadata")?;
+            }
         }
 
+        {
+            let changes_start = metadata.checked.storage_changes.unwrap_or(0);
+            let max_checked_header = metadata.checked.para_header.unwrap_or_default();
+            let changes_end = metadata
+                .recent_imported
+                .storage_changes
+                .unwrap_or(0)
+                .min(changes_start + config.check_batch)
+                .min(max_checked_header);
+            if changes_start < changes_end {
+                check_and_fix_storages_changes(db, config, changes_start, Some(changes_end), None)
+                    .await?;
+                metadata.checked.storage_changes = Some(changes_end);
+                db.put_metadata(metadata)
+                    .context("Failed to update metadata")?;
+            }
+        }
         Ok(())
     }
@@ -319,6 +342,45 @@ pub(crate) async fn check_and_fix_headers(
     Ok(response)
 }
 
+pub(crate) async fn check_and_fix_storages_changes(
+    db: &CacheDB,
+    config: &Serve,
+    from: BlockNumber,
+    to: Option<BlockNumber>,
+    count: Option<BlockNumber>,
+) -> Result<u32> {
+    let to = to.unwrap_or(from + count.unwrap_or(1));
+    info!("Checking storage changes from {from} to {to}");
+    let mut state_root_mismatches = 0_u32;
+    for block in from..to {
+        let header = db
+            .get_header(block)
+            .ok_or(anyhow!("Header {block} not found"))?;
+        let header = decode_header(&header)?;
+        let changes = db
+            .get_storage_changes(block)
+            .ok_or(anyhow!("Storage changes {block} not found"))?;
+        let actual_root = decode_header(&changes)
+            .map(|h| h.state_root)
+            .unwrap_or_default();
+        if header.state_root != actual_root {
+            info!("Storage changes {block} mismatch, trying to regrab");
+            let para_api = pherry::subxt_connect(&config.para_node_uri)
+                .await
+                .context(format!("Failed to connect to {}", config.para_node_uri))?;
+            cache::grab_storage_changes(&para_api, header.number, 1, 1, |info| {
+                db.put_storage_changes(info.block_header.number, &info.encode())
+                    .context("Failed to put record to DB")?;
+                Ok(())
+            })
+            .await
+            .context("Failed to grab storage changes from node")?;
+            state_root_mismatches += 1;
+        }
+    }
+    Ok(state_root_mismatches)
+}
+
 fn decode_header(data: &[u8]) -> Result<Header> {
     let header = Header::decode(&mut &data[..]).context("Failed to decode header")?;
     Ok(header)
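
Two details in the checker above are easy to miss. First, in the crawler, `changes_end` is clamped to `max_checked_header` as well as to the batch size: a storage-changes record can only be validated against a para header that has itself already been checked. Second, `decode_header` is applied to a cached changes record because the record is a SCALE-encoded `BlockHeaderWithChanges` whose first field is the block header, so decoding the record's prefix recovers the state root stored alongside the changes. Records written before this commit carry a default (zero) root, fail the comparison, and get re-grabbed from the node. A minimal sketch of the comparison, with hypothetical `header_bytes` and `changes_bytes` standing in for the raw DB records:

    // `header_bytes` / `changes_bytes` are hypothetical raw records from the cache DB.
    let expected_root = decode_header(&header_bytes)?.state_root;
    let actual_root = decode_header(&changes_bytes) // prefix of BlockHeaderWithChanges
        .map(|h| h.state_root)
        .unwrap_or_default(); // a zero root guarantees a mismatch below
    if expected_root != actual_root {
        // Re-fetch this block's storage changes from the node and overwrite the record.
    }
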
18 changes: 15 additions & 3 deletions standalone/headers-cache/src/web_api.rs
@@ -211,9 +211,21 @@ async fn api_check_blocks(
     to: Option<BlockNumber>,
     count: Option<BlockNumber>,
 ) -> Result<String, String> {
-    crate::grab::check_and_fix_headers(&app.db, &app.config, chain, from, to, count)
-        .await
-        .map_err(|e| e.to_string())
+    if chain == "state" {
+        let mismatches =
+            crate::grab::check_and_fix_storages_changes(&app.db, &app.config, from, to, count)
+                .await
+                .map_err(|e| e.to_string())?;
+        if mismatches == 0 {
+            Ok("No mismatches".into())
+        } else {
+            Ok(format!("Mismatches: {:?}", mismatches))
+        }
+    } else {
+        crate::grab::check_and_fix_headers(&app.db, &app.config, chain, from, to, count)
+            .await
+            .map_err(|e| e.to_string())
+    }
 }
 
 pub(crate) async fn serve(db: CacheDB, config: ServeConfig, token: Option<String>) -> Result<()> {
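
The check endpoint thus accepts `state` as a pseudo-chain alongside the header chains (`relay`, `para`), routing the request to the storage-changes checker and reporting how many records had to be re-grabbed. An illustrative invocation (route, host, and port are assumptions, not taken from this diff):

    # Hypothetical URL; substitute the actual route and port of your deployment.
    curl 'http://127.0.0.1:8001/api/check/state?from=1000&count=100'
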
13 changes: 7 additions & 6 deletions standalone/pherry/src/headers_cache.rs
@@ -402,12 +402,13 @@ pub async fn grab_storage_changes(
 
     for from in (start_at..=to).step_by(batch_size as _) {
         let to = to.min(from.saturating_add(batch_size - 1));
-        let headers = match crate::fetch_storage_changes(api, None, from, to).await {
-            Err(e) if e.to_string().contains("not found") => {
-                break;
-            }
-            other => other?,
-        };
+        let headers =
+            match crate::fetch_storage_changes_with_root_or_not(api, None, from, to, true).await {
+                Err(e) if e.to_string().contains("not found") => {
+                    break;
+                }
+                other => other?,
+            };
         for header in headers {
             f(header)?;
             grabbed += 1;
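
With `with_root = true`, every record the cache grabber writes now embeds the real state root rather than a zeroed placeholder, which is what makes the consistency check above possible. The old entry point is unchanged for other callers; a sketch of the two behaviors, assuming an established `RpcClient` and a pair of block numbers:

    // Plain fetch: state_root stays Default::default(), exactly as before this commit.
    let plain = pherry::fetch_storage_changes(&client, None, from, to).await?;
    // Cache grabbing opts in to real roots so records can be verified against headers.
    let with_roots =
        pherry::fetch_storage_changes_with_root_or_not(&client, None, from, to, true).await?;
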
51 changes: 45 additions & 6 deletions standalone/pherry/src/lib.rs
@@ -1,7 +1,10 @@
 use anyhow::{anyhow, Context, Result};
 use log::{debug, error, info, warn};
-use sp_core::crypto::AccountId32;
+use phala_node_rpc_ext::MakeInto;
+use phala_trie_storage::ser::StorageChanges;
+use sp_core::{crypto::AccountId32, H256};
 use std::cmp;
+use std::convert::TryFrom;
 use std::str::FromStr;
 use std::time::Duration;
 use tokio::time::sleep;
@@ -319,6 +322,16 @@ pub async fn fetch_storage_changes(
     cache: Option<&CacheClient>,
     from: BlockNumber,
     to: BlockNumber,
+) -> Result<Vec<BlockHeaderWithChanges>> {
+    fetch_storage_changes_with_root_or_not(client, cache, from, to, false).await
+}
+
+pub async fn fetch_storage_changes_with_root_or_not(
+    client: &RpcClient,
+    cache: Option<&CacheClient>,
+    from: BlockNumber,
+    to: BlockNumber,
+    with_root: bool,
 ) -> Result<Vec<BlockHeaderWithChanges>> {
     log::info!("fetch_storage_changes ({from}-{to})");
     if to < from {
@@ -336,21 +349,47 @@
     }
     let from_hash = get_header_hash(client, Some(from)).await?;
     let to_hash = get_header_hash(client, Some(to)).await?;
-    let storage_changes = chain_client::fetch_storage_changes(client, &from_hash, &to_hash)
-        .await?
+
+    let changes = if with_root {
+        client
+            .extra_rpc()
+            .get_storage_changes_with_root(&from_hash, &to_hash)
+            .await?
+            .into_iter()
+            .map(|changes| {
+                Ok((changes.changes, {
+                    let raw: [u8; 32] = TryFrom::try_from(&changes.state_root[..])
+                        .or(Err(anyhow!("Invalid state root")))?;
+                    H256::from(raw)
+                }))
+            })
+            .collect::<Result<Vec<_>>>()?
+    } else {
+        client
+            .extra_rpc()
+            .get_storage_changes(&from_hash, &to_hash)
+            .await?
+            .into_iter()
+            .map(|changes| (changes, Default::default()))
+            .collect::<Vec<_>>()
+    };
+    let storage_changes = changes
         .into_iter()
         .enumerate()
-        .map(|(offset, storage_changes)| {
+        .map(|(offset, (storage_changes, state_root))| {
             BlockHeaderWithChanges {
                 // Headers are synced separately. Only the `number` is used in pRuntime while syncing blocks.
                 block_header: BlockHeader {
                     number: from + offset as BlockNumber,
                     parent_hash: Default::default(),
-                    state_root: Default::default(),
+                    state_root,
                     extrinsics_root: Default::default(),
                     digest: Default::default(),
                 },
-                storage_changes,
+                storage_changes: StorageChanges {
+                    main_storage_changes: storage_changes.main_storage_changes.into_(),
+                    child_storage_changes: storage_changes.child_storage_changes.into_(),
+                },
             }
         })
         .collect();
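
When `with_root` is set, the RPC returns each state root as raw bytes, which are accepted only if they are exactly 32 bytes long; with it unset, the root defaults to zero and the old behavior is preserved. The change sets themselves are re-wrapped into `phala_trie_storage`'s `StorageChanges` via the `MakeInto` helper. A standalone sketch of the root conversion (the `to_h256` helper is illustrative, not part of the commit):

    use anyhow::{anyhow, Result};
    use sp_core::H256;

    // Illustrative helper: reject anything that is not exactly 32 bytes.
    fn to_h256(root_bytes: &[u8]) -> Result<H256> {
        let raw: [u8; 32] =
            TryFrom::try_from(root_bytes).or(Err(anyhow!("Invalid state root")))?;
        Ok(H256::from(raw))
    }
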
3 changes: 2 additions & 1 deletion standalone/pherry/src/prefetcher.rs
@@ -59,7 +59,8 @@ impl PrefetchClient {
             to: next_to,
             handle: tokio::spawn(async move {
                 log::info!("prefetching ({next_from}-{next_to})");
-                crate::fetch_storage_changes(&client, cache.as_ref(), next_from, next_to).await
+                crate::fetch_storage_changes(&client, cache.as_ref(), next_from, next_to)
+                    .await
             }),
         });
         Ok(result)
