From f18aa04b902204f2356b7ff67dc04bb4d3176327 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 7 Nov 2024 16:09:57 +0100 Subject: [PATCH] safekeeper: use `set_len()` to zero out segments (#9665) ## Problem When we create a new segment, we zero it out in order to avoid changing the length and fsyncing metadata on every write. However, we zeroed it out by writing 8 KB zero-pages, and Tokio file writes have non-trivial overhead. ## Summary of changes Zero out the segment using [`File::set_len()`](https://docs.rs/tokio/latest/i686-unknown-linux-gnu/tokio/fs/struct.File.html#method.set_len) instead. This will typically (depending on the filesystem) just write a sparse file and omit the 16 MB of data entirely. This improves WAL append throughput for large messages by over 400% with fsync disabled, and 100% with fsync enabled. --- safekeeper/src/wal_storage.rs | 45 ++++++++----------- .../regress/test_wal_acceptor_async.py | 2 +- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 33b8bfe28e1b..4e67940c51d2 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -31,7 +31,6 @@ use crate::state::TimelinePersistentState; use crate::wal_backup::{read_object, remote_timeline_path}; use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::XLogFileName; -use postgres_ffi::XLOG_BLCKSZ; use pq_proto::SystemId; use utils::{id::TenantTimelineId, lsn::Lsn}; @@ -223,6 +222,15 @@ impl PhysicalStorage { ) } + /// Call fsync if config requires so. + async fn fsync_file(&mut self, file: &File) -> Result<()> { + if !self.no_sync { + self.metrics + .observe_flush_seconds(time_io_closure(file.sync_all()).await?); + } + Ok(()) + } + /// Call fdatasync if config requires so. async fn fdatasync_file(&mut self, file: &File) -> Result<()> { if !self.no_sync { @@ -256,11 +264,15 @@ impl PhysicalStorage { // half initialized segment, first bake it under tmp filename and // then rename. let tmp_path = self.timeline_dir.join("waltmp"); - let mut file = File::create(&tmp_path) + let file = File::create(&tmp_path) .await .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?; - write_zeroes(&mut file, self.wal_seg_size).await?; + fail::fail_point!("sk-zero-segment", |_| { + info!("sk-zero-segment failpoint hit"); + Err(anyhow::anyhow!("failpoint: sk-zero-segment")) + }); + file.set_len(self.wal_seg_size as u64).await?; // Note: this doesn't get into observe_flush_seconds metric. But // segment init should be separate metric, if any. @@ -486,12 +498,12 @@ impl Storage for PhysicalStorage { // Remove all segments after the given LSN. remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?; - let (mut file, is_partial) = self.open_or_create(segno).await?; + let (file, is_partial) = self.open_or_create(segno).await?; // Fill end with zeroes - file.seek(SeekFrom::Start(xlogoff as u64)).await?; - write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?; - self.fdatasync_file(&file).await?; + file.set_len(xlogoff as u64).await?; + file.set_len(self.wal_seg_size as u64).await?; + self.fsync_file(&file).await?; if !is_partial { // Make segment partial once again @@ -751,25 +763,6 @@ impl WalReader { } } -/// Zero block for filling created WAL segments. -const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - -/// Helper for filling file with zeroes. -async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { - fail::fail_point!("sk-write-zeroes", |_| { - info!("write_zeroes hit failpoint"); - Err(anyhow::anyhow!("failpoint: sk-write-zeroes")) - }); - - while count >= XLOG_BLCKSZ { - file.write_all(ZERO_BLOCK).await?; - count -= XLOG_BLCKSZ; - } - file.write_all(&ZERO_BLOCK[0..count]).await?; - file.flush().await?; - Ok(()) -} - /// Helper function for opening WAL segment `segno` in `dir`. Returns file and /// whether it is .partial. pub(crate) async fn open_wal_file( diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 92306469f826..f328974264fa 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -602,7 +602,7 @@ async def run_segment_init_failure(env: NeonEnv): sk = env.safekeepers[0] sk_http = sk.http_client() - sk_http.configure_failpoints([("sk-write-zeroes", "return")]) + sk_http.configure_failpoints([("sk-zero-segment", "return")]) conn = await ep.connect_async() ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary # next insertion should hang until failpoint is disabled.