Skip to content

Commit

Permalink
safekeeper: lift benchmarking utils into safekeeper crate (#10200)
Browse files Browse the repository at this point in the history
## Problem

The benchmarking utilities are also useful for testing. We want to write
tests in the safekeeper crate.

## Summary of changes

This commit lifts the utils to the safekeeper crate. They are compiled
if the benchmarking features is enabled or if in test mode.
  • Loading branch information
VladLazar authored Dec 19, 2024
1 parent afda6d4 commit 502d512
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 28 deletions.
6 changes: 3 additions & 3 deletions libs/postgres_ffi/src/wal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
const TIMELINE_ID: u32 = 1;

/// Creates a new WAL generator with the given record generator.
pub fn new(record_generator: R) -> WalGenerator<R> {
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
Self {
record_generator,
lsn: Lsn(0),
prev_lsn: Lsn(0),
lsn: start_lsn,
prev_lsn: start_lsn,
}
}

Expand Down
2 changes: 2 additions & 0 deletions safekeeper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []

[dependencies]
async-stream.workspace = true
Expand Down Expand Up @@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
[[bench]]
name = "receive_wal"
harness = false
required-features = ["benchmarking"]
23 changes: 12 additions & 11 deletions safekeeper/benches/receive_wal.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
//! WAL ingestion benchmarks.
#[path = "benchutils.rs"]
mod benchutils;

use std::io::Write as _;

use benchutils::Env;
use bytes::BytesMut;
use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
Expand All @@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use safekeeper::test_utils::Env;
use tokio::io::AsyncWriteExt as _;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
Expand Down Expand Up @@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];

let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));

// Set up the Safekeeper.
let env = Env::new(fsync)?;
let mut safekeeper =
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
let mut safekeeper = runtime.block_on(env.make_safekeeper(
NodeId(1),
TenantTimelineId::generate(),
Lsn(0),
))?;

b.iter_batched_ref(
// Pre-construct WAL records and requests. Criterion will batch them.
Expand Down Expand Up @@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded

let env = Env::new(fsync)?;
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
let walgen =
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));

// Create buffered channels that can fit all requests, to avoid blocking on channels.
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
Expand All @@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
// TODO: WalAcceptor doesn't actually need a full timeline, only
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;
Expand Down Expand Up @@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];

let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));

// Construct and spawn the WalAcceptor task.
let env = Env::new(fsync)?;
Expand All @@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {

runtime.block_on(async {
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;
Expand Down
3 changes: 3 additions & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;

#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;

mod timelines_global_map;
use std::sync::Arc;
pub use timelines_global_map::GlobalTimelines;
Expand Down
28 changes: 15 additions & 13 deletions safekeeper/benches/benchutils.rs → safekeeper/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::sync::Arc;

use crate::rate_limit::RateLimiter;
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use crate::state::{TimelinePersistentState, TimelineState};
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::remote_timeline_path;
use crate::{control_file, wal_storage, SafeKeeperConf};
use camino_tempfile::Utf8TempDir;
use safekeeper::rate_limit::RateLimiter;
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use safekeeper::state::{TimelinePersistentState, TimelineState};
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use safekeeper::timelines_set::TimelinesSet;
use safekeeper::wal_backup::remote_timeline_path;
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;

/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
pub struct Env {
/// Whether to enable fsync.
pub fsync: bool,
Expand All @@ -21,7 +21,7 @@ pub struct Env {
}

impl Env {
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
/// enable fsyncing.
pub fn new(fsync: bool) -> anyhow::Result<Self> {
let tempdir = camino_tempfile::tempdir()?;
Expand All @@ -47,6 +47,7 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
let conf = self.make_conf(node_id);

Expand All @@ -67,9 +68,9 @@ impl Env {
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
term: 1,
start_streaming_at: Lsn(0),
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
timeline_start_lsn: Lsn(0),
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;

Expand All @@ -82,12 +83,13 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
let conf = Arc::new(self.make_conf(node_id));
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;

let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));

let timeline = Timeline::new(
Expand Down
2 changes: 1 addition & 1 deletion safekeeper/tests/walproposer_sim/walproposer_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl DiskWalProposer {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
}),
})
}
Expand Down

1 comment on commit 502d512

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7245 tests run: 6936 passed, 1 failed, 308 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (3)

Postgres 17

  • test_deletion_queue_recovery[validate-lose]: release-x86-64
  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-arm64

Postgres 14

Code coverage* (full report)

  • functions: 31.2% (8398 of 26876 functions)
  • lines: 47.9% (66651 of 139012 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
502d512 at 2024-12-19T16:22:27.261Z :recycle:

Please sign in to comment.