Skip to content

Commit

Permalink
graph: ensure removal paths go through remove_entity
Browse files Browse the repository at this point in the history
address CI failures
run format check
  • Loading branch information
MoonBoi9001 committed Jan 27, 2025
1 parent 3a73cf2 commit ef663d1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 24 deletions.
18 changes: 14 additions & 4 deletions graph/src/components/metrics/block_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
util::cache_weight::CacheWeight,
};

#[derive(Debug)]
#[derive(Default, Debug)]
pub struct BlockStateMetrics {
pub gas_counter: HashMap<CounterKey, u64>,
pub op_counter: HashMap<CounterKey, u64>,
Expand Down Expand Up @@ -201,11 +201,16 @@ impl BlockStateMetrics {
}
}

pub fn track_storage_size_change(&mut self, entity_type: &EntityType, entity: &Entity, is_removal: bool) {
pub fn track_storage_size_change(
&mut self,
entity_type: &EntityType,
entity: &Entity,
is_removal: bool,
) {
if ENV_VARS.enable_dips_metrics {
let key = CounterKey::Entity(entity_type.clone(), entity.id());
let size = entity.weight() as u64;

let storage = self.current_storage_size.entry(key).or_insert(0);
if is_removal {
*storage = storage.saturating_sub(size);
Expand All @@ -215,7 +220,12 @@ impl BlockStateMetrics {
}
}

pub fn track_storage_size_change_batch(&mut self, entity_type: &EntityType, entities: &[Entity], is_removal: bool) {
pub fn track_storage_size_change_batch(
&mut self,
entity_type: &EntityType,
entities: &[Entity],
is_removal: bool,
) {
if ENV_VARS.enable_dips_metrics {
for entity in entities {
self.track_storage_size_change(entity_type, entity, is_removal);
Expand Down
22 changes: 13 additions & 9 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashSet, sync::Arc};
use crate::{
blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
cheap_clone::CheapClone,
components::metrics::block_state::BlockStateMetrics,
components::subgraph::Entity,
constraint_violation,
data::{store::Id, subgraph::schema::SubgraphError},
Expand Down Expand Up @@ -498,27 +499,28 @@ impl RowGroup {

pub fn track_metrics(&self, metrics: &mut BlockStateMetrics) {
// Track entity count changes
let changes: Vec<i32> = self.rows.iter()
let changes: Vec<i32> = self
.rows
.iter()
.map(|row| row.entity_count_change())
.collect();
metrics.track_entity_count_change_batch(&self.entity_type, &changes);

// Track storage changes and writes
let (writes, removals): (Vec<_>, Vec<_>) = self.rows.iter()
// Track writes only
let writes: Vec<Entity> = self
.rows
.iter()
.filter_map(|row| match row {
EntityModification::Insert { data, .. } |
EntityModification::Overwrite { data, .. } => Some((data, false)),
EntityModification::Insert { data, .. }
| EntityModification::Overwrite { data, .. } => Some(data.as_ref().clone()),
EntityModification::Remove { .. } => None,
})
.unzip();
.collect();

if !writes.is_empty() {
metrics.track_entity_write_batch(&self.entity_type, &writes);
metrics.track_storage_size_change_batch(&self.entity_type, &writes, false);
}
if !removals.is_empty() {
metrics.track_storage_size_change_batch(&self.entity_type, &removals, true);
}
}
}

Expand Down Expand Up @@ -704,6 +706,8 @@ impl Batch {

let mut mods = RowGroups::new();

let mut metrics = BlockStateMetrics::default();

for m in raw_mods {
mods.group_entry(&m.key().entity_type).push(m, block)?;
}
Expand Down
47 changes: 36 additions & 11 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::{error::DeterminismLevel, module::IntoTrap};

use super::module::WasmInstanceData;

use graph::schema::EntityKey;

fn write_poi_event(
proof_of_indexing: &SharedProofOfIndexing,
poi_event: &ProofOfIndexingEvent,
Expand Down Expand Up @@ -350,9 +352,16 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.metrics.track_storage_size_change(&entity_type, &entity, false);

if !state.entity_cache.contains_key(&key) {
state
.metrics
.track_storage_size_change(&entity_type, &entity, false);

if state
.entity_cache
.get(&key, GetScope::Store)
.map_err(|e| HostExportError::Deterministic(e.into()))?
.is_none()
{
state.metrics.track_entity_count_change(&entity_type, 1);
}

Expand All @@ -364,7 +373,7 @@ impl HostExports {
}

pub(crate) fn store_remove(
&self,
&mut self,
logger: &Logger,
state: &mut BlockState,
proof_of_indexing: &SharedProofOfIndexing,
Expand Down Expand Up @@ -394,13 +403,7 @@ impl HostExports {
"store_remove",
)?;

if let Some(entity) = state.entity_cache.get(&key, GetScope::Store)? {
state.metrics.track_storage_size_change(&entity_type, &entity, true);

state.metrics.track_entity_count_change(&entity_type, -1);
}

state.entity_cache.remove(key);
self.remove_entity(&key, state)?;

Ok(())
}
Expand Down Expand Up @@ -1245,6 +1248,28 @@ impl HostExports {
.map(|mut tokens| tokens.pop().unwrap())
.context("Failed to decode")
}

/// Remove the entity identified by `key` from the entity cache, keeping the
/// block-level metrics in sync with the removal.
///
/// The entity is first looked up through the store-backed cache scope; only
/// when it actually exists are the storage-size and entity-count metrics
/// adjusted (size subtracted, count decremented by one). The cache removal
/// itself happens unconditionally.
///
/// # Errors
///
/// Returns a deterministic `HostExportError` if the entity cache lookup
/// fails.
fn remove_entity(
    &mut self,
    key: &EntityKey,
    state: &mut BlockState,
) -> Result<(), HostExportError> {
    // Only adjust metrics for entities that are actually present; removing
    // an absent entity must not skew storage-size or count tracking.
    if let Some(entity) = state
        .entity_cache
        .get(key, GetScope::Store)
        .map_err(|e| HostExportError::Deterministic(e.into()))?
    {
        // Borrow the entity type straight from the key — both tracking
        // calls only need `&EntityType`, so cloning it up front was an
        // unnecessary allocation-shaped copy.
        state
            .metrics
            .track_storage_size_change(&key.entity_type, &entity, true);
        state
            .metrics
            .track_entity_count_change(&key.entity_type, -1);
    }

    // `remove` takes ownership of the key, so this clone is required.
    state.entity_cache.remove(key.clone());
    Ok(())
}
}

fn string_to_h160(string: &str) -> Result<H160, DeterministicHostError> {
Expand Down

0 comments on commit ef663d1

Please sign in to comment.