diff --git a/Cargo.lock b/Cargo.lock index d78c8c5c28e9c6..0cf0b56878bc6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5089,6 +5089,7 @@ dependencies = [ "solana-perf", "solana-poh", "solana-program-runtime", + "solana-prometheus", "solana-rayon-threadlimit", "solana-replica-lib", "solana-rpc", @@ -5868,6 +5869,7 @@ dependencies = [ "solana-gossip", "solana-runtime", "solana-sdk 1.10.39", + "solana-streamer", "solana-vote-program", ] @@ -6516,6 +6518,7 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", + "solana-prometheus", "solana-replica-lib", "solana-rpc", "solana-runtime", diff --git a/core/Cargo.toml b/core/Cargo.toml index 6c6882b38d4734..be0133d03558d0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -59,6 +59,7 @@ solana-streamer = { path = "../streamer", version = "=1.10.39" } solana-transaction-status = { path = "../transaction-status", version = "=1.10.39" } solana-version = { path = "../version", version = "=1.10.39" } solana-vote-program = { path = "../programs/vote", version = "=1.10.39" } +solana-prometheus = { path = "../prometheus" } sys-info = "0.9.1" tempfile = "3.3.0" thiserror = "1.0" diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index d15e9c9bd0d4de..a889ad77404eb8 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -26,6 +26,7 @@ use { time::Duration, }, }; +use solana_prometheus::collector::{PrometheusCollector}; pub struct FetchStage { thread_hdls: Vec>, @@ -39,6 +40,7 @@ impl FetchStage { tpu_vote_sockets: Vec, exit: &Arc, poh_recorder: &Arc>, + prometheus_collector: Option, coalesce_ms: u64, ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { let (sender, receiver) = unbounded(); @@ -56,6 +58,7 @@ impl FetchStage { forward_receiver, poh_recorder, coalesce_ms, + prometheus_collector, None, ), receiver, @@ -75,6 +78,7 @@ impl FetchStage { forward_receiver: PacketBatchReceiver, poh_recorder: &Arc>, coalesce_ms: u64, + prometheus_collector: Option, in_vote_only_mode: Option>, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); @@ -91,6 +95,7 @@ impl FetchStage { forward_receiver, poh_recorder, coalesce_ms, + prometheus_collector, in_vote_only_mode, ) } @@ -149,6 +154,7 @@ impl FetchStage { forward_receiver: PacketBatchReceiver, poh_recorder: &Arc>, coalesce_ms: u64, + prometheus_collector: Option, in_vote_only_mode: Option>, ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); @@ -234,6 +240,13 @@ impl FetchStage { tpu_vote_stats.report(); tpu_forward_stats.report(); + if let Some(collector) = &prometheus_collector { + let stats = { + tpu_stats.total_stats.lock().unwrap().clone() + }; + collector.lock().unwrap().save_tpu_receiver_stats(stats) + } + if exit.load(Ordering::Relaxed) { return; } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index fc3a18e77cb582..74f8988a615b69 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -41,6 +41,7 @@ use { time::Duration, }, }; +use solana_prometheus::collector::PrometheusCollector; pub const DEFAULT_TPU_COALESCE_MS: u64 = 5; @@ -99,6 +100,7 @@ impl Tpu { cost_model: &Arc>, connection_cache: &Arc, keypair: &Keypair, + prometheus_collector: Option, enable_quic_servers: bool, ) -> Self { let TpuSockets { @@ -124,6 +126,7 @@ impl Tpu { forwarded_packet_receiver, poh_recorder, tpu_coalesce_ms, + prometheus_collector, Some(bank_forks.read().unwrap().get_vote_only_mode_signal()), ); diff --git a/core/src/validator.rs b/core/src/validator.rs index 375af3121837ba..4f4ec3ddc0024a 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -109,6 +109,7 @@ use { time::{Duration, Instant}, }, }; +use solana_prometheus::collector::PrometheusCollector; const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; @@ -379,6 +380,7 @@ impl Validator { socket_addr_space: SocketAddrSpace, use_quic: bool, tpu_connection_pool_size: usize, + prometheus_collector: Option, ) -> Self { let id = identity_keypair.pubkey(); assert_eq!(id, node.info.id); @@ -740,6 +742,7 @@ impl Validator { connection_cache.clone(), max_complete_transaction_status_slot, config.vote_accounts_to_monitor.clone(), + prometheus_collector.clone(), )), if !config.rpc_config.full_api { None @@ -996,6 +999,7 @@ impl Validator { &cost_model, &connection_cache, &identity_keypair, + prometheus_collector, config.enable_quic_servers, ); diff --git a/prometheus/Cargo.toml b/prometheus/Cargo.toml index 574de5dfeaca8c..44ab060c64fc43 100644 --- a/prometheus/Cargo.toml +++ b/prometheus/Cargo.toml @@ -13,6 +13,7 @@ serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" solana-gossip = { path = "../gossip" } solana-runtime = { path = "../runtime" } +solana-streamer = { path = "../streamer" } solana-sdk = { path = "../sdk" } solana-vote-program = { path = "../programs/vote" } solana-config-program = { path = "../programs/config" } diff --git a/prometheus/src/collector.rs b/prometheus/src/collector.rs new file mode 100644 index 00000000000000..4b5cab46bd4811 --- /dev/null +++ b/prometheus/src/collector.rs @@ -0,0 +1,42 @@ +use crate::utils::{write_metric, Metric, MetricFamily}; +use solana_streamer::streamer::StreamerReceiveStatsTotal; +use std::io; +use std::sync::{Arc, Mutex}; + +pub type PrometheusCollector = Arc>; + +pub struct MetricsCollector { + tpu_receiver_stats: Option, +} + +impl MetricsCollector { + pub fn new() -> Self { + Self { + tpu_receiver_stats: None, + } + } + + pub fn save_tpu_receiver_stats(&mut self, stats: StreamerReceiveStatsTotal) { + self.tpu_receiver_stats = Some(stats) + } + + pub fn write_metrics(&self, out: &mut W) -> io::Result<()> { + if self.tpu_receiver_stats.is_none() { + return Ok(()); + } + + let tpu_metrics = self.tpu_receiver_stats.as_ref().unwrap(); + + write_metric( + out, + &MetricFamily { + name: "solana_validator_tpu_packets_count_total", + help: "Packets received by Transaction Processing Unit", + type_: "counter", + metrics: vec![Metric::new(tpu_metrics.packets_count_total as u64)], + }, + )?; + + Ok(()) + } +} diff --git a/prometheus/src/lib.rs b/prometheus/src/lib.rs index 6c599371f20ad2..7c9972671e175c 100644 --- a/prometheus/src/lib.rs +++ b/prometheus/src/lib.rs @@ -1,10 +1,12 @@ mod bank_metrics; pub mod banks_with_commitments; mod cluster_metrics; +pub mod collector; pub mod identity_info; mod snapshot_metrics; mod utils; +use crate::collector::PrometheusCollector; use banks_with_commitments::BanksWithCommitments; use identity_info::IdentityInfoMap; use solana_gossip::cluster_info::ClusterInfo; @@ -21,6 +23,7 @@ pub fn render_prometheus( vote_accounts: &Arc>, identity_config: &Arc, snapshot_config: &Option, + collector: &Option, ) -> Vec { // There are 3 levels of commitment for a bank: // - finalized: most recent block *confirmed* by supermajority of the @@ -41,5 +44,14 @@ pub fn render_prometheus( if let Some(snapshot_config) = snapshot_config { snapshot_metrics::write_snapshot_metrics(snapshot_config, &mut out).expect("IO error"); } + + if let Some(collector) = collector { + collector + .lock() + .unwrap() + .write_metrics(&mut out) + .expect("IO error"); + } + out } diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index 872d96c985842c..2e8acfc5ebef70 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -63,6 +63,9 @@ pub struct ReplicaNodeConfig { pub replica_exit: Arc>, pub socket_addr_space: SocketAddrSpace, pub vote_accounts_to_monitor: Arc>, + // TODO: for now it does not makes sense to pass it to RPC as we + // only get TPU metrics. + // pub prometheus_collector: PrometheusCollector, } pub struct ReplicaNode { @@ -257,6 +260,8 @@ fn start_client_rpc_services( connection_cache, max_complete_transaction_status_slot, replica_config.vote_accounts_to_monitor.clone(), + // replica_config.prometheus_collector.clone(), + None, )), Some(pubsub_service), Some(OptimisticallyConfirmedBankTracker::new( diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index b8bd68a186fc9f..7790fe28399899 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -56,6 +56,7 @@ use { }, tokio_util::codec::{BytesCodec, FramedRead}, }; +use solana_prometheus::collector::PrometheusCollector; const FULL_SNAPSHOT_REQUEST_PATH: &str = "/snapshot.tar.bz2"; const INCREMENTAL_SNAPSHOT_REQUEST_PATH: &str = "/incremental-snapshot.tar.bz2"; @@ -82,6 +83,7 @@ struct RpcRequestMiddleware { /// Initialized based on vote_accounts_to_monitor, maps identity /// pubkey associated with the vote account to the validator info. identity_info_map: Arc, + prometheus_collector: Option, } impl RpcRequestMiddleware { @@ -92,6 +94,7 @@ impl RpcRequestMiddleware { health: Arc, block_commitment_cache: Arc>, vote_accounts_to_monitor: Arc>, + prometheus_collector: Option, ) -> Self { Self { ledger_path, @@ -112,6 +115,7 @@ impl RpcRequestMiddleware { health, block_commitment_cache, vote_accounts_to_monitor, + prometheus_collector, } } @@ -319,6 +323,7 @@ impl RequestMiddleware for RpcRequestMiddleware { &self.vote_accounts_to_monitor, &self.identity_info_map, &self.snapshot_config, + &self.prometheus_collector, ))) .unwrap() .into() @@ -375,6 +380,7 @@ impl JsonRpcService { connection_cache: Arc, current_transaction_status_slot: Arc, vote_accounts_to_monitor: Arc>, + prometheus_collector: Option, ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); @@ -524,6 +530,7 @@ impl JsonRpcService { health.clone(), block_commitment_cache.clone(), vote_accounts_to_monitor, + prometheus_collector, ); let server = ServerBuilder::with_meta_extractor( io, diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4d8ee2d1c0a924..64a639b79f5818 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -1,6 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! +use std::sync::Mutex; use { crate::{ packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH}, @@ -56,8 +57,29 @@ pub struct StreamerReceiveStats { pub packet_batches_count: AtomicUsize, pub full_packet_batches_count: AtomicUsize, pub max_channel_len: AtomicUsize, + pub total_stats: Arc>, } +#[derive(Default, Clone)] +pub struct StreamerReceiveStatsTotal { + pub packets_count_total: usize, + pub packet_batches_count_total: usize, + pub full_packet_batches_count_total: usize, + pub max_channel_len_total: usize, +} + +impl StreamerReceiveStatsTotal { + pub fn new() -> Self { + Self{ + packets_count_total: Default::default(), + packet_batches_count_total: Default::default(), + full_packet_batches_count_total: Default::default(), + max_channel_len_total: Default::default() + } + } +} + + impl StreamerReceiveStats { pub fn new(name: &'static str) -> Self { Self { @@ -66,30 +88,44 @@ impl StreamerReceiveStats { packet_batches_count: AtomicUsize::default(), full_packet_batches_count: AtomicUsize::default(), max_channel_len: AtomicUsize::default(), + total_stats: Arc::new(Mutex::new(StreamerReceiveStatsTotal::default())), } } pub fn report(&self) { + let packets_count = self.packets_count.swap(0, Ordering::Relaxed); + let packet_batches_count = self.packet_batches_count.swap(0, Ordering::Relaxed); + let full_packet_batches_count = self.full_packet_batches_count.swap(0, Ordering::Relaxed); + let max_channel_len = self.max_channel_len.swap(0, Ordering::Relaxed); + + { + let mut stats = self.total_stats.lock().unwrap(); + stats.packets_count_total += packets_count; + stats.packet_batches_count_total += packet_batches_count; + stats.full_packet_batches_count_total += full_packet_batches_count; + stats.max_channel_len_total += max_channel_len; + } + datapoint_info!( self.name, ( "packets_count", - self.packets_count.swap(0, Ordering::Relaxed) as i64, + packets_count as i64, i64 ), ( "packet_batches_count", - self.packet_batches_count.swap(0, Ordering::Relaxed) as i64, + packet_batches_count as i64, i64 ), ( "full_packet_batches_count", - self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64, + full_packet_batches_count as i64, i64 ), ( "channel_len", - self.max_channel_len.swap(0, Ordering::Relaxed) as i64, + max_channel_len as i64, i64 ), ); @@ -451,6 +487,7 @@ mod test { write!(io::sink(), "{:?}", Packet::default()).unwrap(); write!(io::sink(), "{:?}", PacketBatch::default()).unwrap(); } + #[test] fn streamer_send_test() { let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 47f788a5ff1b31..19379a2e8e37ad 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -729,6 +729,7 @@ impl TestValidator { socket_addr_space, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + None, )); // Needed to avoid panics in `solana-responder-gossip` in tests that create a number of diff --git a/validator/Cargo.toml b/validator/Cargo.toml index f1eb30337a6f14..f0158d34a10d87 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -53,6 +53,7 @@ solana-streamer = { path = "../streamer", version = "=1.10.39" } solana-test-validator = { path = "../test-validator", version = "=1.10.39" } solana-version = { path = "../version", version = "=1.10.39" } solana-vote-program = { path = "../programs/vote", version = "=1.10.39" } +solana-prometheus = { path = "../prometheus" } symlink = "0.1.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/validator/src/main.rs b/validator/src/main.rs index 24db1ca7018a58..ca5fee0318c7ef 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,4 +1,6 @@ #![allow(clippy::integer_arithmetic)] + +use std::sync::Mutex; #[cfg(not(target_env = "msvc"))] use jemallocator::Jemalloc; use { @@ -88,6 +90,7 @@ use { time::{Duration, SystemTime}, }, }; +use solana_prometheus::collector::MetricsCollector; #[cfg(not(target_env = "msvc"))] #[global_allocator] @@ -2621,6 +2624,9 @@ pub fn main() { validator_config.vote_accounts_to_monitor = Arc::new(get_vote_accounts_to_monitor(&matches)); + // TODO: create collector only if prometheus metrics are enabled + let prometheus_collector = Some(Arc::new(Mutex::new(MetricsCollector::new()))); + let dynamic_port_range = solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) .expect("invalid dynamic_port_range"); @@ -2997,6 +3003,7 @@ pub fn main() { socket_addr_space, tpu_use_quic, tpu_connection_pool_size, + prometheus_collector, ); *admin_service_post_init.write().unwrap() = Some(admin_rpc_service::AdminRpcRequestMetadataPostInit {