safekeeper: add walreceiver metrics (#9450)

## Problem

We don't have any observability for Safekeeper WAL receiver queues.

## Summary of changes

Adds a few WAL receiver metrics:

* `safekeeper_wal_receivers`: gauge of currently connected WAL receivers.
* `safekeeper_wal_receiver_queue_depth`: histogram of queue depths per receiver, sampled every 5 seconds.
* `safekeeper_wal_receiver_queue_depth_total`: gauge of total queued messages across all receivers.
* `safekeeper_wal_receiver_queue_size_total`: gauge of total queued message sizes across all receivers.

There are already metrics for ingested WAL volume: the `written_wal_bytes` counter per timeline, and the `safekeeper_write_wal_bytes` per-request histogram.
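The accounting pattern is small enough to sketch in isolation. The sketch below is illustrative only, not code from this PR: plain atomics stand in for the Prometheus gauges, `Msg` is a hypothetical message type, and the channel bound of 256 is an assumed stand-in for `MSG_QUEUE_SIZE`. The network reader bumps the totals after a successful send, the acceptor decrements them on receive, and a 5-second ticker samples the per-receiver depth, mirroring the `receive_wal.rs` changes below.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;

// Stand-ins for the Prometheus gauges registered in metrics.rs below.
static QUEUE_DEPTH_TOTAL: AtomicI64 = AtomicI64::new(0);
static QUEUE_SIZE_TOTAL: AtomicI64 = AtomicI64::new(0);

// Hypothetical message type; the real code uses ProposerAcceptorMessage.
struct Msg(Vec<u8>);

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Msg>(256);

    // Sender side (read_network_loop in the diff): account for each message
    // after a successful send; the receiver side undoes the accounting.
    tokio::spawn(async move {
        for _ in 0..10 {
            let msg = Msg(vec![0; 1024]);
            let size = msg.0.len() as i64;
            if tx.send(msg).await.is_err() {
                return; // receiver dropped
            }
            QUEUE_DEPTH_TOTAL.fetch_add(1, Ordering::Relaxed);
            QUEUE_SIZE_TOTAL.fetch_add(size, Ordering::Relaxed);
        }
    });

    // Receiver side (WalAcceptor::run in the diff): decrement on dequeue, and
    // sample the per-receiver queue depth on a 5-second ticker. Receiver::len
    // requires a recent tokio.
    let mut metrics_ticker = tokio::time::interval(Duration::from_secs(5));
    metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            maybe_msg = rx.recv() => {
                let Some(msg) = maybe_msg else { break };
                QUEUE_DEPTH_TOTAL.fetch_sub(1, Ordering::Relaxed);
                QUEUE_SIZE_TOTAL.fetch_sub(msg.0.len() as i64, Ordering::Relaxed);
            }
            _ = metrics_ticker.tick() => {
                // The diff observes this into a histogram instead.
                println!("queue depth sample: {}", rx.len());
            }
        }
    }
}
```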
erikgrinaker authored Nov 4, 2024
1 parent 8ad1dbc commit 0d5a512
Showing 4 changed files with 234 additions and 9 deletions.
81 changes: 81 additions & 0 deletions libs/metrics/src/lib.rs
@@ -110,6 +110,23 @@ static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
pub const DISK_FSYNC_SECONDS_BUCKETS: &[f64] =
    &[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0];

/// Constructs histogram buckets that are powers of two, starting at 1 (i.e. 2^0) and covering the
/// given start and end points: the first bucket may be rounded down below `start`, and the last
/// rounded up above `end`. For example, start=5, end=20 yields 4, 8, 16, 32, as does start=4,
/// end=32.
pub fn pow2_buckets(start: usize, end: usize) -> Vec<f64> {
    assert_ne!(start, 0);
    assert!(start <= end);
    let start = match start.checked_next_power_of_two() {
        Some(n) if n == start => n, // start already power of two
        Some(n) => n >> 1,          // power of two below start
        None => panic!("start too large"),
    };
    let end = end.checked_next_power_of_two().expect("end too large");
    std::iter::successors(Some(start), |n| n.checked_mul(2))
        .take_while(|n| n <= &end)
        .map(|n| n as f64)
        .collect()
}

pub struct BuildInfo {
    pub revision: &'static str,
    pub build_tag: &'static str,
@@ -595,3 +612,67 @@
        self.dec.collect_into(metadata, labels, name, &mut enc.0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const POW2_BUCKETS_MAX: usize = 1 << (usize::BITS - 1);

    #[test]
    fn pow2_buckets_cases() {
        assert_eq!(pow2_buckets(1, 1), vec![1.0]);
        assert_eq!(pow2_buckets(1, 2), vec![1.0, 2.0]);
        assert_eq!(pow2_buckets(1, 3), vec![1.0, 2.0, 4.0]);
        assert_eq!(pow2_buckets(1, 4), vec![1.0, 2.0, 4.0]);
        assert_eq!(pow2_buckets(1, 5), vec![1.0, 2.0, 4.0, 8.0]);
        assert_eq!(pow2_buckets(1, 6), vec![1.0, 2.0, 4.0, 8.0]);
        assert_eq!(pow2_buckets(1, 7), vec![1.0, 2.0, 4.0, 8.0]);
        assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]);
        assert_eq!(
            pow2_buckets(1, 200),
            vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0]
        );

        assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]);
        assert_eq!(pow2_buckets(2, 8), vec![2.0, 4.0, 8.0]);
        assert_eq!(pow2_buckets(3, 8), vec![2.0, 4.0, 8.0]);
        assert_eq!(pow2_buckets(4, 8), vec![4.0, 8.0]);
        assert_eq!(pow2_buckets(5, 8), vec![4.0, 8.0]);
        assert_eq!(pow2_buckets(6, 8), vec![4.0, 8.0]);
        assert_eq!(pow2_buckets(7, 8), vec![4.0, 8.0]);
        assert_eq!(pow2_buckets(8, 8), vec![8.0]);
        assert_eq!(pow2_buckets(20, 200), vec![16.0, 32.0, 64.0, 128.0, 256.0]);

        // Largest valid values.
        assert_eq!(
            pow2_buckets(1, POW2_BUCKETS_MAX).len(),
            usize::BITS as usize
        );
        assert_eq!(pow2_buckets(POW2_BUCKETS_MAX, POW2_BUCKETS_MAX).len(), 1);
    }

    #[test]
    #[should_panic]
    fn pow2_buckets_zero_start() {
        pow2_buckets(0, 1);
    }

    #[test]
    #[should_panic]
    fn pow2_buckets_end_lt_start() {
        pow2_buckets(2, 1);
    }

    #[test]
    #[should_panic]
    fn pow2_buckets_end_overflow_min() {
        pow2_buckets(1, POW2_BUCKETS_MAX + 1);
    }

    #[test]
    #[should_panic]
    fn pow2_buckets_end_overflow_max() {
        pow2_buckets(1, usize::MAX);
    }
}
52 changes: 45 additions & 7 deletions safekeeper/src/metrics.rs
@@ -5,23 +5,23 @@ use std::{
     time::{Instant, SystemTime},
 };
 
-use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS};
 use anyhow::Result;
 use futures::Future;
 use metrics::{
     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
+    pow2_buckets,
     proto::MetricFamily,
-    register_histogram_vec, register_int_counter, register_int_counter_pair,
-    register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
-    HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
+    register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
+    register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge, GaugeVec,
+    Histogram, HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
+    IntGauge, IntGaugeVec, DISK_FSYNC_SECONDS_BUCKETS,
 };
 use once_cell::sync::Lazy;
 
 use postgres_ffi::XLogSegNo;
-use utils::pageserver_feedback::PageserverFeedback;
-use utils::{id::TenantTimelineId, lsn::Lsn};
+use utils::{id::TenantTimelineId, lsn::Lsn, pageserver_feedback::PageserverFeedback};
 
 use crate::{
+    receive_wal::MSG_QUEUE_SIZE,
     state::{TimelineMemState, TimelinePersistentState},
     GlobalTimelines,
};
Expand Down Expand Up @@ -204,6 +204,44 @@ pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
});
pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
    register_int_gauge!(
        "safekeeper_wal_receivers",
        "Number of currently connected WAL receivers (i.e. connected computes)"
    )
    .expect("Failed to register safekeeper_wal_receivers")
});

pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
    // Use power-of-two buckets, but add a bucket at 0 and one just below the max queue size, so
    // that empty and completely full queues get dedicated buckets.
    let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
    buckets.insert(0, 0.0);
    buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
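    // If MSG_QUEUE_SIZE were 256, this would yield [0, 1, 2, 4, ..., 128, 255, 256]: empty (0)
    // and full (256) queues then land in dedicated buckets, 11 buckets in total.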
    assert!(buckets.len() <= 12, "too many histogram buckets");

    register_histogram!(
        "safekeeper_wal_receiver_queue_depth",
        "Number of queued messages per WAL receiver (sampled every 5 seconds)",
        buckets
    )
    .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
});

pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
    register_int_gauge!(
        "safekeeper_wal_receiver_queue_depth_total",
        "Total number of queued messages across all WAL receivers",
    )
    .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
});

// TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
// MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
    register_int_gauge!(
        "safekeeper_wal_receiver_queue_size_total",
        "Total memory byte size of queued messages across all WAL receivers",
    )
    .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
});
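// One possible shape for that wrapper, sketched for illustration only (not part of this diff;
// the `MeteredSender` name and `queued_bytes` counter are hypothetical):
//
//     struct MeteredSender {
//         inner: tokio::sync::mpsc::Sender<ProposerAcceptorMessage>,
//         queued_bytes: std::sync::Arc<std::sync::atomic::AtomicUsize>,
//     }
//
//     impl MeteredSender {
//         async fn send(
//             &self,
//             msg: ProposerAcceptorMessage,
//         ) -> Result<(), tokio::sync::mpsc::error::SendError<ProposerAcceptorMessage>> {
//             let size = msg.size();
//             self.inner.send(msg).await?;
//             self.queued_bytes.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
//             Ok(())
//         }
//     }
//
// The receiving side would subtract on recv() and drop, and a ticker could then observe
// queued_bytes into a per-receiver histogram.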

// Metrics collected on operations on the storage repository.
#[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
Expand Down
46 changes: 44 additions & 2 deletions safekeeper/src/receive_wal.rs
@@ -3,6 +3,10 @@
//! sends replies back.

use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::{
    WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
    WAL_RECEIVER_QUEUE_SIZE_TOTAL,
};
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
@@ -86,6 +90,7 @@ impl WalReceivers {
        };

        self.update_num(&shared);
        WAL_RECEIVERS.inc();

        WalReceiverGuard {
            id: pos,
@@ -144,6 +149,7 @@ impl WalReceivers {
        let mut shared = self.mutex.lock();
        shared.slots[id] = None;
        self.update_num(&shared);
        WAL_RECEIVERS.dec();
    }

    /// Broadcast pageserver feedback to connected walproposers.
@@ -390,6 +396,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(

    loop {
        let started = Instant::now();
        let size = next_msg.size();
        match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
            Ok(()) => {}
            // Slow send, log a message and keep trying. Log context has timeline ID.
@@ -409,6 +416,11 @@
            // WalAcceptor terminated.
            Err(SendTimeoutError::Closed(_)) => return Ok(()),
        }

        // Update metrics. Will be decremented in WalAcceptor.
        WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
        WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);

        next_msg = read_message(pgb_reader).await?;
    }
}
@@ -466,6 +478,12 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
/// walproposer, even when it's writing a steady stream of messages.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// The metrics computation interval.
///
/// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
/// every 5 seconds, i.e. 12 samples per poll, so the histogram count can grow by up to 12 per
/// active timeline per poll interval.
const METRICS_INTERVAL: Duration = Duration::from_secs(5);

/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
///
@@ -512,12 +530,15 @@ impl WalAcceptor {
     async fn run(&mut self) -> anyhow::Result<()> {
         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
 
-        // Periodically flush the WAL.
+        // Periodically flush the WAL and compute metrics.
         let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
         flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
         flush_ticker.tick().await; // skip the initial, immediate tick
 
-        // Tracks unflushed appends.
+        let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
+        metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+        // Tracks whether we have unflushed appends.
         let mut dirty = false;
 
         loop {
@@ -529,6 +550,10 @@
                        break;
                    };

                    // Update gauge metrics.
                    WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
                    WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);

                    // Update walreceiver state in shmem for reporting.
                    if let ProposerAcceptorMessage::Elected(_) = &msg {
                        walreceiver_guard.get().status = WalReceiverStatus::Streaming;
@@ -565,6 +590,12 @@
                        .process_msg(&ProposerAcceptorMessage::FlushWAL)
                        .await?
                }

                // Update histogram metrics periodically.
                _ = metrics_ticker.tick() => {
                    WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
                    None // no reply
                }
            };

            // Send reply, if any.
@@ -585,3 +616,14 @@
        Ok(())
    }
}

/// On drop, drain msg_rx and update metrics to avoid leaks.
impl Drop for WalAcceptor {
    fn drop(&mut self) {
        self.msg_rx.close(); // prevent further sends
        while let Ok(msg) = self.msg_rx.try_recv() {
            WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
            WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
        }
    }
}
64 changes: 64 additions & 0 deletions safekeeper/src/safekeeper.rs
@@ -422,6 +422,70 @@
            _ => bail!("unknown proposer-acceptor message tag: {}", tag),
        }
    }

    /// The memory size of the message, including byte slices.
    pub fn size(&self) -> usize {
        const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();

        // For most types, the size is just the base enum size including the nested structs. Some
        // types also contain byte slices; add them.
        //
        // We explicitly list all fields, to draw attention here when new fields are added.
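        // For example, an AppendRequest carrying 1024 bytes of wal_data reports BASE_SIZE + 1024.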
        let mut size = BASE_SIZE;
        size += match self {
            Self::Greeting(ProposerGreeting {
                protocol_version: _,
                pg_version: _,
                proposer_id: _,
                system_id: _,
                timeline_id: _,
                tenant_id: _,
                tli: _,
                wal_seg_size: _,
            }) => 0,

            Self::VoteRequest(VoteRequest { term: _ }) => 0,

            Self::Elected(ProposerElected {
                term: _,
                start_streaming_at: _,
                term_history: _,
                timeline_start_lsn: _,
            }) => 0,

            Self::AppendRequest(AppendRequest {
                h:
                    AppendRequestHeader {
                        term: _,
                        term_start_lsn: _,
                        begin_lsn: _,
                        end_lsn: _,
                        commit_lsn: _,
                        truncate_lsn: _,
                        proposer_uuid: _,
                    },
                wal_data,
            }) => wal_data.len(),

            Self::NoFlushAppendRequest(AppendRequest {
                h:
                    AppendRequestHeader {
                        term: _,
                        term_start_lsn: _,
                        begin_lsn: _,
                        end_lsn: _,
                        commit_lsn: _,
                        truncate_lsn: _,
                        proposer_uuid: _,
                    },
                wal_data,
            }) => wal_data.len(),

            Self::FlushWAL => 0,
        };

        size
    }
}

/// Acceptor -> Proposer messages

1 comment on commit 0d5a512

@github-actions github-actions bot commented on 0d5a512, Nov 4, 2024

5445 tests run: 5212 passed, 1 failed, 232 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_sharded_ingest[release-pg16-github-actions-selfhosted-1]"
Flaky tests (2)

Postgres 17

Postgres 16

  • test_remote_storage_backup_and_restore[False-real_s3]: release-arm64

Code coverage* (full report)

  • functions: 31.5% (7787 of 24720 functions)
  • lines: 48.9% (61103 of 124909 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
0d5a512 at 2024-11-04T17:31:25.044Z
