From 502d512fe2ca9f07392428d70d5262cf3f5103e2 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 19 Dec 2024 14:04:42 +0000 Subject: [PATCH] safekeeper: lift benchmarking utils into safekeeper crate (#10200) ## Problem The benchmarking utilities are also useful for testing. We want to write tests in the safekeeper crate. ## Summary of changes This commit lifts the utils to the safekeeper crate. They are compiled if the benchmarking features is enabled or if in test mode. --- libs/postgres_ffi/src/wal_generator.rs | 6 ++-- safekeeper/Cargo.toml | 2 ++ safekeeper/benches/receive_wal.rs | 23 +++++++-------- safekeeper/src/lib.rs | 3 ++ .../benchutils.rs => src/test_utils.rs} | 28 ++++++++++--------- .../tests/walproposer_sim/walproposer_disk.rs | 2 +- 6 files changed, 36 insertions(+), 28 deletions(-) rename safekeeper/{benches/benchutils.rs => src/test_utils.rs} (78%) diff --git a/libs/postgres_ffi/src/wal_generator.rs b/libs/postgres_ffi/src/wal_generator.rs index 69cc4b771fa1..a72b035e17bc 100644 --- a/libs/postgres_ffi/src/wal_generator.rs +++ b/libs/postgres_ffi/src/wal_generator.rs @@ -106,11 +106,11 @@ impl WalGenerator { const TIMELINE_ID: u32 = 1; /// Creates a new WAL generator with the given record generator. - pub fn new(record_generator: R) -> WalGenerator { + pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator { Self { record_generator, - lsn: Lsn(0), - prev_lsn: Lsn(0), + lsn: start_lsn, + prev_lsn: start_lsn, } } diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 086407603f80..3ebb7097f200 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -9,6 +9,7 @@ default = [] # Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro, # which adds some runtime cost to run tests on outage conditions testing = ["fail/failpoints"] +benchmarking = [] [dependencies] async-stream.workspace = true @@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] } [[bench]] name = "receive_wal" harness = false +required-features = ["benchmarking"] diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs index 313d945b942f..996c4d9b8c87 100644 --- a/safekeeper/benches/receive_wal.rs +++ b/safekeeper/benches/receive_wal.rs @@ -1,11 +1,7 @@ //! WAL ingestion benchmarks. -#[path = "benchutils.rs"] -mod benchutils; - use std::io::Write as _; -use benchutils::Env; use bytes::BytesMut; use camino_tempfile::tempfile; use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion}; @@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor}; use safekeeper::safekeeper::{ AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, }; +use safekeeper::test_utils::Env; use tokio::io::AsyncWriteExt as _; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; @@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) { assert!(size >= prefixlen); let message = vec![0; size - prefixlen]; - let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message)); + let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0)); // Set up the Safekeeper. let env = Env::new(fsync)?; - let mut safekeeper = - runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?; + let mut safekeeper = runtime.block_on(env.make_safekeeper( + NodeId(1), + TenantTimelineId::generate(), + Lsn(0), + ))?; b.iter_batched_ref( // Pre-construct WAL records and requests. Criterion will batch them. @@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) { let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded let env = Env::new(fsync)?; - let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message")); + let walgen = + &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0)); // Create buffered channels that can fit all requests, to avoid blocking on channels. let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n); @@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) { // TODO: WalAcceptor doesn't actually need a full timeline, only // Safekeeper::process_msg(). Consider decoupling them to simplify the setup. let tli = env - .make_timeline(NodeId(1), TenantTimelineId::generate()) + .make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0)) .await? .wal_residence_guard() .await?; @@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) { assert!(size >= prefixlen); let message = vec![0; size - prefixlen]; - let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message)); + let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0)); // Construct and spawn the WalAcceptor task. let env = Env::new(fsync)?; @@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) { runtime.block_on(async { let tli = env - .make_timeline(NodeId(1), TenantTimelineId::generate()) + .make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0)) .await? .wal_residence_guard() .await?; diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index abe6e00a665b..7acf355e6a71 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -43,6 +43,9 @@ pub mod wal_reader_stream; pub mod wal_service; pub mod wal_storage; +#[cfg(any(test, feature = "benchmarking"))] +pub mod test_utils; + mod timelines_global_map; use std::sync::Arc; pub use timelines_global_map::GlobalTimelines; diff --git a/safekeeper/benches/benchutils.rs b/safekeeper/src/test_utils.rs similarity index 78% rename from safekeeper/benches/benchutils.rs rename to safekeeper/src/test_utils.rs index 48d796221b43..c40a8bae5a7d 100644 --- a/safekeeper/benches/benchutils.rs +++ b/safekeeper/src/test_utils.rs @@ -1,18 +1,18 @@ use std::sync::Arc; +use crate::rate_limit::RateLimiter; +use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory}; +use crate::state::{TimelinePersistentState, TimelineState}; +use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline}; +use crate::timelines_set::TimelinesSet; +use crate::wal_backup::remote_timeline_path; +use crate::{control_file, wal_storage, SafeKeeperConf}; use camino_tempfile::Utf8TempDir; -use safekeeper::rate_limit::RateLimiter; -use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory}; -use safekeeper::state::{TimelinePersistentState, TimelineState}; -use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline}; -use safekeeper::timelines_set::TimelinesSet; -use safekeeper::wal_backup::remote_timeline_path; -use safekeeper::{control_file, wal_storage, SafeKeeperConf}; use tokio::fs::create_dir_all; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; -/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop. +/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop. pub struct Env { /// Whether to enable fsync. pub fsync: bool, @@ -21,7 +21,7 @@ pub struct Env { } impl Env { - /// Creates a new benchmarking environment in a temporary directory. fsync controls whether to + /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to /// enable fsyncing. pub fn new(fsync: bool) -> anyhow::Result { let tempdir = camino_tempfile::tempdir()?; @@ -47,6 +47,7 @@ impl Env { &self, node_id: NodeId, ttid: TenantTimelineId, + start_lsn: Lsn, ) -> anyhow::Result> { let conf = self.make_conf(node_id); @@ -67,9 +68,9 @@ impl Env { safekeeper .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected { term: 1, - start_streaming_at: Lsn(0), - term_history: TermHistory(vec![(1, Lsn(0)).into()]), - timeline_start_lsn: Lsn(0), + start_streaming_at: start_lsn, + term_history: TermHistory(vec![(1, start_lsn).into()]), + timeline_start_lsn: start_lsn, })) .await?; @@ -82,12 +83,13 @@ impl Env { &self, node_id: NodeId, ttid: TenantTimelineId, + start_lsn: Lsn, ) -> anyhow::Result> { let conf = Arc::new(self.make_conf(node_id)); let timeline_dir = get_timeline_dir(&conf, &ttid); let remote_path = remote_timeline_path(&ttid)?; - let safekeeper = self.make_safekeeper(node_id, ttid).await?; + let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?; let shared_state = SharedState::new(StateSK::Loaded(safekeeper)); let timeline = Timeline::new( diff --git a/safekeeper/tests/walproposer_sim/walproposer_disk.rs b/safekeeper/tests/walproposer_sim/walproposer_disk.rs index aefb3919a1b3..7dc7f485487b 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_disk.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_disk.rs @@ -18,7 +18,7 @@ impl DiskWalProposer { internal_available_lsn: Lsn(0), prev_lsn: Lsn(0), disk: BlockStorage::new(), - wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])), + wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)), }), }) }