diff --git a/apps/hubble/src/addon/src/db/rocksdb.rs b/apps/hubble/src/addon/src/db/rocksdb.rs index 04ee860cbf..ee8835c210 100644 --- a/apps/hubble/src/addon/src/db/rocksdb.rs +++ b/apps/hubble/src/addon/src/db/rocksdb.rs @@ -1,6 +1,6 @@ use crate::db::multi_chunk_writer::MultiChunkWriter; use crate::logger::LOGGER; -use crate::statsd::statsd; +use crate::metrics::statsd; use crate::store::{ self, get_db, get_iterator_options, hub_error_to_js_throw, increment_vec_u8, HubError, PageOptions, PAGE_SIZE_MAX, diff --git a/apps/hubble/src/addon/src/lib.rs b/apps/hubble/src/addon/src/lib.rs index 113dc4403d..17d20bc19f 100644 --- a/apps/hubble/src/addon/src/lib.rs +++ b/apps/hubble/src/addon/src/lib.rs @@ -11,7 +11,7 @@ use threadpool::ThreadPool; mod db; mod logger; -mod statsd; +mod metrics; mod store; mod trie; @@ -89,7 +89,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("ed25519_verify", ed25519_verify)?; cx.export_function("blake3_20", js_blake3_20)?; - cx.export_function("createStatsdClient", statsd::js_create_statsd_client)?; + cx.export_function("createStatsdClient", metrics::js_create_statsd_client)?; cx.export_function("flushLogBuffer", logger::js_flush_log_buffer)?; cx.export_function("setLogLevel", logger::js_set_log_level)?; @@ -241,6 +241,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { // Register Merkle Trie methods MerkleTrie::register_js_methods(&mut cx)?; + crate::metrics::register_counters_js(&mut cx)?; + Ok(()) } diff --git a/apps/hubble/src/addon/src/metrics/counters.rs b/apps/hubble/src/addon/src/metrics/counters.rs new file mode 100644 index 0000000000..e03c02da9a --- /dev/null +++ b/apps/hubble/src/addon/src/metrics/counters.rs @@ -0,0 +1,320 @@ +use std::{ + collections::{hash_map, HashMap}, + hash::Hash, + marker::PhantomData, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, + time::{Instant, SystemTime, UNIX_EPOCH}, +}; + +use neon::{ + context::{Context, FunctionContext, ModuleContext}, + result::{JsResult, NeonResult}, + types::JsString, +}; +use once_cell::sync::Lazy; + +use crate::store::StoreDef; + +#[derive(Default)] +pub struct Counters { + prints: CounterMetric, + store_counters: RwLock>, +} + +#[derive(Eq, PartialEq, Hash, Debug, Clone, Copy)] +pub enum FidLockSource { + Merge, + Prune, +} + +#[derive(Eq, PartialEq, Hash, Debug, Clone, Copy)] +pub enum StoreAction { + FidLock(FidLockSource), + Merge, + ThreadPoolWait, + MergeCompactState, + MergeAdd, + MergeRemove, + DeleteMany(usize), + DeleteMoveTransaction, + PutRemoveTransaction, + DeleteAddTransaction, + DeleteCompactStateTransaction, + PutAddTransaction, + PutAddCompactStateTransaction, + GetRemovesByFid, + GetAddsByFid, + GetRemoves, + GetAdd, + Revoke, + GetAllMessagesByFid(Option), +} + +impl Counters { + pub fn store_action(&self, store_name: String, action: StoreAction, time_us: u64) { + Self::hash_record(&self.store_counters, (store_name, action), time_us) + } + + fn hash_record(hash: &RwLock>, key: T, time_us: u64) { + { + let hash_read = hash.read().unwrap(); + if let Some(entry) = hash_read.get(&key) { + entry.record_duration(time_us); + return; + } + } + + let mut hash_write = hash.write().unwrap(); + let entry = match hash_write.entry(key) { + hash_map::Entry::Occupied(o) => o.into_mut(), + hash_map::Entry::Vacant(v) => v.insert(CounterMetric::default()), + }; + + entry.record_duration(time_us); + } + + pub fn print_to_writer(&self, mut out: W) -> Result<(), std::io::Error> { + let start = Instant::now(); + + self.prints.write_to_config("counter_print", &mut out)?; + + { + let counters = self.store_counters.read().unwrap(); + for ((store, action), counter) in counters.iter() { + let extra_opt = match action { + StoreAction::DeleteMany(0) => continue, + StoreAction::DeleteMany(count) => Some(format!(",count=\"{}\"", *count)), + StoreAction::GetAllMessagesByFid(Some(page_size)) => { + Some(format!(",page_size=\"{}\"", *page_size)) + } + StoreAction::FidLock(source) => Some(format!(",source=\"{:?}\"", source)), + _ => None, + } + .unwrap_or_default(); + + /* write count so summary view is easier */ + { + let action_str = match action { + StoreAction::DeleteMany(_) => "DeleteMany".to_string(), + StoreAction::GetAllMessagesByFid(_) => "GetAllMessagesByFid".to_string(), + StoreAction::FidLock(_) => "FidLock".to_string(), + action => format!("{action:?}"), + }; + + writeln!(out, "#HELP store_count")?; + writeln!(out, "#TYPE store_count counter")?; + writeln!( + out, + "store_count{{store=\"{}\",action=\"{}\"{}}} {}", + store, + action_str, + extra_opt, + counter.count.load(Ordering::Relaxed) + )?; + + writeln!(out, "#HELP store_total_us")?; + writeln!(out, "#TYPE store_total_us counter")?; + writeln!( + out, + "store_total_us{{store=\"{}\",action=\"{}\"{}}} {}", + store, + action_str, + extra_opt, + counter.total_us.load(Ordering::Relaxed) + )?; + + writeln!(out, "#HELP store_max_us")?; + writeln!(out, "#TYPE store_max_us gauge")?; + writeln!( + out, + "store_max_us{{store=\"{}\",action=\"{}\"{}}} {}", + store, + action_str, + extra_opt, + counter.max_us_value.load(Ordering::Relaxed) + )?; + + writeln!(out, "#HELP store_min_us")?; + writeln!(out, "#TYPE store_min_us gauge")?; + writeln!( + out, + "store_min_us{{store=\"{}\",action=\"{}\"{}}} {}", + store, + action_str, + extra_opt, + counter.min_us_value.load(Ordering::Relaxed) + )?; + } + } + } + + self.prints + .record_duration(start.elapsed().as_micros() as u64); + + Ok(()) + } +} + +pub struct StoreLifetimeCounter { + store_name: Option, + start: Instant, + action: StoreAction, +} + +impl StoreLifetimeCounter { + pub fn new(name: S, action: StoreAction) -> Self { + StoreLifetimeCounter { + store_name: Some(name.name()), + start: Instant::now(), + action, + } + } +} + +impl Drop for StoreLifetimeCounter { + fn drop(&mut self) { + counters().store_action( + self.store_name.take().unwrap(), + self.action, + self.start.elapsed().as_micros() as u64, + ); + } +} + +pub trait StoreName { + fn name(self) -> String; +} + +impl StoreName for String { + fn name(self) -> String { + self + } +} + +impl StoreName for &'static str { + fn name(self) -> String { + self.to_string() + } +} + +impl<'a> StoreName for &'a dyn StoreDef { + fn name(self) -> String { + self.debug_name().to_string() + } +} + +impl StoreName for PhantomData { + fn name(self) -> String { + std::any::type_name::() + .rsplit("::") + .next() + .unwrap() + .to_string() + } +} + +#[derive(Default)] +pub struct CounterMetric { + count: AtomicU64, + total_us: AtomicU64, + extrema_last_update: AtomicU64, + max_us_value: AtomicU64, + min_us_value: AtomicU64, +} + +impl CounterMetric { + pub fn record_duration(&self, time_us: u64) { + self.count.fetch_add(1, Ordering::AcqRel); + self.total_us.fetch_add(time_us, Ordering::AcqRel); + + let current_minute = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + / 60; + + /* note: don't really care about race conditions, we may drop the true peak time and that's fine. */ + let last_update = self.extrema_last_update.load(Ordering::Acquire); + if last_update != current_minute { + self.extrema_last_update + .store(current_minute, Ordering::Release); + self.max_us_value.store(time_us, Ordering::Release); + self.min_us_value.store(time_us, Ordering::Release); + } else { + let current = self.max_us_value.load(Ordering::Acquire); + if current < time_us { + self.max_us_value.store(time_us, Ordering::Release); + } + + let current = self.min_us_value.load(Ordering::Acquire); + if time_us < current { + self.min_us_value.store(time_us, Ordering::Release); + } + } + } + + pub fn write_to_config( + &self, + prefix: &str, + mut out: W, + ) -> Result<(), std::io::Error> { + writeln!(out, "#HELP {}_count", prefix)?; + writeln!(out, "#TYPE {}_count counter", prefix)?; + writeln!( + out, + "{}_count {}", + prefix, + self.count.load(Ordering::Relaxed) + )?; + + writeln!(out, "#HELP {}_total_us", prefix)?; + writeln!(out, "#TYPE {}_total_us counter", prefix)?; + writeln!( + out, + "{}_total_us {}", + prefix, + self.total_us.load(Ordering::Relaxed) + )?; + + writeln!(out, "#HELP {}_max_us", prefix)?; + writeln!(out, "#TYPE {}_max_us gauge", prefix)?; + writeln!( + out, + "{}_max_us {}", + prefix, + self.max_us_value.load(Ordering::Relaxed) + )?; + + writeln!(out, "#HELP {}_min_us", prefix)?; + writeln!(out, "#TYPE {}_min_us gauge", prefix)?; + writeln!( + out, + "{}_min_us {}", + prefix, + self.min_us_value.load(Ordering::Relaxed) + )?; + + Ok(()) + } +} + +static COUNTERS: Lazy> = Lazy::new(|| Arc::new(Counters::default())); + +pub fn counters() -> &'static Counters { + COUNTERS.as_ref() +} + +fn js_counters_string(mut cx: FunctionContext) -> JsResult { + let counters = counters(); + let mut out = Vec::::new(); + counters.print_to_writer(&mut out).unwrap(); + Ok(cx.string(std::str::from_utf8(&out).unwrap())) +} + +pub fn register_counters_js(cx: &mut ModuleContext) -> NeonResult<()> { + cx.export_function("countersString", js_counters_string)?; + Ok(()) +} diff --git a/apps/hubble/src/addon/src/metrics/mod.rs b/apps/hubble/src/addon/src/metrics/mod.rs new file mode 100644 index 0000000000..ebdd4b4c8b --- /dev/null +++ b/apps/hubble/src/addon/src/metrics/mod.rs @@ -0,0 +1,5 @@ +mod counters; +mod statsd; + +pub use counters::*; +pub use statsd::*; diff --git a/apps/hubble/src/addon/src/statsd.rs b/apps/hubble/src/addon/src/metrics/statsd.rs similarity index 100% rename from apps/hubble/src/addon/src/statsd.rs rename to apps/hubble/src/addon/src/metrics/statsd.rs diff --git a/apps/hubble/src/addon/src/store/cast_store.rs b/apps/hubble/src/addon/src/store/cast_store.rs index ce9242015d..99d66399f1 100644 --- a/apps/hubble/src/addon/src/store/cast_store.rs +++ b/apps/hubble/src/addon/src/store/cast_store.rs @@ -8,6 +8,7 @@ use super::{ }; use crate::{ db::{RocksDB, RocksDbTransactionBatch}, + metrics::StoreAction, protos::{self, Message, MessageType}, }; use crate::{ @@ -57,6 +58,10 @@ pub struct CastStoreDef { } impl StoreDef for CastStoreDef { + fn debug_name(&self) -> &'static str { + "CastStore" + } + fn postfix(&self) -> u8 { UserPostfix::CastMessage as u8 } @@ -533,7 +538,10 @@ impl CastStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_cast_adds_by_fid(&store, fid, &page_options); deferred_settle_messages(deferred, &channel, messages); @@ -559,7 +567,10 @@ impl CastStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_cast_removes_by_fid(&store, fid, &page_options); deferred_settle_messages(deferred, &channel, messages); }); @@ -718,7 +729,10 @@ impl CastStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_casts_by_mention(&store, mention, &page_options); deferred_settle_messages(deferred, &channel, messages); diff --git a/apps/hubble/src/addon/src/store/link_store.rs b/apps/hubble/src/addon/src/store/link_store.rs index b40ad50495..5aca6fa0b3 100644 --- a/apps/hubble/src/addon/src/store/link_store.rs +++ b/apps/hubble/src/addon/src/store/link_store.rs @@ -2,6 +2,7 @@ use std::{borrow::Borrow, convert::TryInto, sync::Arc}; use crate::db::{RocksDB, RocksDbTransactionBatch}; use crate::logger::LOGGER; +use crate::metrics::StoreAction; use crate::protos::link_body::Target; use crate::protos::message_data::Body; use crate::protos::{message_data, LinkBody, Message, MessageData, MessageType}; @@ -512,7 +513,10 @@ impl LinkStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_link_adds_by_fid(&store, fid, link_type, &page_options); deferred_settle_messages(deferred, &channel, messages); @@ -531,7 +535,10 @@ impl LinkStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_link_removes_by_fid(&store, fid, link_type, &page_options); deferred_settle_messages(deferred, &channel, messages); @@ -655,7 +662,10 @@ impl LinkStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_links_by_target(&store, &target, link_type, &page_options); deferred_settle_messages(deferred, &channel, messages); @@ -678,7 +688,10 @@ impl LinkStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = Self::get_all_link_messages_by_fid(&store, fid, &page_options); deferred_settle_messages(deferred, &channel, messages); @@ -689,6 +702,10 @@ impl LinkStore { } impl StoreDef for LinkStore { + fn debug_name(&self) -> &'static str { + "LinkStore" + } + fn postfix(&self) -> u8 { UserPostfix::LinkMessage.as_u8() } diff --git a/apps/hubble/src/addon/src/store/reaction_store.rs b/apps/hubble/src/addon/src/store/reaction_store.rs index 31318808cd..4fb1eda213 100644 --- a/apps/hubble/src/addon/src/store/reaction_store.rs +++ b/apps/hubble/src/addon/src/store/reaction_store.rs @@ -8,6 +8,7 @@ use super::{ }; use crate::{ db::{RocksDB, RocksDbTransactionBatch}, + metrics::StoreAction, protos::{self, reaction_body::Target, Message, MessageType, ReactionBody, ReactionType}, }; use crate::{protos::message_data, THREAD_POOL}; @@ -24,6 +25,10 @@ pub struct ReactionStoreDef { } impl StoreDef for ReactionStoreDef { + fn debug_name(&self) -> &'static str { + "ReactionStoreDef" + } + fn postfix(&self) -> u8 { UserPostfix::ReactionMessage.as_u8() } @@ -481,7 +486,10 @@ impl ReactionStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = ReactionStore::get_reaction_adds_by_fid(&store, fid, reaction_type, &page_options); @@ -524,7 +532,10 @@ impl ReactionStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = ReactionStore::get_reaction_removes_by_fid( &store, fid, @@ -629,7 +640,10 @@ impl ReactionStore { let channel = cx.channel(); let (deferred, promise) = cx.promise(); + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let messages = ReactionStore::get_reactions_by_target( &store, &target, diff --git a/apps/hubble/src/addon/src/store/store.rs b/apps/hubble/src/addon/src/store/store.rs index 33ec9e8027..0042c07c32 100644 --- a/apps/hubble/src/addon/src/store/store.rs +++ b/apps/hubble/src/addon/src/store/store.rs @@ -6,6 +6,7 @@ use super::{ }; use crate::{ db::{RocksDB, RocksDbTransactionBatch}, + metrics::{FidLockSource, StoreAction, StoreLifetimeCounter}, protos::{ self, hub_event, link_body::Target, message_data::Body, HubEvent, HubEventType, MergeMessageBody, Message, MessageType, @@ -107,6 +108,8 @@ pub struct PageOptions { /// Some methods in this trait provide default implementations. These methods can be overridden /// by implementing the trait for a specific type. pub trait StoreDef: Send + Sync { + fn debug_name(&self) -> &'static str; + fn postfix(&self) -> u8; fn add_message_type(&self) -> u8; fn remove_message_type(&self) -> u8; @@ -120,6 +123,7 @@ pub trait StoreDef: Send + Sync { self.remove_message_type() != MessageType::None as u8 } + /* todo: what is this? */ fn is_compact_state_type(&self, message: &Message) -> bool; // If the store supports compaction state messages, this should return true @@ -339,7 +343,7 @@ pub trait StoreDef: Send + Sync { pub struct Store { store_def: Box, store_event_handler: Arc, - fid_locks: Arc<[Mutex<()>; 4]>, + fid_locks: Arc<[Mutex<()>]>, db: Arc, logger: slog::Logger, } @@ -357,12 +361,13 @@ impl Store { Store { store_def, store_event_handler, - fid_locks: Arc::new([ - Mutex::new(()), - Mutex::new(()), - Mutex::new(()), - Mutex::new(()), - ]), + fid_locks: { + let mut locks = Vec::with_capacity(FID_LOCKS_COUNT); + for _ in 0..FID_LOCKS_COUNT { + locks.push(Mutex::new(())); + } + locks.into() + }, db, logger: LOGGER.new(o!("component" => "Store")), } @@ -400,6 +405,8 @@ impl Store { }); } + let _metric = self.metric(StoreAction::GetAdd); + let adds_key = self.store_def.make_add_key(partial_message)?; let message_ts_hash = self.db.get(&adds_key)?; @@ -434,6 +441,8 @@ impl Store { }); } + let _metric = self.metric(StoreAction::GetRemoves); + let removes_key = self.store_def.make_remove_key(partial_message)?; let message_ts_hash = self.db.get(&removes_key)?; @@ -458,6 +467,8 @@ impl Store { where F: Fn(&protos::Message) -> bool, { + let _metric = self.metric(StoreAction::GetAddsByFid); + let prefix = make_message_primary_key(fid, self.store_def.postfix(), None); let messages_page = message::get_messages_page_by_prefix(&self.db, &prefix, &page_options, |message| { @@ -484,6 +495,8 @@ impl Store { }); } + let _metric = self.metric(StoreAction::GetRemovesByFid); + let prefix = make_message_primary_key(fid, self.store_def.postfix(), None); let messages = message::get_messages_page_by_prefix(&self.db, &prefix, &page_options, |message| { @@ -506,6 +519,8 @@ impl Store { }); } + let _metric = self.metric(StoreAction::PutAddCompactStateTransaction); + let compact_state_key = self.store_def.make_compact_state_add_key(message)?; txn.put(compact_state_key, message_encode(&message)); @@ -518,6 +533,8 @@ impl Store { ts_hash: &[u8; TS_HASH_LENGTH], message: &Message, ) -> Result<(), HubError> { + let _metric = self.metric(StoreAction::PutAddTransaction); + put_message_transaction(txn, &message)?; let adds_key = self.store_def.make_add_key(message)?; @@ -542,6 +559,8 @@ impl Store { }); } + let _metric = self.metric(StoreAction::DeleteCompactStateTransaction); + let compact_state_key = self.store_def.make_compact_state_add_key(message)?; txn.delete(compact_state_key); @@ -554,6 +573,8 @@ impl Store { ts_hash: &[u8; TS_HASH_LENGTH], message: &Message, ) -> Result<(), HubError> { + let _metric = self.metric(StoreAction::DeleteAddTransaction); + self.store_def .delete_secondary_indices(txn, ts_hash, message)?; @@ -569,6 +590,8 @@ impl Store { ts_hash: &[u8; TS_HASH_LENGTH], message: &Message, ) -> Result<(), HubError> { + let _metric = self.metric(StoreAction::PutRemoveTransaction); + if !self.store_def.remove_type_supported() { return Err(HubError { code: "bad_request.validation_failure".to_string(), @@ -589,6 +612,8 @@ impl Store { txn: &mut RocksDbTransactionBatch, message: &Message, ) -> Result<(), HubError> { + let _metric = self.metric(StoreAction::DeleteMoveTransaction); + if !self.store_def.remove_type_supported() { return Err(HubError { code: "bad_request.validation_failure".to_string(), @@ -610,6 +635,8 @@ impl Store { txn: &mut RocksDbTransactionBatch, messages: &Vec, ) -> Result<(), HubError> { + let _metric = self.metric(StoreAction::DeleteMany(messages.len())); + for message in messages { if self.store_def.is_compact_state_type(message) { self.delete_compact_state_transaction(txn, message)?; @@ -627,13 +654,23 @@ impl Store { } pub fn merge(&self, message: &Message) -> Result, HubError> { + let _metric = self.metric(StoreAction::Merge); + // Grab a merge lock. The typescript code does this by individual fid, but we don't have a // good way of doing that efficiently here. We'll just use an array of locks, with each fid // deterministically mapped to a lock. - let _fid_lock = &self.fid_locks - [message.data.as_ref().unwrap().fid as usize % FID_LOCKS_COUNT] - .lock() - .unwrap(); + + let _fid_lock = 'get_lock: { + let index = message.data.as_ref().unwrap().fid as usize % FID_LOCKS_COUNT; + let lock = &self.fid_locks[index]; + + if let Ok(lock) = lock.try_lock() { + break 'get_lock lock; + } + + let _metric = self.metric(StoreAction::FidLock(FidLockSource::Merge)); + lock.lock().unwrap() + }; if !self.store_def.is_add_type(message) && !(self.store_def.remove_type_supported() && self.store_def.is_remove_type(message)) @@ -658,6 +695,8 @@ impl Store { } pub fn revoke(&self, message: &Message) -> Result, HubError> { + let _metric = self.metric(StoreAction::Revoke); + // Start a transaction let mut txn = self.db.txn(); @@ -721,6 +760,8 @@ impl Store { } pub fn merge_compact_state(&self, message: &Message) -> Result, HubError> { + let _metric = self.metric(StoreAction::MergeCompactState); + let mut merge_conflicts = vec![]; // First, find if there's an existing compact state message, and if there is, @@ -814,6 +855,8 @@ impl Store { ts_hash: &[u8; TS_HASH_LENGTH], message: &Message, ) -> Result, HubError> { + let _metric = self.metric(StoreAction::MergeAdd); + // If the store supports compact state messages, we don't merge messages that don't exist in the compact state if self.store_def.compact_state_type_supported() { // Get the compact state message @@ -878,6 +921,8 @@ impl Store { ts_hash: &[u8; TS_HASH_LENGTH], message: &Message, ) -> Result, HubError> { + let _metric = self.metric(StoreAction::MergeRemove); + // If the store supports compact state messages, we don't merge remove messages before its timestamp // If the store supports compact state messages, we don't merge messages that don't exist in the compact state if self.store_def.compact_state_type_supported() { @@ -938,6 +983,20 @@ impl Store { cached_count: u64, units: u64, ) -> Result, HubError> { + // TODO: uncomment this code and monitor in production + // // Concurrent writes on same memory space kills RocksDB performance. Ensure this doesn't happen with a lock. + // let _fid_lock = 'get_lock: { + // let index = fid as usize % FID_LOCKS_COUNT; + // let lock = &self.fid_locks[index]; + + // if let Ok(lock) = lock.try_lock() { + // break 'get_lock lock; + // } + + // let _metric = self.metric(StoreAction::FidLock(FidLockSource::Prune)); + // lock.lock().unwrap() + // }; + let mut pruned_events = vec![]; let mut count = cached_count; @@ -993,6 +1052,8 @@ impl Store { fid: u32, page_options: &PageOptions, ) -> Result { + let _metric = self.metric(StoreAction::GetAllMessagesByFid(page_options.page_size)); + let prefix = make_message_primary_key(fid, self.store_def.postfix(), None); let messages = message::get_messages_page_by_prefix(&self.db, &prefix, &page_options, |message| { @@ -1003,6 +1064,10 @@ impl Store { Ok(messages) } + + pub fn metric(&self, action: StoreAction) -> StoreLifetimeCounter { + StoreLifetimeCounter::new(self.store_def.as_ref(), action) + } } // Neon bindings @@ -1070,7 +1135,10 @@ impl Store { // We run the merge in a threadpool because it can be very CPU intensive and it will block // the NodeJS main thread. + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + let results = messages .into_iter() .map(|message| match message { @@ -1147,7 +1215,10 @@ impl Store { // We run the prune in a threadpool because it can be very CPU intensive and it will block // the NodeJS main thread. + let metric = store.metric(StoreAction::ThreadPoolWait); THREAD_POOL.lock().unwrap().execute(move || { + drop(metric); + // Run the prune job in a separate thread let prune_result = store.prune_messages(fid, cached_count, units); diff --git a/apps/hubble/src/addon/src/store/user_data_store.rs b/apps/hubble/src/addon/src/store/user_data_store.rs index 2a44e2929b..54cbad2b78 100644 --- a/apps/hubble/src/addon/src/store/user_data_store.rs +++ b/apps/hubble/src/addon/src/store/user_data_store.rs @@ -27,6 +27,10 @@ pub struct UserDataStoreDef { } impl StoreDef for UserDataStoreDef { + fn debug_name(&self) -> &'static str { + "UserDataStore" + } + fn postfix(&self) -> u8 { UserPostfix::UserDataMessage as u8 } diff --git a/apps/hubble/src/addon/src/store/username_proof_store.rs b/apps/hubble/src/addon/src/store/username_proof_store.rs index 7d391fd7d2..7603e49c29 100644 --- a/apps/hubble/src/addon/src/store/username_proof_store.rs +++ b/apps/hubble/src/addon/src/store/username_proof_store.rs @@ -25,6 +25,10 @@ pub struct UsernameProofStoreDef { } impl StoreDef for UsernameProofStoreDef { + fn debug_name(&self) -> &'static str { + "UsernameProofStore" + } + fn postfix(&self) -> u8 { UserPostfix::UsernameProofMessage.as_u8() } diff --git a/apps/hubble/src/addon/src/store/verification_store.rs b/apps/hubble/src/addon/src/store/verification_store.rs index 03b1211641..0aa391bc96 100644 --- a/apps/hubble/src/addon/src/store/verification_store.rs +++ b/apps/hubble/src/addon/src/store/verification_store.rs @@ -29,6 +29,10 @@ pub struct VerificationStoreDef { } impl StoreDef for VerificationStoreDef { + fn debug_name(&self) -> &'static str { + "VerificationStore" + } + fn postfix(&self) -> u8 { UserPostfix::VerificationMessage as u8 } diff --git a/apps/hubble/src/addon/src/trie/merkle_trie.rs b/apps/hubble/src/addon/src/trie/merkle_trie.rs index 424ef08c46..dcef3c2fdc 100644 --- a/apps/hubble/src/addon/src/trie/merkle_trie.rs +++ b/apps/hubble/src/addon/src/trie/merkle_trie.rs @@ -2,7 +2,7 @@ use super::trie_node::{TrieNode, TIMESTAMP_LENGTH}; use crate::{ db::{RocksDB, RocksDbTransactionBatch}, logger::LOGGER, - statsd::statsd, + metrics::statsd, store::{encode_node_metadata_to_js_object, get_merkle_trie, hub_error_to_js_throw, HubError}, THREAD_POOL, }; diff --git a/apps/hubble/src/rpc/httpServer.ts b/apps/hubble/src/rpc/httpServer.ts index 7f9ca586a8..ba860eea04 100644 --- a/apps/hubble/src/rpc/httpServer.ts +++ b/apps/hubble/src/rpc/httpServer.ts @@ -33,6 +33,7 @@ import { PageOptions } from "../storage/stores/types.js"; import Engine from "../storage/engine/index.js"; import { statsd } from "../utils/statsd.js"; import { DeepPartial } from "storage/stores/rustStoreBase.js"; +import { rsCountersString } from "../rustfunctions.js"; const log = logger.child({ component: "HttpAPIServer" }); @@ -676,6 +677,12 @@ export class HttpAPIServer { } }); }); + + // @doc-tag: /metrics + this.app.get("/v0/metrics", (_request, reply) => { + const data = rsCountersString(); + reply.send(data); + }); } async start(ip = "0.0.0.0", port = 0): Promise> { diff --git a/apps/hubble/src/rustfunctions.ts b/apps/hubble/src/rustfunctions.ts index 6fcc65580f..2fac096e59 100644 --- a/apps/hubble/src/rustfunctions.ts +++ b/apps/hubble/src/rustfunctions.ts @@ -743,3 +743,8 @@ export const rsMerkleTrieRootHash = async (trie: RustMerkleTrie): Promise => { return await lib.merkleTrieUnloadChildren.call(trie); }; + +export const rsCountersString = (): String => { + const store = lib.countersString(); + return store as String; +};