Skip to content

Commit

Permalink
graph: introduce tracking for entity storage size changes in BlockSta…
Browse files Browse the repository at this point in the history
…teMetrics
  • Loading branch information
MoonBoi9001 committed Jan 27, 2025
1 parent a506cd7 commit 3a73cf2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
41 changes: 41 additions & 0 deletions graph/src/components/metrics/block_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct BlockStateMetrics {
pub read_bytes_counter: HashMap<CounterKey, u64>,
pub write_bytes_counter: HashMap<CounterKey, u64>,
pub entity_count_changes: HashMap<CounterKey, u64>,
pub current_storage_size: HashMap<CounterKey, u64>,
}

#[derive(Hash, PartialEq, Eq, Debug, Clone)]
Expand All @@ -46,6 +47,7 @@ impl BlockStateMetrics {
gas_counter: HashMap::new(),
op_counter: HashMap::new(),
entity_count_changes: HashMap::new(),
current_storage_size: HashMap::new(),
}
}

Expand All @@ -69,6 +71,10 @@ impl BlockStateMetrics {
for (key, value) in other.entity_count_changes {
*self.entity_count_changes.entry(key).or_insert(0) = value;
}

for (key, value) in other.current_storage_size {
*self.current_storage_size.entry(key).or_insert(0) = value;
}
}

fn serialize_to_csv<T: Serialize, U: Serialize, I: IntoIterator<Item = T>>(
Expand Down Expand Up @@ -195,6 +201,35 @@ impl BlockStateMetrics {
}
}

pub fn track_storage_size_change(&mut self, entity_type: &EntityType, entity: &Entity, is_removal: bool) {
if ENV_VARS.enable_dips_metrics {
let key = CounterKey::Entity(entity_type.clone(), entity.id());
let size = entity.weight() as u64;

let storage = self.current_storage_size.entry(key).or_insert(0);
if is_removal {
*storage = storage.saturating_sub(size);
} else {
*storage = size;
}
}
}

pub fn track_storage_size_change_batch(&mut self, entity_type: &EntityType, entities: &[Entity], is_removal: bool) {
if ENV_VARS.enable_dips_metrics {
for entity in entities {
self.track_storage_size_change(entity_type, entity, is_removal);
}
}
}

pub fn track_entity_count_change_batch(&mut self, entity_type: &EntityType, changes: &[i32]) {
if ENV_VARS.enable_dips_metrics {
let total_change: i32 = changes.iter().sum();
self.track_entity_count_change(entity_type, total_change);
}
}

pub fn flush_metrics_to_store(
&self,
logger: &Logger,
Expand All @@ -218,6 +253,7 @@ impl BlockStateMetrics {
let read_bytes_counter = self.read_bytes_counter.clone();
let write_bytes_counter = self.write_bytes_counter.clone();
let entity_count_changes = self.entity_count_changes.clone();
let current_storage_size = self.current_storage_size.clone();

// Spawn the async task
crate::spawn(async move {
Expand Down Expand Up @@ -246,6 +282,11 @@ impl BlockStateMetrics {
Self::counter_to_csv(&entity_count_changes, vec!["entity", "id", "count"])
.unwrap(),
),
(
"storage_size",
Self::counter_to_csv(&current_storage_size, vec!["entity", "id", "bytes"])
.unwrap(),
),
];

// Convert each metrics upload into a future
Expand Down
30 changes: 30 additions & 0 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,31 @@ impl RowGroup {
pub fn ids(&self) -> impl Iterator<Item = &Id> {
self.rows.iter().map(|emod| emod.id())
}

pub fn track_metrics(&self, metrics: &mut BlockStateMetrics) {
// Track entity count changes
let changes: Vec<i32> = self.rows.iter()
.map(|row| row.entity_count_change())
.collect();
metrics.track_entity_count_change_batch(&self.entity_type, &changes);

// Track storage changes and writes
let (writes, removals): (Vec<_>, Vec<_>) = self.rows.iter()
.filter_map(|row| match row {
EntityModification::Insert { data, .. } |
EntityModification::Overwrite { data, .. } => Some((data, false)),
EntityModification::Remove { .. } => None,
})
.unzip();

if !writes.is_empty() {
metrics.track_entity_write_batch(&self.entity_type, &writes);
metrics.track_storage_size_change_batch(&self.entity_type, &writes, false);
}
if !removals.is_empty() {
metrics.track_storage_size_change_batch(&self.entity_type, &removals, true);
}
}
}

struct ClampsByBlockIterator<'a> {
Expand Down Expand Up @@ -683,6 +708,11 @@ impl Batch {
mods.group_entry(&m.key().entity_type).push(m, block)?;
}

// Track metrics for each group
for group in &mods.groups {
group.track_metrics(&mut metrics);
}

let data_sources = DataSources::new(block_ptr.cheap_clone(), data_sources);
let offchain_to_remove = DataSources::new(block_ptr.cheap_clone(), offchain_to_remove);
let first_block = block_ptr.number;
Expand Down
12 changes: 12 additions & 0 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.metrics.track_storage_size_change(&entity_type, &entity, false);

if !state.entity_cache.contains_key(&key) {
state.metrics.track_entity_count_change(&entity_type, 1);
}

state
.entity_cache
.set(key, entity, Some(&mut state.write_capacity_remaining))?;
Expand Down Expand Up @@ -388,6 +394,12 @@ impl HostExports {
"store_remove",
)?;

if let Some(entity) = state.entity_cache.get(&key, GetScope::Store)? {
state.metrics.track_storage_size_change(&entity_type, &entity, true);

state.metrics.track_entity_count_change(&entity_type, -1);
}

state.entity_cache.remove(key);

Ok(())
Expand Down

0 comments on commit 3a73cf2

Please sign in to comment.