From 955d5f2fb08fa12300fcdaa7dafdc9edb7db9942 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Mon, 4 Mar 2024 15:24:16 -0800 Subject: [PATCH] Use FIOFFS instead of fsync (on supported platforms) --- Cargo.toml | 2 +- downstairs/Cargo.toml | 1 + downstairs/src/extent.rs | 78 +++++++++++- downstairs/src/extent_inner_raw.rs | 33 +++-- downstairs/src/extent_inner_sqlite.rs | 59 +++++++-- downstairs/src/lib.rs | 36 ++---- downstairs/src/region.rs | 177 ++++++++++++++++++++------ 7 files changed, 282 insertions(+), 104 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 35e39bb24..626a44ddd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,7 @@ itertools = "0.12.1" libc = "0.2" mime_guess = "2.0.4" nbd = "0.3.1" -nix = { version = "0.28", features = [ "feature", "uio" ] } +nix = { version = "0.28", features = [ "feature", "uio", "ioctl" ] } num_enum = "0.7" num-derive = "0.4" num-traits = "0.2" diff --git a/downstairs/Cargo.toml b/downstairs/Cargo.toml index bd0e3de84..df83e244b 100644 --- a/downstairs/Cargo.toml +++ b/downstairs/Cargo.toml @@ -71,3 +71,4 @@ asm = ["usdt/asm"] default = [] zfs_snapshot = [] integration-tests = [] # Enables creating SQLite volumes +omicron-build = [] # Uses FIOFSS for flushes instead of fsync diff --git a/downstairs/src/extent.rs b/downstairs/src/extent.rs index a4ac70f84..7d255bea5 100644 --- a/downstairs/src/extent.rs +++ b/downstairs/src/extent.rs @@ -36,13 +36,45 @@ pub(crate) trait ExtentInner: Send + Sync + Debug { fn flush_number(&self) -> Result; fn dirty(&self) -> Result; - fn flush( + /// Performs any metadata updates needed before a flush + fn pre_flush( + &mut self, + new_flush: u64, + new_gen: u64, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError>; + + /// Syncs all relevant data to persistant storage + fn flush_inner( + &mut self, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError>; + + /// Performs any metadata updates after syncing data to persistent storage + fn post_flush( &mut self, new_flush: u64, new_gen: u64, job_id: JobOrReconciliationId, ) -> Result<(), CrucibleError>; + /// Performs a full flush (pre/inner/post) + /// + /// This is only exposed for the sake of unit testing; normal code should + /// use the fine-grained functions and be forced to consider performance. + #[cfg(test)] + fn flush( + &mut self, + new_flush: u64, + new_gen: u64, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { + self.pre_flush(new_flush, new_gen, job_id)?; + self.flush_inner(job_id)?; + self.post_flush(new_flush, new_gen, job_id)?; + Ok(()) + } + fn read( &mut self, job_id: JobId, @@ -578,14 +610,17 @@ impl Extent { Ok(()) } - #[instrument] - pub(crate) fn flush + Debug>( + /// Prepares for a flush + /// + /// Returns `false` if we should skip the flush (because this extent is not + /// dirty), or `true` if we should proceed. + pub(crate) fn pre_flush + Debug>( &mut self, new_flush: u64, new_gen: u64, id: I, // only used for logging log: &Logger, - ) -> Result<(), CrucibleError> { + ) -> Result { let job_id: JobOrReconciliationId = id.into(); if !self.inner.dirty()? { @@ -593,7 +628,7 @@ impl Extent { * If we have made no writes to this extent since the last flush, * we do not need to update the extent on disk */ - return Ok(()); + return Ok(false); } // Read only extents should never have the dirty bit set. 
If they do, @@ -604,7 +639,38 @@ impl Extent { crucible_bail!(ModifyingReadOnlyRegion); } - self.inner.flush(new_flush, new_gen, job_id) + self.inner.pre_flush(new_flush, new_gen, job_id)?; + Ok(true) + } + + /// Performs post-flush cleanup + pub(crate) fn post_flush + Debug>( + &mut self, + new_flush: u64, + new_gen: u64, + id: I, // only used for logging + ) -> Result<(), CrucibleError> { + let job_id: JobOrReconciliationId = id.into(); + self.inner.post_flush(new_flush, new_gen, job_id) + } + + /// Flushes this extent if it is dirty + #[instrument] + pub(crate) fn flush< + I: Into + Debug + Copy + Clone, + >( + &mut self, + new_flush: u64, + new_gen: u64, + id: I, // only used for logging + log: &Logger, + ) -> Result<(), CrucibleError> { + if !self.pre_flush(new_flush, new_gen, id, log)? { + return Ok(()); + } + self.inner.flush_inner(id.into())?; + self.post_flush(new_flush, new_gen, id)?; + Ok(()) } pub fn get_meta_info(&self) -> ExtentMeta { diff --git a/downstairs/src/extent_inner_raw.rs b/downstairs/src/extent_inner_raw.rs index a2bad6695..d7c93327c 100644 --- a/downstairs/src/extent_inner_raw.rs +++ b/downstairs/src/extent_inner_raw.rs @@ -387,20 +387,12 @@ impl ExtentInner for RawInner { Ok(ExtentReadResponse { data: buf, blocks }) } - fn flush( + fn pre_flush( &mut self, new_flush: u64, new_gen: u64, job_id: JobOrReconciliationId, ) -> Result<(), CrucibleError> { - if !self.dirty()? { - /* - * If we have made no writes to this extent since the last flush, - * we do not need to update the extent on disk - */ - return Ok(()); - } - cdt::extent__flush__start!(|| { (job_id.get(), self.extent_number.0, 0) }); @@ -409,10 +401,17 @@ impl ExtentInner for RawInner { // operation atomic. self.set_flush_number(new_flush, new_gen)?; + Ok(()) + } + + fn flush_inner( + &mut self, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { // Now, we fsync to ensure data is flushed to disk. It's okay to crash // before this point, because setting the flush number is atomic. cdt::extent__flush__file__start!(|| { - (job_id.get(), self.extent_number.0, 0) + (job_id.get(), self.extent_number.0) }); if let Err(e) = self.file.sync_all() { /* @@ -425,9 +424,17 @@ impl ExtentInner for RawInner { } self.context_slot_dirty.fill(0); cdt::extent__flush__file__done!(|| { - (job_id.get(), self.extent_number.0, 0) + (job_id.get(), self.extent_number.0) }); + Ok(()) + } + fn post_flush( + &mut self, + _new_flush: u64, + _new_gen: u64, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { // Check for fragmentation in the context slots leading to worse // performance, and defragment if that's the case. 
let extra_syscalls_per_rw = self @@ -442,9 +449,7 @@ impl ExtentInner for RawInner { Ok(()) }; - cdt::extent__flush__done!(|| { - (job_id.get(), self.extent_number.0, 0) - }); + cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number.0) }); r } diff --git a/downstairs/src/extent_inner_sqlite.rs b/downstairs/src/extent_inner_sqlite.rs index 5c19ec80e..119180542 100644 --- a/downstairs/src/extent_inner_sqlite.rs +++ b/downstairs/src/extent_inner_sqlite.rs @@ -36,13 +36,32 @@ impl ExtentInner for SqliteInner { self.0.lock().unwrap().dirty() } - fn flush( + fn pre_flush( &mut self, new_flush: u64, new_gen: u64, job_id: JobOrReconciliationId, ) -> Result<(), CrucibleError> { - self.0.lock().unwrap().flush(new_flush, new_gen, job_id) + self.0.lock().unwrap().pre_flush(new_flush, new_gen, job_id) + } + + fn flush_inner( + &mut self, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { + self.0.lock().unwrap().flush_inner(job_id) + } + + fn post_flush( + &mut self, + new_flush: u64, + new_gen: u64, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { + self.0 + .lock() + .unwrap() + .post_flush(new_flush, new_gen, job_id) } fn read( @@ -194,10 +213,10 @@ impl SqliteMoreInner { Ok(self.dirty.get()) } - fn flush( + fn pre_flush( &mut self, - new_flush: u64, - new_gen: u64, + _new_flush: u64, + _new_gen: u64, job_id: JobOrReconciliationId, ) -> Result<(), CrucibleError> { // Used for profiling @@ -207,12 +226,19 @@ impl SqliteMoreInner { (job_id.get(), self.extent_number.0, n_dirty_blocks) }); + Ok(()) + } + + fn flush_inner( + &mut self, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { /* * We must first fsync to get any outstanding data written to disk. * This must be done before we update the flush number. */ cdt::extent__flush__file__start!(|| { - (job_id.get(), self.extent_number.0, n_dirty_blocks) + (job_id.get(), self.extent_number.0) }); if let Err(e) = self.file.sync_all() { /* @@ -225,9 +251,18 @@ impl SqliteMoreInner { ); } cdt::extent__flush__file__done!(|| { - (job_id.get(), self.extent_number.0, n_dirty_blocks) + (job_id.get(), self.extent_number.0) }); + Ok(()) + } + + fn post_flush( + &mut self, + new_flush: u64, + new_gen: u64, + job_id: JobOrReconciliationId, + ) -> Result<(), CrucibleError> { // Clear old block contexts. In order to be crash consistent, only // perform this after the extent fsync is done. For each block // written since the last flush, remove all block context rows where @@ -237,7 +272,7 @@ impl SqliteMoreInner { // file is rehashed, since in that case we don't have that luxury. 
cdt::extent__flush__collect__hashes__start!(|| { - (job_id.get(), self.extent_number.0, n_dirty_blocks) + (job_id.get(), self.extent_number.0) }); // Rehash any parts of the file that we *may have written* data to since @@ -250,7 +285,7 @@ impl SqliteMoreInner { }); cdt::extent__flush__sqlite__insert__start!(|| { - (job_id.get(), self.extent_number.0, n_dirty_blocks) + (job_id.get(), self.extent_number.0) }); // We put all of our metadb updates into a single transaction to @@ -265,7 +300,7 @@ impl SqliteMoreInner { )?; cdt::extent__flush__sqlite__insert__done!(|| { - (job_id.get(), self.extent_number.0, n_dirty_blocks) + (job_id.get(), self.extent_number.0) }); self.set_flush_number(new_flush, new_gen)?; @@ -275,9 +310,7 @@ impl SqliteMoreInner { // Finally, reset the file's seek offset to 0 self.file.seek(SeekFrom::Start(0))?; - cdt::extent__flush__done!(|| { - (job_id.get(), self.extent_number.0, n_dirty_blocks) - }); + cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number.0) }); Ok(()) } diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 686c7595d..37b5c2478 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -741,44 +741,24 @@ pub mod cdt { fn submit__writeunwritten__done(_: u64) {} fn submit__write__done(_: u64) {} fn submit__flush__done(_: u64) {} - fn extent__flush__start(job_id: u64, extent_id: u32, extent_size: u64) {} - fn extent__flush__done(job_id: u64, extent_id: u32, extent_size: u64) {} - fn extent__flush__file__start( + fn extent__flush__start( job_id: u64, extent_id: u32, - extent_size: u64, - ) { - } - fn extent__flush__file__done( - job_id: u64, - extent_id: u32, - extent_size: u64, - ) { - } - fn extent__flush__collect__hashes__start( - job_id: u64, - extent_id: u32, - num_dirty: u64, + num_dirty_blocks: u64, ) { } + fn extent__flush__done(job_id: u64, extent_id: u32) {} + fn extent__flush__file__start(job_id: u64, extent_id: u32) {} + fn extent__flush__file__done(job_id: u64, extent_id: u32) {} + fn extent__flush__collect__hashes__start(job_id: u64, extent_id: u32) {} fn extent__flush__collect__hashes__done( job_id: u64, extent_id: u32, num_rehashed: u64, ) { } - fn extent__flush__sqlite__insert__start( - job_id: u64, - extent_id: u32, - extent_size: u64, - ) { - } - fn extent__flush__sqlite__insert__done( - _job_id: u64, - _extent_id: u32, - extent_size: u64, - ) { - } + fn extent__flush__sqlite__insert__start(job_id: u64, extent_id: u32) {} + fn extent__flush__sqlite__insert__done(job_id: u64, extent_id: u32) {} fn extent__write__start(job_id: u64, extent_id: u32, n_blocks: u64) {} fn extent__write__done(job_id: u64, extent_id: u32, n_blocks: u64) {} fn extent__write__get__hashes__start( diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index 5c06a8e1f..7e8a1b5e9 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -114,6 +114,7 @@ pub struct Region { log: Logger, /// Thread pool for doing long-running CPU work outside the Tokio runtime + #[allow(unused)] pool: rayon::ThreadPool, } @@ -871,7 +872,7 @@ impl Region { */ #[instrument] pub(crate) fn region_flush_extent< - I: Into + Debug, + I: Into + Copy + Clone + Debug, >( &mut self, eid: ExtentId, @@ -934,49 +935,9 @@ impl Region { } }; - // This is a bit sneaky: we want to perform each flush in a separate - // task for *parallelism*, but can't call `self.get_opened_extent_mut` - // multiple times. 
In addition, we can't use the tokio thread pool to - // spawn a task, because that requires a 'static lifetime, which we - // can't get from a borrowed Extent. - // - // We'll combine two tricks to work around the issue: - // - Do the work in the rayon thread pool instead of using tokio tasks - // - Carefully walk self.extents.as_mut_slice() to mutably borrow - // multiple at the same time. - let mut results = vec![Ok(()); dirty_extents.len()]; - let log = self.log.clone(); - run_blocking(|| { - let mut slice_start = 0; - let mut slice = self.extents.as_mut_slice(); - self.pool.scope(|s| { - for (eid, r) in dirty_extents.iter().zip(results.iter_mut()) { - let next = eid.0 - slice_start; - slice = &mut slice[next as usize..]; - let (extent, rest) = slice.split_first_mut().unwrap(); - let ExtentState::Opened(extent) = extent else { - panic!("can't flush closed extent"); - }; - slice = rest; - slice_start += next + 1; - let log = log.clone(); - s.spawn(move |_| { - *r = - extent.flush(flush_number, gen_number, job_id, &log) - }); - } - }) - }); - + self.flush_extents(&dirty_extents, flush_number, gen_number, job_id)?; cdt::os__flush__done!(|| job_id.0); - for result in results { - // If any extent flush failed, then return that as an error. Because - // the results were all collected above, each extent flush has - // completed at this point. - result?; - } - // Now everything has succeeded, we can remove these extents from the // flush candidates match extent_limit { @@ -1053,6 +1014,138 @@ impl Region { Ok(()) } + #[cfg(not(feature = "omicron-build"))] + #[allow(clippy::unused_async)] + fn flush_extents( + &mut self, + dirty_extents: &BTreeSet, + flush_number: u64, + gen_number: u64, + job_id: JobId, + ) -> Result<()> { + // This is a bit sneaky: we want to perform each flush in a separate + // task for *parallelism*, but can't call `self.get_opened_extent_mut` + // multiple times. In addition, we can't use the tokio thread pool to + // spawn a task, because that requires a 'static lifetime, which we + // can't get from a borrowed Extent. + // + // We'll combine two tricks to work around the issue: + // - Do the work in the rayon thread pool instead of using tokio tasks + // - Carefully walk self.extents.as_mut_slice() to mutably borrow + // multiple at the same time. + let mut results = vec![Ok(()); dirty_extents.len()]; + let log = self.log.clone(); + run_blocking(|| { + let mut slice_start = 0; + let mut slice = self.extents.as_mut_slice(); + self.pool.scope(|s| { + for (eid, r) in dirty_extents.iter().zip(results.iter_mut()) { + let next = eid.0 - slice_start; + slice = &mut slice[next as usize..]; + let (extent, rest) = slice.split_first_mut().unwrap(); + let ExtentState::Opened(extent) = extent else { + panic!("can't flush closed extent"); + }; + slice = rest; + slice_start += next + 1; + let log = log.clone(); + s.spawn(move |_| { + *r = + extent.flush(flush_number, gen_number, job_id, &log) + }); + } + }) + }); + + for result in results { + // If any extent flush failed, then return that as an error. Because + // the results were all collected above, each extent flush has + // completed at this point. + result?; + } + Ok(()) + } + + /// Flush extents using the FIOFFS ioctl + /// + /// If using `omicron-build`, then we're running on illumos and the + /// region is backed by a ZFS dataset. Issue a `_FIOFFS` call, which + /// will result in a `zfs_sync` to the entire region dataset. 
+ /// + /// The codepath is roughly as follows + /// - The IOCTL is received by `zfs_ioctl` through `zfs_dvnodeops_template` + /// - `zfs_ioctl` calls `zfs_sync(vp->v_vfsp, 0, cred)` + /// - `zfs_sync` calls `zil_commit(zfsvfs->z_log, 0)` if `zfsvfs->z_log` is + /// present; it should always be present, because it's only set to `NULL` + /// during `zfsvfs_teardown`. + /// + /// Quoth the `zil_commit` docstring: + /// + /// > If "foid" is zero, this means all "synchronous" and "asynchronous" + /// > txs, for all objects in the dataset, will be committed to stable + /// > storage prior to zil_commit() returning. + /// + /// The extents all live within the same dataset, so this is what we want! + #[cfg(feature = "omicron-build")] + fn flush_extents( + &mut self, + dirty_extents: &BTreeSet, + flush_number: u64, + gen_number: u64, + job_id: JobId, + ) -> Result<(), CrucibleError> { + #[cfg(not(target_os = "illumos"))] + compile_error!("FIOFFS-based flush is only supported on illumos"); + + // Prepare to flush extents by writing metadata to files (if needed) + // + // Some extents may not _actually_ be dirty, so we'll skip them here + let mut flushed_extents = vec![]; + let log = self.log.clone(); + for eid in dirty_extents { + let extent = self.get_opened_extent_mut(*eid); + if extent.pre_flush(flush_number, gen_number, job_id, &log)? { + flushed_extents.push(*eid); + } else { + warn!( + self.log, + "extent {eid} was in dirty_extents but not actually dirty" + ); + } + } + + // Send the ioctl! + use std::os::fd::AsRawFd; + // Open the region's mountpoint + let dir = self.dir.clone(); + // "file system flush", defined in illumos' sys/filio.h + const FIOFFS_MAGIC: u8 = b'f'; + const FIOFFS_TYPE_MODE: u8 = 66; + nix::ioctl_none!(zfs_fioffs, FIOFFS_MAGIC, FIOFFS_TYPE_MODE); + + // Do the flush in a worker thread if possible, to avoid blocking the + // main tokio thread pool. + run_blocking(move || -> Result<(), CrucibleError> { + let file = File::open(&dir)?; + let rc = unsafe { zfs_fioffs(file.as_raw_fd()) }; + if let Err(e) = rc { + let e: std::io::Error = e.into(); + Err(CrucibleError::from(e)) + } else { + Ok(()) + } + })?; + + // After the bits have been committed to durable storage, execute any + // post flush routine that needs to happen + for eid in flushed_extents { + let extent = self.get_opened_extent_mut(eid); + extent.post_flush(flush_number, gen_number, job_id)?; + } + + Ok(()) + } + /** * Create the copy directory for this extent. */
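
The reason `ExtentInner::flush` is split into `pre_flush` / `flush_inner` / `post_flush` is that the FIOFFS path wants to amortize the sync: every dirty extent runs its pre-flush metadata update, one dataset-wide flush makes all of that durable at once, and then every extent runs its post-flush cleanup. A toy sketch of that shape (not the real `ExtentInner` trait, which also threads generation numbers, job IDs, and `CrucibleError` through these calls):

    // Simplified three-phase flush; the real trait is ExtentInner in extent.rs.
    trait ThreePhaseFlush {
        /// Metadata updates that must happen before the sync
        fn pre_flush(&mut self, new_flush: u64);
        /// Make this extent's data durable (fsync on the per-extent path)
        fn flush_inner(&mut self);
        /// Work that must only happen after the data is durable
        fn post_flush(&mut self, new_flush: u64);

        /// Per-extent flush: the composition used when FIOFFS is unavailable
        fn flush(&mut self, new_flush: u64) {
            self.pre_flush(new_flush);
            self.flush_inner();
            self.post_flush(new_flush);
        }
    }

    /// Batched flush: the shape of the omicron-build path, where a single
    /// dataset-wide sync (the FIOFFS ioctl) replaces every flush_inner call.
    fn flush_all<E: ThreePhaseFlush>(
        dirty: &mut [E],
        new_flush: u64,
        sync_dataset: impl FnOnce(),
    ) {
        for e in dirty.iter_mut() {
            e.pre_flush(new_flush);
        }
        sync_dataset();
        for e in dirty.iter_mut() {
            e.post_flush(new_flush);
        }
    }
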
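On the fsync path, the "a bit sneaky" comment in `flush_extents` is doing the heavy lifting: because `dirty_extents` is a sorted `BTreeSet`, the extent slice can be walked once, splitting off a disjoint `&mut` for each dirty extent, and those borrows can then be handed to scoped worker tasks. A self-contained sketch of the same borrow structure, with toy types and `std::thread::scope` standing in for the rayon `ThreadPool::scope` used by the downstairs:

    use std::collections::BTreeSet;
    use std::mem;

    // Toy stand-in for an extent; the real code walks ExtentState entries.
    struct Extent {
        dirty: bool,
        flush_number: u64,
    }

    impl Extent {
        fn flush(&mut self, new_flush: u64) -> Result<(), String> {
            self.flush_number = new_flush;
            self.dirty = false;
            Ok(())
        }
    }

    // Walk the slice once, carving out a disjoint `&mut Extent` for every
    // index in `dirty` (sorted, because it comes from a BTreeSet).
    fn dirty_refs<'a>(
        mut extents: &'a mut [Extent],
        dirty: &BTreeSet<usize>,
    ) -> Vec<&'a mut Extent> {
        let mut refs = Vec::with_capacity(dirty.len());
        let mut slice_start = 0;
        for &eid in dirty {
            // Move the slice out of the variable so we can split it and keep
            // only the tail for the next iteration.
            let slice = mem::take(&mut extents);
            let (_, rest) = slice.split_at_mut(eid - slice_start);
            let (wanted, rest) = rest.split_first_mut().unwrap();
            refs.push(wanted);
            extents = rest;
            slice_start = eid + 1;
        }
        refs
    }

    fn main() {
        let mut extents: Vec<Extent> = (0..8)
            .map(|_| Extent { dirty: false, flush_number: 0 })
            .collect();
        extents[2].dirty = true;
        extents[5].dirty = true;

        let dirty: BTreeSet<usize> = extents
            .iter()
            .enumerate()
            .filter(|(_, e)| e.dirty)
            .map(|(i, _)| i)
            .collect();

        let mut results: Vec<Result<(), String>> = vec![Ok(()); dirty.len()];
        std::thread::scope(|s| {
            for (extent, r) in
                dirty_refs(&mut extents, &dirty).into_iter().zip(results.iter_mut())
            {
                // Stand-in for extent.flush(flush_number, gen_number, ...)
                s.spawn(move || *r = extent.flush(7));
            }
        });
        assert!(results.iter().all(|r| r.is_ok()));
        assert_eq!(extents[2].flush_number, 7);
    }
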
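The FIOFFS call itself is small enough to exercise on its own. A minimal illumos-only sketch, with a placeholder path, assuming `nix = { version = "0.28", features = ["ioctl"] }` as in the Cargo.toml change above:

    use std::fs::File;
    use std::os::fd::AsRawFd;

    // illumos sys/filio.h: #define _FIOFFS _IO('f', 66)  /* file system flush */
    nix::ioctl_none!(zfs_fioffs, b'f', 66);

    fn main() -> std::io::Result<()> {
        // Placeholder path: any fd on the region's ZFS dataset works, because
        // the ioctl flushes the entire file system, not a single file.
        let dir = File::open("/data/crucible/region")?;
        let rc = unsafe { zfs_fioffs(dir.as_raw_fd()) };
        rc.map_err(std::io::Error::from)?;
        Ok(())
    }

Since `_FIOFFS` flushes the whole file system, any file or directory fd on the region's dataset will do, which is why `flush_extents` simply opens `self.dir`. Building this path requires the new feature flag (e.g. `cargo build --features omicron-build` in the downstairs crate), and the `compile_error!` guard makes it fail loudly if that feature is ever enabled off illumos.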