From c8ae98abcb370acbe9acad213284c9cdb542c0de Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Wed, 11 Oct 2023 03:05:06 +0000 Subject: [PATCH] headers-cache: Check storage changes --- standalone/headers-cache/src/grab.rs | 43 +++++++++++++++++-------- standalone/headers-cache/src/main.rs | 3 ++ standalone/headers-cache/src/web_api.rs | 14 +++++--- standalone/pherry/src/headers_cache.rs | 16 ++++----- standalone/pherry/src/lib.rs | 2 +- 5 files changed, 50 insertions(+), 28 deletions(-) diff --git a/standalone/headers-cache/src/grab.rs b/standalone/headers-cache/src/grab.rs index 9138d4efa9..8cef363023 100644 --- a/standalone/headers-cache/src/grab.rs +++ b/standalone/headers-cache/src/grab.rs @@ -190,6 +190,7 @@ impl<'c> Crawler<'c> { *next_delta, count, self.config.grab_storage_changes_batch, + !self.config.no_state_root, |info| { self.db .put_storage_changes(info.block_header.number, &info.encode()) @@ -222,7 +223,8 @@ impl<'c> Crawler<'c> { .min(relay_start + config.check_batch); if relay_start < relay_end { check_and_fix_headers(db, config, "relay", relay_start, Some(relay_end), None) - .await?; + .await + .context("Failed to check relay headers")?; metadata.checked.header = Some(relay_end); db.put_metadata(metadata) .context("Failed to update metadata")?; @@ -237,15 +239,17 @@ impl<'c> Crawler<'c> { .unwrap_or(0) .min(para_start + config.check_batch); if para_start < para_end { - check_and_fix_headers(db, config, "para", para_start, Some(para_end), None).await?; + check_and_fix_headers(db, config, "para", para_start, Some(para_end), None) + .await + .context("Failed to check para headers")?; metadata.checked.para_header = Some(para_end); db.put_metadata(metadata) .context("Failed to update metadata")?; } } - { - let changes_start = metadata.checked.storage_changes.unwrap_or(0); + if !config.no_state_root { + let changes_start = metadata.checked.storage_changes.unwrap_or(1); let max_checked_header = metadata.checked.para_header.unwrap_or_default(); let 
changes_end = metadata .recent_imported @@ -254,8 +258,16 @@ impl<'c> Crawler<'c> { .min(changes_start + config.check_batch) .min(max_checked_header); if changes_start < changes_end { - check_and_fix_storages_changes(db, config, changes_start, Some(changes_end), None) - .await?; + check_and_fix_storages_changes( + db, + Some(self.para_api.clone()), + config, + changes_start, + Some(changes_end), + None, + ) + .await + .context("Failed to check storage changes")?; metadata.checked.storage_changes = Some(changes_end); db.put_metadata(metadata) .context("Failed to update metadata")?; @@ -344,17 +356,24 @@ pub(crate) async fn check_and_fix_headers( pub(crate) async fn check_and_fix_storages_changes( db: &CacheDB, + api: Option, config: &Serve, from: BlockNumber, to: Option, count: Option, ) -> Result { + let api = match api { + Some(api) => api, + None => pherry::subxt_connect(&config.para_node_uri) + .await + .context(format!("Failed to connect to {}", config.para_node_uri))?, + }; let to = to.unwrap_or(from + count.unwrap_or(1)); info!("Checking storage changes from {from} to {to}"); let mut state_root_mismatches = 0_u32; for block in from..to { let header = db - .get_header(block) + .get_para_header(block) .ok_or(anyhow!("Header {block} not found"))?; let header = decode_header(&header)?; let changes = db @@ -363,12 +382,10 @@ pub(crate) async fn check_and_fix_storages_changes( let actual_root = decode_header(&changes) .map(|h| h.state_root) .unwrap_or_default(); - if header.state_root != actual_root { - info!("Storage changes {block} mismatch, trying to regrab"); - let para_api = pherry::subxt_connect(&config.para_node_uri) - .await - .context(format!("Failed to connect to {}", config.para_node_uri))?; - cache::grab_storage_changes(&para_api, header.number, 1, 1, |info| { + let expected_root = header.state_root; + if expected_root != actual_root { + info!("Storage changes {block} mismatch, expected={expected_root} actual={actual_root}, trying to regrab"); + 
cache::grab_storage_changes(&api, header.number, 1, 1, true, |info| { db.put_storage_changes(info.block_header.number, &info.encode()) .context("Failed to put record to DB")?; Ok(()) diff --git a/standalone/headers-cache/src/main.rs b/standalone/headers-cache/src/main.rs index e178c00820..8f588aba84 100644 --- a/standalone/headers-cache/src/main.rs +++ b/standalone/headers-cache/src/main.rs @@ -160,6 +160,9 @@ struct Serve { /// The max batch size to check headers #[clap(long, default_value_t = 100000)] check_batch: BlockNumber, + /// Don't check state root for each storage changes + #[clap(long)] + no_state_root: bool, } #[derive(Subcommand)] diff --git a/standalone/headers-cache/src/web_api.rs b/standalone/headers-cache/src/web_api.rs index 8635d9a9d2..ba0431bfff 100644 --- a/standalone/headers-cache/src/web_api.rs +++ b/standalone/headers-cache/src/web_api.rs @@ -212,10 +212,16 @@ async fn api_check_blocks( count: Option, ) -> Result { if chain == "state" { - let mismatches = - crate::grab::check_and_fix_storages_changes(&app.db, &app.config, from, to, count) - .await - .map_err(|e| e.to_string())?; + let mismatches = crate::grab::check_and_fix_storages_changes( + &app.db, + None, + &app.config, + from, + to, + count, + ) + .await + .map_err(|e| e.to_string())?; if mismatches == 0 { Ok("No mismatches".into()) } else { diff --git a/standalone/pherry/src/headers_cache.rs b/standalone/pherry/src/headers_cache.rs index 0ceb7a4abe..1cd762bc94 100644 --- a/standalone/pherry/src/headers_cache.rs +++ b/standalone/pherry/src/headers_cache.rs @@ -234,7 +234,7 @@ pub async fn grap_storage_changes_to_file( batch_size: BlockNumber, mut output: impl Write, ) -> Result { - grab_storage_changes(api, start_at, count, batch_size, |changes| { + grab_storage_changes(api, start_at, count, batch_size, true, |changes| { if changes.block_header.number % 1000 == 0 { info!("Got storage changes at {}", changes.block_header.number); } @@ -391,6 +391,7 @@ pub async fn grab_storage_changes( 
start_at: BlockNumber, count: BlockNumber, batch_size: BlockNumber, + with_root: bool, mut f: impl FnMut(BlockHeaderWithChanges) -> Result<()>, ) -> Result { if count == 0 { @@ -402,15 +403,10 @@ pub async fn grab_storage_changes( for from in (start_at..=to).step_by(batch_size as _) { let to = to.min(from.saturating_add(batch_size - 1)); - let headers = - match crate::fetch_storage_changes_with_root_or_not(api, None, from, to, true).await { - Err(e) if e.to_string().contains("not found") => { - break; - } - other => other?, - }; - for header in headers { - f(header)?; + let changes = + crate::fetch_storage_changes_with_root_or_not(api, None, from, to, with_root).await?; + for blk in changes { + f(blk)?; grabbed += 1; } } diff --git a/standalone/pherry/src/lib.rs b/standalone/pherry/src/lib.rs index 0db7a0f7fb..d18a52e257 100644 --- a/standalone/pherry/src/lib.rs +++ b/standalone/pherry/src/lib.rs @@ -333,7 +333,7 @@ pub async fn fetch_storage_changes_with_root_or_not( to: BlockNumber, with_root: bool, ) -> Result> { - log::info!("fetch_storage_changes ({from}-{to})"); + log::info!("fetch_storage_changes with_root={with_root}, ({from}-{to})"); if to < from { return Ok(vec![]); }