Skip to content

Commit

Permalink
headers-cache: Check storage changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kvinwang committed Oct 12, 2023
1 parent afa0bbd commit c8ae98a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
43 changes: 30 additions & 13 deletions standalone/headers-cache/src/grab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl<'c> Crawler<'c> {
*next_delta,
count,
self.config.grab_storage_changes_batch,
!self.config.no_state_root,
|info| {
self.db
.put_storage_changes(info.block_header.number, &info.encode())
Expand Down Expand Up @@ -222,7 +223,8 @@ impl<'c> Crawler<'c> {
.min(relay_start + config.check_batch);
if relay_start < relay_end {
check_and_fix_headers(db, config, "relay", relay_start, Some(relay_end), None)
.await?;
.await
.context("Failed to check relay headers")?;
metadata.checked.header = Some(relay_end);
db.put_metadata(metadata)
.context("Failed to update metadata")?;
Expand All @@ -237,15 +239,17 @@ impl<'c> Crawler<'c> {
.unwrap_or(0)
.min(para_start + config.check_batch);
if para_start < para_end {
check_and_fix_headers(db, config, "para", para_start, Some(para_end), None).await?;
check_and_fix_headers(db, config, "para", para_start, Some(para_end), None)
.await
.context("Failed to check para headers")?;
metadata.checked.para_header = Some(para_end);
db.put_metadata(metadata)
.context("Failed to update metadata")?;
}
}

{
let changes_start = metadata.checked.storage_changes.unwrap_or(0);
if !config.no_state_root {
let changes_start = metadata.checked.storage_changes.unwrap_or(1);
let max_checked_header = metadata.checked.para_header.unwrap_or_default();
let changes_end = metadata
.recent_imported
Expand All @@ -254,8 +258,16 @@ impl<'c> Crawler<'c> {
.min(changes_start + config.check_batch)
.min(max_checked_header);
if changes_start < changes_end {
check_and_fix_storages_changes(db, config, changes_start, Some(changes_end), None)
.await?;
check_and_fix_storages_changes(
db,
Some(self.para_api.clone()),
config,
changes_start,
Some(changes_end),
None,
)
.await
.context("Failed to check storage changes")?;
metadata.checked.storage_changes = Some(changes_end);
db.put_metadata(metadata)
.context("Failed to update metadata")?;
Expand Down Expand Up @@ -344,17 +356,24 @@ pub(crate) async fn check_and_fix_headers(

pub(crate) async fn check_and_fix_storages_changes(
db: &CacheDB,
api: Option<ChainApi>,
config: &Serve,
from: BlockNumber,
to: Option<BlockNumber>,
count: Option<BlockNumber>,
) -> Result<u32> {
let api = match api {
Some(api) => api,
None => pherry::subxt_connect(&config.para_node_uri)
.await
.context(format!("Failed to connect to {}", config.para_node_uri))?,
};
let to = to.unwrap_or(from + count.unwrap_or(1));
info!("Checking storage changes from {from} to {to}");
let mut state_root_mismatches = 0_u32;
for block in from..to {
let header = db
.get_header(block)
.get_para_header(block)
.ok_or(anyhow!("Header {block} not found"))?;
let header = decode_header(&header)?;
let changes = db
Expand All @@ -363,12 +382,10 @@ pub(crate) async fn check_and_fix_storages_changes(
let actual_root = decode_header(&changes)
.map(|h| h.state_root)
.unwrap_or_default();
if header.state_root != actual_root {
info!("Storage changes {block} mismatch, trying to regrab");
let para_api = pherry::subxt_connect(&config.para_node_uri)
.await
.context(format!("Failed to connect to {}", config.para_node_uri))?;
cache::grab_storage_changes(&para_api, header.number, 1, 1, |info| {
let expected_root = header.state_root;
if expected_root != actual_root {
info!("Storage changes {block} mismatch, expected={expected_root} actual={actual_root}, trying to regrab");
cache::grab_storage_changes(&api, header.number, 1, 1, true, |info| {
db.put_storage_changes(info.block_header.number, &info.encode())
.context("Failed to put record to DB")?;
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions standalone/headers-cache/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ struct Serve {
/// The max batch size to check headers
#[clap(long, default_value_t = 100000)]
check_batch: BlockNumber,
/// Don't check state root for each storage changes
#[clap(long)]
no_state_root: bool,
}

#[derive(Subcommand)]
Expand Down
14 changes: 10 additions & 4 deletions standalone/headers-cache/src/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,16 @@ async fn api_check_blocks(
count: Option<BlockNumber>,
) -> Result<String, String> {
if chain == "state" {
let mismatches =
crate::grab::check_and_fix_storages_changes(&app.db, &app.config, from, to, count)
.await
.map_err(|e| e.to_string())?;
let mismatches = crate::grab::check_and_fix_storages_changes(
&app.db,
None,
&app.config,
from,
to,
count,
)
.await
.map_err(|e| e.to_string())?;
if mismatches == 0 {
Ok("No mismatches".into())
} else {
Expand Down
16 changes: 6 additions & 10 deletions standalone/pherry/src/headers_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ pub async fn grap_storage_changes_to_file(
batch_size: BlockNumber,
mut output: impl Write,
) -> Result<BlockNumber> {
grab_storage_changes(api, start_at, count, batch_size, |changes| {
grab_storage_changes(api, start_at, count, batch_size, true, |changes| {
if changes.block_header.number % 1000 == 0 {
info!("Got storage changes at {}", changes.block_header.number);
}
Expand Down Expand Up @@ -391,6 +391,7 @@ pub async fn grab_storage_changes(
start_at: BlockNumber,
count: BlockNumber,
batch_size: BlockNumber,
with_root: bool,
mut f: impl FnMut(BlockHeaderWithChanges) -> Result<()>,
) -> Result<BlockNumber> {
if count == 0 {
Expand All @@ -402,15 +403,10 @@ pub async fn grab_storage_changes(

for from in (start_at..=to).step_by(batch_size as _) {
let to = to.min(from.saturating_add(batch_size - 1));
let headers =
match crate::fetch_storage_changes_with_root_or_not(api, None, from, to, true).await {
Err(e) if e.to_string().contains("not found") => {
break;
}
other => other?,
};
for header in headers {
f(header)?;
let changes =
crate::fetch_storage_changes_with_root_or_not(api, None, from, to, with_root).await?;
for blk in changes {
f(blk)?;
grabbed += 1;
}
}
Expand Down
2 changes: 1 addition & 1 deletion standalone/pherry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ pub async fn fetch_storage_changes_with_root_or_not(
to: BlockNumber,
with_root: bool,
) -> Result<Vec<BlockHeaderWithChanges>> {
log::info!("fetch_storage_changes ({from}-{to})");
log::info!("fetch_storage_changes with_root={with_root}, ({from}-{to})");
if to < from {
return Ok(vec![]);
}
Expand Down

0 comments on commit c8ae98a

Please sign in to comment.