From 0d5a512825705cce3f2c30707e27b3e437a9bc91 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 4 Nov 2024 16:22:46 +0100 Subject: [PATCH] safekeeper: add walreceiver metrics (#9450) ## Problem We don't have any observability for Safekeeper WAL receiver queues. ## Summary of changes Adds a few WAL receiver metrics: * `safekeeper_wal_receivers`: gauge of currently connected WAL receivers. * `safekeeper_wal_receiver_queue_depth`: histogram of queue depths per receiver, sampled every 5 seconds. * `safekeeper_wal_receiver_queue_depth_total`: gauge of total queued messages across all receivers. * `safekeeper_wal_receiver_queue_size_total`: gauge of total queued message sizes across all receivers. There are already metrics for ingested WAL volume: `written_wal_bytes` counter per timeline, and `safekeeper_write_wal_bytes` per-request histogram. --- libs/metrics/src/lib.rs | 81 +++++++++++++++++++++++++++++++++++ safekeeper/src/metrics.rs | 52 +++++++++++++++++++--- safekeeper/src/receive_wal.rs | 46 +++++++++++++++++++- safekeeper/src/safekeeper.rs | 64 +++++++++++++++++++++++++++ 4 files changed, 234 insertions(+), 9 deletions(-) diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index 64e56cb69177..0f6c2a0937e5 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -110,6 +110,23 @@ static MAXRSS_KB: Lazy = Lazy::new(|| { pub const DISK_FSYNC_SECONDS_BUCKETS: &[f64] = &[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0]; +/// Constructs histogram buckets that are powers of two starting at 1 (i.e. 2^0), covering the end +/// points. For example, passing start=5,end=20 yields 4,8,16,32 as does start=4,end=32. +pub fn pow2_buckets(start: usize, end: usize) -> Vec { + assert_ne!(start, 0); + assert!(start <= end); + let start = match start.checked_next_power_of_two() { + Some(n) if n == start => n, // start already power of two + Some(n) => n >> 1, // power of two below start + None => panic!("start too large"), + }; + let end = end.checked_next_power_of_two().expect("end too large"); + std::iter::successors(Some(start), |n| n.checked_mul(2)) + .take_while(|n| n <= &end) + .map(|n| n as f64) + .collect() +} + pub struct BuildInfo { pub revision: &'static str, pub build_tag: &'static str, @@ -595,3 +612,67 @@ where self.dec.collect_into(metadata, labels, name, &mut enc.0) } } + +#[cfg(test)] +mod tests { + use super::*; + + const POW2_BUCKETS_MAX: usize = 1 << (usize::BITS - 1); + + #[test] + fn pow2_buckets_cases() { + assert_eq!(pow2_buckets(1, 1), vec![1.0]); + assert_eq!(pow2_buckets(1, 2), vec![1.0, 2.0]); + assert_eq!(pow2_buckets(1, 3), vec![1.0, 2.0, 4.0]); + assert_eq!(pow2_buckets(1, 4), vec![1.0, 2.0, 4.0]); + assert_eq!(pow2_buckets(1, 5), vec![1.0, 2.0, 4.0, 8.0]); + assert_eq!(pow2_buckets(1, 6), vec![1.0, 2.0, 4.0, 8.0]); + assert_eq!(pow2_buckets(1, 7), vec![1.0, 2.0, 4.0, 8.0]); + assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]); + assert_eq!( + pow2_buckets(1, 200), + vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0] + ); + + assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]); + assert_eq!(pow2_buckets(2, 8), vec![2.0, 4.0, 8.0]); + assert_eq!(pow2_buckets(3, 8), vec![2.0, 4.0, 8.0]); + assert_eq!(pow2_buckets(4, 8), vec![4.0, 8.0]); + assert_eq!(pow2_buckets(5, 8), vec![4.0, 8.0]); + assert_eq!(pow2_buckets(6, 8), vec![4.0, 8.0]); + assert_eq!(pow2_buckets(7, 8), vec![4.0, 8.0]); + assert_eq!(pow2_buckets(8, 8), vec![8.0]); + assert_eq!(pow2_buckets(20, 200), vec![16.0, 32.0, 64.0, 128.0, 256.0]); + + // Largest valid values. + assert_eq!( + pow2_buckets(1, POW2_BUCKETS_MAX).len(), + usize::BITS as usize + ); + assert_eq!(pow2_buckets(POW2_BUCKETS_MAX, POW2_BUCKETS_MAX).len(), 1); + } + + #[test] + #[should_panic] + fn pow2_buckets_zero_start() { + pow2_buckets(0, 1); + } + + #[test] + #[should_panic] + fn pow2_buckets_end_lt_start() { + pow2_buckets(2, 1); + } + + #[test] + #[should_panic] + fn pow2_buckets_end_overflow_min() { + pow2_buckets(1, POW2_BUCKETS_MAX + 1); + } + + #[test] + #[should_panic] + fn pow2_buckets_end_overflow_max() { + pow2_buckets(1, usize::MAX); + } +} diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index e8fdddcdc1ea..bb56e923f888 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -5,23 +5,23 @@ use std::{ time::{Instant, SystemTime}, }; -use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS}; use anyhow::Result; use futures::Future; use metrics::{ core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, + pow2_buckets, proto::MetricFamily, - register_histogram_vec, register_int_counter, register_int_counter_pair, - register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge, - HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec, + register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair, + register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge, GaugeVec, + Histogram, HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, + IntGauge, IntGaugeVec, DISK_FSYNC_SECONDS_BUCKETS, }; use once_cell::sync::Lazy; - use postgres_ffi::XLogSegNo; -use utils::pageserver_feedback::PageserverFeedback; -use utils::{id::TenantTimelineId, lsn::Lsn}; +use utils::{id::TenantTimelineId, lsn::Lsn, pageserver_feedback::PageserverFeedback}; use crate::{ + receive_wal::MSG_QUEUE_SIZE, state::{TimelineMemState, TimelinePersistentState}, GlobalTimelines, }; @@ -204,6 +204,44 @@ pub static WAL_BACKUP_TASKS: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter") }); +pub static WAL_RECEIVERS: Lazy = Lazy::new(|| { + register_int_gauge!( + "safekeeper_wal_receivers", + "Number of currently connected WAL receivers (i.e. connected computes)" + ) + .expect("Failed to register safekeeper_wal_receivers") +}); +pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy = Lazy::new(|| { + // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and + // full queues respectively. + let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE); + buckets.insert(0, 0.0); + buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64); + assert!(buckets.len() <= 12, "too many histogram buckets"); + + register_histogram!( + "safekeeper_wal_receiver_queue_depth", + "Number of queued messages per WAL receiver (sampled every 5 seconds)", + buckets + ) + .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram") +}); +pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy = Lazy::new(|| { + register_int_gauge!( + "safekeeper_wal_receiver_queue_depth_total", + "Total number of queued messages across all WAL receivers", + ) + .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge") +}); +// TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio +// MPSC channel to update counters on send, receive, and drop, while forwarding all other methods. +pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy = Lazy::new(|| { + register_int_gauge!( + "safekeeper_wal_receiver_queue_size_total", + "Total memory byte size of queued messages across all WAL receivers", + ) + .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge") +}); // Metrics collected on operations on the storage repository. #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)] diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 2410e22f450f..a0a96c6e99cc 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -3,6 +3,10 @@ //! sends replies back. use crate::handler::SafekeeperPostgresHandler; +use crate::metrics::{ + WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL, + WAL_RECEIVER_QUEUE_SIZE_TOTAL, +}; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; use crate::safekeeper::ServerInfo; @@ -86,6 +90,7 @@ impl WalReceivers { }; self.update_num(&shared); + WAL_RECEIVERS.inc(); WalReceiverGuard { id: pos, @@ -144,6 +149,7 @@ impl WalReceivers { let mut shared = self.mutex.lock(); shared.slots[id] = None; self.update_num(&shared); + WAL_RECEIVERS.dec(); } /// Broadcast pageserver feedback to connected walproposers. @@ -390,6 +396,7 @@ async fn read_network_loop( loop { let started = Instant::now(); + let size = next_msg.size(); match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await { Ok(()) => {} // Slow send, log a message and keep trying. Log context has timeline ID. @@ -409,6 +416,11 @@ async fn read_network_loop( // WalAcceptor terminated. Err(SendTimeoutError::Closed(_)) => return Ok(()), } + + // Update metrics. Will be decremented in WalAcceptor. + WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc(); + WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64); + next_msg = read_message(pgb_reader).await?; } } @@ -466,6 +478,12 @@ async fn network_write( /// walproposer, even when it's writing a steady stream of messages. const FLUSH_INTERVAL: Duration = Duration::from_secs(1); +/// The metrics computation interval. +/// +/// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth +/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines. +const METRICS_INTERVAL: Duration = Duration::from_secs(5); + /// Encapsulates a task which takes messages from msg_rx, processes and pushes /// replies to reply_tx. /// @@ -512,12 +530,15 @@ impl WalAcceptor { async fn run(&mut self) -> anyhow::Result<()> { let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id); - // Periodically flush the WAL. + // Periodically flush the WAL and compute metrics. let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); flush_ticker.tick().await; // skip the initial, immediate tick - // Tracks unflushed appends. + let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL); + metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + // Tracks whether we have unflushed appends. let mut dirty = false; loop { @@ -529,6 +550,10 @@ impl WalAcceptor { break; }; + // Update gauge metrics. + WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec(); + WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64); + // Update walreceiver state in shmem for reporting. if let ProposerAcceptorMessage::Elected(_) = &msg { walreceiver_guard.get().status = WalReceiverStatus::Streaming; @@ -565,6 +590,12 @@ impl WalAcceptor { .process_msg(&ProposerAcceptorMessage::FlushWAL) .await? } + + // Update histogram metrics periodically. + _ = metrics_ticker.tick() => { + WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64); + None // no reply + } }; // Send reply, if any. @@ -585,3 +616,14 @@ impl WalAcceptor { Ok(()) } } + +/// On drop, drain msg_rx and update metrics to avoid leaks. +impl Drop for WalAcceptor { + fn drop(&mut self) { + self.msg_rx.close(); // prevent further sends + while let Ok(msg) = self.msg_rx.try_recv() { + WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec(); + WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64); + } + } +} diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index b3e006ab05e4..cf41d7a0abc2 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -422,6 +422,70 @@ impl ProposerAcceptorMessage { _ => bail!("unknown proposer-acceptor message tag: {}", tag), } } + + /// The memory size of the message, including byte slices. + pub fn size(&self) -> usize { + const BASE_SIZE: usize = std::mem::size_of::(); + + // For most types, the size is just the base enum size including the nested structs. Some + // types also contain byte slices; add them. + // + // We explicitly list all fields, to draw attention here when new fields are added. + let mut size = BASE_SIZE; + size += match self { + Self::Greeting(ProposerGreeting { + protocol_version: _, + pg_version: _, + proposer_id: _, + system_id: _, + timeline_id: _, + tenant_id: _, + tli: _, + wal_seg_size: _, + }) => 0, + + Self::VoteRequest(VoteRequest { term: _ }) => 0, + + Self::Elected(ProposerElected { + term: _, + start_streaming_at: _, + term_history: _, + timeline_start_lsn: _, + }) => 0, + + Self::AppendRequest(AppendRequest { + h: + AppendRequestHeader { + term: _, + term_start_lsn: _, + begin_lsn: _, + end_lsn: _, + commit_lsn: _, + truncate_lsn: _, + proposer_uuid: _, + }, + wal_data, + }) => wal_data.len(), + + Self::NoFlushAppendRequest(AppendRequest { + h: + AppendRequestHeader { + term: _, + term_start_lsn: _, + begin_lsn: _, + end_lsn: _, + commit_lsn: _, + truncate_lsn: _, + proposer_uuid: _, + }, + wal_data, + }) => wal_data.len(), + + Self::FlushWAL => 0, + }; + + size + } } /// Acceptor -> Proposer messages