diff --git a/crates/phaxt/src/rpc.rs b/crates/phaxt/src/rpc.rs index f4a42f4952..3339c2ea98 100644 --- a/crates/phaxt/src/rpc.rs +++ b/crates/phaxt/src/rpc.rs @@ -1,4 +1,4 @@ -use phala_node_rpc_ext_types::GetStorageChangesResponse; +use phala_node_rpc_ext_types::{GetStorageChangesResponse, GetStorageChangesResponseWithRoot}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::to_value as to_json_value; use subxt::{ @@ -42,6 +42,19 @@ impl<'a, T: Config> ExtraRpcClient<'a, T> { .map_err(Into::into) } + /// Query storage changes with root + pub async fn get_storage_changes_with_root( + &self, + from: &T::Hash, + to: &T::Hash, + ) -> Result { + let params = rpc_params![to_json_value(from)?, to_json_value(to)?]; + self.client + .request("pha_getStorageChangesWithRoot", params) + .await + .map_err(Into::into) + } + /// Returns the keys with prefix, leave empty to get all the keys pub async fn storage_pairs( &self, diff --git a/standalone/headers-cache/src/grab.rs b/standalone/headers-cache/src/grab.rs index 64adfa45bd..9138d4efa9 100644 --- a/standalone/headers-cache/src/grab.rs +++ b/standalone/headers-cache/src/grab.rs @@ -213,30 +213,53 @@ impl<'c> Crawler<'c> { let config = self.config; let metadata = &mut *self.metadata; - let relay_start = metadata.checked.header.unwrap_or(config.genesis_block); - let relay_end = metadata - .recent_imported - .header - .unwrap_or(0) - .min(relay_start + config.check_batch); - if relay_start < relay_end { - check_and_fix_headers(db, config, "relay", relay_start, Some(relay_end), None).await?; - metadata.checked.header = Some(relay_end); - db.put_metadata(metadata) - .context("Failed to update metadata")?; + { + let relay_start = metadata.checked.header.unwrap_or(config.genesis_block); + let relay_end = metadata + .recent_imported + .header + .unwrap_or(0) + .min(relay_start + config.check_batch); + if relay_start < relay_end { + check_and_fix_headers(db, config, "relay", relay_start, Some(relay_end), None) + .await?; + metadata.checked.header = Some(relay_end); + db.put_metadata(metadata) + .context("Failed to update metadata")?; + } } - let para_start = metadata.checked.para_header.unwrap_or(0); - let para_end = metadata - .recent_imported - .para_header - .unwrap_or(0) - .min(para_start + config.check_batch); - if para_start < para_end { - check_and_fix_headers(db, config, "para", para_start, Some(para_end), None).await?; - metadata.checked.para_header = Some(para_end); - db.put_metadata(metadata) - .context("Failed to update metadata")?; + { + let para_start = metadata.checked.para_header.unwrap_or(0); + let para_end = metadata + .recent_imported + .para_header + .unwrap_or(0) + .min(para_start + config.check_batch); + if para_start < para_end { + check_and_fix_headers(db, config, "para", para_start, Some(para_end), None).await?; + metadata.checked.para_header = Some(para_end); + db.put_metadata(metadata) + .context("Failed to update metadata")?; + } + } + + { + let changes_start = metadata.checked.storage_changes.unwrap_or(0); + let max_checked_header = metadata.checked.para_header.unwrap_or_default(); + let changes_end = metadata + .recent_imported + .storage_changes + .unwrap_or(0) + .min(changes_start + config.check_batch) + .min(max_checked_header); + if changes_start < changes_end { + check_and_fix_storages_changes(db, config, changes_start, Some(changes_end), None) + .await?; + metadata.checked.storage_changes = Some(changes_end); + db.put_metadata(metadata) + .context("Failed to update metadata")?; + } } Ok(()) } @@ -319,6 +342,45 @@ pub(crate) async fn check_and_fix_headers( Ok(response) } +pub(crate) async fn check_and_fix_storages_changes( + db: &CacheDB, + config: &Serve, + from: BlockNumber, + to: Option, + count: Option, +) -> Result { + let to = to.unwrap_or(from + count.unwrap_or(1)); + info!("Checking storage changes from {from} to {to}"); + let mut state_root_mismatches = 0_u32; + for block in from..to { + let header = db + .get_header(block) + .ok_or(anyhow!("Header {block} not found"))?; + let header = decode_header(&header)?; + let changes = db + .get_storage_changes(block) + .ok_or(anyhow!("Storage changes {block} not found"))?; + let actual_root = decode_header(&changes) + .map(|h| h.state_root) + .unwrap_or_default(); + if header.state_root != actual_root { + info!("Storage changes {block} mismatch, trying to regrab"); + let para_api = pherry::subxt_connect(&config.para_node_uri) + .await + .context(format!("Failed to connect to {}", config.para_node_uri))?; + cache::grab_storage_changes(¶_api, header.number, 1, 1, |info| { + db.put_storage_changes(info.block_header.number, &info.encode()) + .context("Failed to put record to DB")?; + Ok(()) + }) + .await + .context("Failed to grab storage changes from node")?; + state_root_mismatches += 1; + } + } + Ok(state_root_mismatches) +} + fn decode_header(data: &[u8]) -> Result
{ let header = Header::decode(&mut &data[..]).context("Failed to decode header")?; Ok(header) diff --git a/standalone/headers-cache/src/web_api.rs b/standalone/headers-cache/src/web_api.rs index 1c772e504a..8635d9a9d2 100644 --- a/standalone/headers-cache/src/web_api.rs +++ b/standalone/headers-cache/src/web_api.rs @@ -211,9 +211,21 @@ async fn api_check_blocks( to: Option, count: Option, ) -> Result { - crate::grab::check_and_fix_headers(&app.db, &app.config, chain, from, to, count) - .await - .map_err(|e| e.to_string()) + if chain == "state" { + let mismatches = + crate::grab::check_and_fix_storages_changes(&app.db, &app.config, from, to, count) + .await + .map_err(|e| e.to_string())?; + if mismatches == 0 { + Ok("No mismatches".into()) + } else { + Ok(format!("Mismatches: {:?}", mismatches)) + } + } else { + crate::grab::check_and_fix_headers(&app.db, &app.config, chain, from, to, count) + .await + .map_err(|e| e.to_string()) + } } pub(crate) async fn serve(db: CacheDB, config: ServeConfig, token: Option) -> Result<()> { diff --git a/standalone/pherry/src/headers_cache.rs b/standalone/pherry/src/headers_cache.rs index c89bd6733e..0ceb7a4abe 100644 --- a/standalone/pherry/src/headers_cache.rs +++ b/standalone/pherry/src/headers_cache.rs @@ -402,12 +402,13 @@ pub async fn grab_storage_changes( for from in (start_at..=to).step_by(batch_size as _) { let to = to.min(from.saturating_add(batch_size - 1)); - let headers = match crate::fetch_storage_changes(api, None, from, to).await { - Err(e) if e.to_string().contains("not found") => { - break; - } - other => other?, - }; + let headers = + match crate::fetch_storage_changes_with_root_or_not(api, None, from, to, true).await { + Err(e) if e.to_string().contains("not found") => { + break; + } + other => other?, + }; for header in headers { f(header)?; grabbed += 1; diff --git a/standalone/pherry/src/lib.rs b/standalone/pherry/src/lib.rs index dfb076d4c6..0db7a0f7fb 100644 --- a/standalone/pherry/src/lib.rs +++ b/standalone/pherry/src/lib.rs @@ -1,7 +1,10 @@ use anyhow::{anyhow, Context, Result}; use log::{debug, error, info, warn}; -use sp_core::crypto::AccountId32; +use phala_node_rpc_ext::MakeInto; +use phala_trie_storage::ser::StorageChanges; +use sp_core::{crypto::AccountId32, H256}; use std::cmp; +use std::convert::TryFrom; use std::str::FromStr; use std::time::Duration; use tokio::time::sleep; @@ -319,6 +322,16 @@ pub async fn fetch_storage_changes( cache: Option<&CacheClient>, from: BlockNumber, to: BlockNumber, +) -> Result> { + fetch_storage_changes_with_root_or_not(client, cache, from, to, false).await +} + +pub async fn fetch_storage_changes_with_root_or_not( + client: &RpcClient, + cache: Option<&CacheClient>, + from: BlockNumber, + to: BlockNumber, + with_root: bool, ) -> Result> { log::info!("fetch_storage_changes ({from}-{to})"); if to < from { @@ -336,21 +349,47 @@ pub async fn fetch_storage_changes( } let from_hash = get_header_hash(client, Some(from)).await?; let to_hash = get_header_hash(client, Some(to)).await?; - let storage_changes = chain_client::fetch_storage_changes(client, &from_hash, &to_hash) - .await? + + let changes = if with_root { + client + .extra_rpc() + .get_storage_changes_with_root(&from_hash, &to_hash) + .await? + .into_iter() + .map(|changes| { + Ok((changes.changes, { + let raw: [u8; 32] = TryFrom::try_from(&changes.state_root[..]) + .or(Err(anyhow!("Invalid state root")))?; + H256::from(raw) + })) + }) + .collect::>>()? + } else { + client + .extra_rpc() + .get_storage_changes(&from_hash, &to_hash) + .await? + .into_iter() + .map(|changes| (changes, Default::default())) + .collect::>() + }; + let storage_changes = changes .into_iter() .enumerate() - .map(|(offset, storage_changes)| { + .map(|(offset, (storage_changes, state_root))| { BlockHeaderWithChanges { // Headers are synced separately. Only the `number` is used in pRuntime while syncing blocks. block_header: BlockHeader { number: from + offset as BlockNumber, parent_hash: Default::default(), - state_root: Default::default(), + state_root, extrinsics_root: Default::default(), digest: Default::default(), }, - storage_changes, + storage_changes: StorageChanges { + main_storage_changes: storage_changes.main_storage_changes.into_(), + child_storage_changes: storage_changes.child_storage_changes.into_(), + }, } }) .collect(); diff --git a/standalone/pherry/src/prefetcher.rs b/standalone/pherry/src/prefetcher.rs index 22477e5ce4..87efa7b742 100644 --- a/standalone/pherry/src/prefetcher.rs +++ b/standalone/pherry/src/prefetcher.rs @@ -59,7 +59,8 @@ impl PrefetchClient { to: next_to, handle: tokio::spawn(async move { log::info!("prefetching ({next_from}-{next_to})"); - crate::fetch_storage_changes(&client, cache.as_ref(), next_from, next_to).await + crate::fetch_storage_changes(&client, cache.as_ref(), next_from, next_to) + .await }), }); Ok(result)