diff --git a/graph/src/components/metrics/block_state.rs b/graph/src/components/metrics/block_state.rs
index 5d813894665..e46683638a9 100644
--- a/graph/src/components/metrics/block_state.rs
+++ b/graph/src/components/metrics/block_state.rs
@@ -17,7 +17,7 @@ use crate::{
     util::cache_weight::CacheWeight,
 };
 
-#[derive(Debug)]
+#[derive(Default, Debug)]
 pub struct BlockStateMetrics {
     pub gas_counter: HashMap<CounterKey, u64>,
     pub op_counter: HashMap<CounterKey, u64>,
@@ -201,11 +201,16 @@ impl BlockStateMetrics {
         }
     }
 
-    pub fn track_storage_size_change(&mut self, entity_type: &EntityType, entity: &Entity, is_removal: bool) {
+    pub fn track_storage_size_change(
+        &mut self,
+        entity_type: &EntityType,
+        entity: &Entity,
+        is_removal: bool,
+    ) {
         if ENV_VARS.enable_dips_metrics {
             let key = CounterKey::Entity(entity_type.clone(), entity.id());
             let size = entity.weight() as u64;
-            
+
             let storage = self.current_storage_size.entry(key).or_insert(0);
             if is_removal {
                 *storage = storage.saturating_sub(size);
@@ -215,7 +220,12 @@ impl BlockStateMetrics {
         }
     }
 
-    pub fn track_storage_size_change_batch(&mut self, entity_type: &EntityType, entities: &[Entity], is_removal: bool) {
+    pub fn track_storage_size_change_batch(
+        &mut self,
+        entity_type: &EntityType,
+        entities: &[Entity],
+        is_removal: bool,
+    ) {
         if ENV_VARS.enable_dips_metrics {
             for entity in entities {
                 self.track_storage_size_change(entity_type, entity, is_removal);
diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs
index 7811d472dc8..d382864d078 100644
--- a/graph/src/components/store/write.rs
+++ b/graph/src/components/store/write.rs
@@ -4,6 +4,7 @@ use std::{collections::HashSet, sync::Arc};
 use crate::{
     blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
     cheap_clone::CheapClone,
+    components::metrics::block_state::BlockStateMetrics,
     components::subgraph::Entity,
     constraint_violation,
     data::{store::Id, subgraph::schema::SubgraphError},
@@ -498,27 +499,28 @@ impl RowGroup {
 
     pub fn track_metrics(&self, metrics: &mut BlockStateMetrics) {
         // Track entity count changes
-        let changes: Vec<i32> = self.rows.iter()
+        let changes: Vec<i32> = self
+            .rows
+            .iter()
             .map(|row| row.entity_count_change())
             .collect();
 
         metrics.track_entity_count_change_batch(&self.entity_type, &changes);
 
-        // Track storage changes and writes
-        let (writes, removals): (Vec<_>, Vec<_>) = self.rows.iter()
+        // Track writes only
+        let writes: Vec<Entity> = self
+            .rows
+            .iter()
             .filter_map(|row| match row {
-                EntityModification::Insert { data, .. } |
-                EntityModification::Overwrite { data, .. } => Some((data, false)),
+                EntityModification::Insert { data, .. }
+                | EntityModification::Overwrite { data, .. } => Some(data.as_ref().clone()),
                 EntityModification::Remove { .. } => None,
             })
-            .unzip();
+            .collect();
 
         if !writes.is_empty() {
             metrics.track_entity_write_batch(&self.entity_type, &writes);
             metrics.track_storage_size_change_batch(&self.entity_type, &writes, false);
         }
-
-        if !removals.is_empty() {
-            metrics.track_storage_size_change_batch(&self.entity_type, &removals, true);
-        }
     }
 }
@@ -704,6 +706,8 @@ impl Batch {
 
         let mut mods = RowGroups::new();
 
+        let mut metrics = BlockStateMetrics::default();
+
         for m in raw_mods {
             mods.group_entry(&m.key().entity_type).push(m, block)?;
         }
diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs
index f95ce08a2f7..978f07b1dd5 100644
--- a/runtime/wasm/src/host_exports.rs
+++ b/runtime/wasm/src/host_exports.rs
@@ -33,6 +33,8 @@ use crate::{error::DeterminismLevel, module::IntoTrap};
 
 use super::module::WasmInstanceData;
 
+use graph::schema::EntityKey;
+
 fn write_poi_event(
     proof_of_indexing: &SharedProofOfIndexing,
     poi_event: &ProofOfIndexingEvent,
@@ -350,9 +352,16 @@ impl HostExports {
 
         state.metrics.track_entity_write(&entity_type, &entity);
 
-        state.metrics.track_storage_size_change(&entity_type, &entity, false);
-
-        if !state.entity_cache.contains_key(&key) {
+        state
+            .metrics
+            .track_storage_size_change(&entity_type, &entity, false);
+
+        if state
+            .entity_cache
+            .get(&key, GetScope::Store)
+            .map_err(|e| HostExportError::Deterministic(e.into()))?
+            .is_none()
+        {
             state.metrics.track_entity_count_change(&entity_type, 1);
         }
 
@@ -364,7 +373,7 @@ impl HostExports {
     }
 
     pub(crate) fn store_remove(
-        &self,
+        &mut self,
         logger: &Logger,
         state: &mut BlockState,
         proof_of_indexing: &SharedProofOfIndexing,
@@ -394,13 +403,7 @@ impl HostExports {
             "store_remove",
         )?;
 
-        if let Some(entity) = state.entity_cache.get(&key, GetScope::Store)? {
-            state.metrics.track_storage_size_change(&entity_type, &entity, true);
-
-            state.metrics.track_entity_count_change(&entity_type, -1);
-        }
-
-        state.entity_cache.remove(key);
+        self.remove_entity(&key, state)?;
 
         Ok(())
     }
@@ -1245,6 +1248,28 @@ impl HostExports {
             .map(|mut tokens| tokens.pop().unwrap())
             .context("Failed to decode")
     }
+
+    fn remove_entity(
+        &mut self,
+        key: &EntityKey,
+        state: &mut BlockState,
+    ) -> Result<(), HostExportError> {
+        let entity_type = key.entity_type.clone();
+
+        if let Some(entity) = state
+            .entity_cache
+            .get(key, GetScope::Store)
+            .map_err(|e| HostExportError::Deterministic(e.into()))?
+        {
+            state
+                .metrics
+                .track_storage_size_change(&entity_type, &entity, true);
+            state.metrics.track_entity_count_change(&entity_type, -1);
+        }
+
+        state.entity_cache.remove(key.clone());
+        Ok(())
+    }
 }
 
 fn string_to_h160(string: &str) -> Result<H160, DeterministicHostError> {
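
Below is a minimal, self-contained sketch of the per-entity storage-size bookkeeping these hunks wire up. It assumes the non-removal branch (which falls outside the hunk shown above) simply adds the entity's weight to the counter; `StorageSizeTracker` and the string keys are illustrative stand-ins, not graph-node types.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the diff's `CounterKey`: one entry per entity.
type CounterKey = String;

#[derive(Default, Debug)]
struct StorageSizeTracker {
    current_storage_size: HashMap<CounterKey, u64>,
}

impl StorageSizeTracker {
    /// Mirrors the shape of `track_storage_size_change`: writes grow the
    /// per-entity counter, removals shrink it, and `saturating_sub` keeps a
    /// double-reported removal from underflowing the counter.
    fn track_storage_size_change(&mut self, key: CounterKey, size: u64, is_removal: bool) {
        let storage = self.current_storage_size.entry(key).or_insert(0);
        if is_removal {
            *storage = storage.saturating_sub(size);
        } else {
            // Assumption: the write path adds the entity's weight.
            *storage += size;
        }
    }
}

fn main() {
    let mut tracker = StorageSizeTracker::default();
    tracker.track_storage_size_change("Token:0x1".into(), 128, false);
    tracker.track_storage_size_change("Token:0x1".into(), 128, true);
    // A removal reported for an entity whose size is already gone clamps
    // the counter at zero instead of wrapping the u64 around.
    tracker.track_storage_size_change("Token:0x1".into(), 128, true);
    assert_eq!(tracker.current_storage_size["Token:0x1"], 0);
}
```

Using `saturating_sub` on the removal path means a removal whose size was never recorded (or was already subtracted) leaves the counter at zero rather than underflowing, which is why the removal side is tracked in `remove_entity`, where the entity can still be read from the cache before it is deleted.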