Skip to content

Commit

Permalink
Use FIOFFS instead of fsync (on supported platforms)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Aug 1, 2024
1 parent 07cb9ed commit 955d5f2
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ itertools = "0.12.1"
libc = "0.2"
mime_guess = "2.0.4"
nbd = "0.3.1"
nix = { version = "0.28", features = [ "feature", "uio" ] }
nix = { version = "0.28", features = [ "feature", "uio", "ioctl" ] }
num_enum = "0.7"
num-derive = "0.4"
num-traits = "0.2"
Expand Down
1 change: 1 addition & 0 deletions downstairs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ asm = ["usdt/asm"]
default = []
zfs_snapshot = []
integration-tests = [] # Enables creating SQLite volumes
omicron-build = [] # Uses FIOFFS for flushes instead of fsync
78 changes: 72 additions & 6 deletions downstairs/src/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,45 @@ pub(crate) trait ExtentInner: Send + Sync + Debug {
fn flush_number(&self) -> Result<u64, CrucibleError>;
fn dirty(&self) -> Result<bool, CrucibleError>;

fn flush(
/// Performs any metadata updates needed before a flush
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError>;

/// Syncs all relevant data to persistent storage
fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError>;

/// Performs any metadata updates after syncing data to persistent storage
fn post_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError>;

/// Performs a full flush (pre/inner/post)
///
/// This is only exposed for the sake of unit testing; normal code should
/// use the fine-grained functions and be forced to consider performance.
#[cfg(test)]
fn flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.pre_flush(new_flush, new_gen, job_id)?;
self.flush_inner(job_id)?;
self.post_flush(new_flush, new_gen, job_id)?;
Ok(())
}

fn read(
&mut self,
job_id: JobId,
Expand Down Expand Up @@ -578,22 +610,25 @@ impl Extent {
Ok(())
}

#[instrument]
pub(crate) fn flush<I: Into<JobOrReconciliationId> + Debug>(
/// Prepares for a flush
///
/// Returns `false` if we should skip the flush (because this extent is not
/// dirty), or `true` if we should proceed.
pub(crate) fn pre_flush<I: Into<JobOrReconciliationId> + Debug>(
&mut self,
new_flush: u64,
new_gen: u64,
id: I, // only used for logging
log: &Logger,
) -> Result<(), CrucibleError> {
) -> Result<bool, CrucibleError> {
let job_id: JobOrReconciliationId = id.into();

if !self.inner.dirty()? {
/*
* If we have made no writes to this extent since the last flush,
* we do not need to update the extent on disk
*/
return Ok(());
return Ok(false);
}

// Read only extents should never have the dirty bit set. If they do,
Expand All @@ -604,7 +639,38 @@ impl Extent {
crucible_bail!(ModifyingReadOnlyRegion);
}

self.inner.flush(new_flush, new_gen, job_id)
self.inner.pre_flush(new_flush, new_gen, job_id)?;
Ok(true)
}

/// Performs post-flush cleanup
pub(crate) fn post_flush<I: Into<JobOrReconciliationId> + Debug>(
&mut self,
new_flush: u64,
new_gen: u64,
id: I, // only used for logging
) -> Result<(), CrucibleError> {
let job_id: JobOrReconciliationId = id.into();
self.inner.post_flush(new_flush, new_gen, job_id)
}

/// Flushes this extent if it is dirty
#[instrument]
pub(crate) fn flush<
I: Into<JobOrReconciliationId> + Debug + Copy + Clone,
>(
&mut self,
new_flush: u64,
new_gen: u64,
id: I, // only used for logging
log: &Logger,
) -> Result<(), CrucibleError> {
if !self.pre_flush(new_flush, new_gen, id, log)? {
return Ok(());
}
self.inner.flush_inner(id.into())?;
self.post_flush(new_flush, new_gen, id)?;
Ok(())
}

pub fn get_meta_info(&self) -> ExtentMeta {
Expand Down
33 changes: 19 additions & 14 deletions downstairs/src/extent_inner_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,12 @@ impl ExtentInner for RawInner {
Ok(ExtentReadResponse { data: buf, blocks })
}

fn flush(
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
if !self.dirty()? {
/*
* If we have made no writes to this extent since the last flush,
* we do not need to update the extent on disk
*/
return Ok(());
}

cdt::extent__flush__start!(|| {
(job_id.get(), self.extent_number.0, 0)
});
Expand All @@ -409,10 +401,17 @@ impl ExtentInner for RawInner {
// operation atomic.
self.set_flush_number(new_flush, new_gen)?;

Ok(())
}

fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Now, we fsync to ensure data is flushed to disk. It's okay to crash
// before this point, because setting the flush number is atomic.
cdt::extent__flush__file__start!(|| {
(job_id.get(), self.extent_number.0, 0)
(job_id.get(), self.extent_number.0)
});
if let Err(e) = self.file.sync_all() {
/*
Expand All @@ -425,9 +424,17 @@ impl ExtentInner for RawInner {
}
self.context_slot_dirty.fill(0);
cdt::extent__flush__file__done!(|| {
(job_id.get(), self.extent_number.0, 0)
(job_id.get(), self.extent_number.0)
});
Ok(())
}

fn post_flush(
&mut self,
_new_flush: u64,
_new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Check for fragmentation in the context slots leading to worse
// performance, and defragment if that's the case.
let extra_syscalls_per_rw = self
Expand All @@ -442,9 +449,7 @@ impl ExtentInner for RawInner {
Ok(())
};

cdt::extent__flush__done!(|| {
(job_id.get(), self.extent_number.0, 0)
});
cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number.0) });

r
}
Expand Down
59 changes: 46 additions & 13 deletions downstairs/src/extent_inner_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,32 @@ impl ExtentInner for SqliteInner {
self.0.lock().unwrap().dirty()
}

fn flush(
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.0.lock().unwrap().flush(new_flush, new_gen, job_id)
self.0.lock().unwrap().pre_flush(new_flush, new_gen, job_id)
}

fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.0.lock().unwrap().flush_inner(job_id)
}

fn post_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.0
.lock()
.unwrap()
.post_flush(new_flush, new_gen, job_id)
}

fn read(
Expand Down Expand Up @@ -194,10 +213,10 @@ impl SqliteMoreInner {
Ok(self.dirty.get())
}

fn flush(
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
_new_flush: u64,
_new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Used for profiling
Expand All @@ -207,12 +226,19 @@ impl SqliteMoreInner {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
});

Ok(())
}

fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
/*
* We must first fsync to get any outstanding data written to disk.
* This must be done before we update the flush number.
*/
cdt::extent__flush__file__start!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});
if let Err(e) = self.file.sync_all() {
/*
Expand All @@ -225,9 +251,18 @@ impl SqliteMoreInner {
);
}
cdt::extent__flush__file__done!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

Ok(())
}

fn post_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Clear old block contexts. In order to be crash consistent, only
// perform this after the extent fsync is done. For each block
// written since the last flush, remove all block context rows where
Expand All @@ -237,7 +272,7 @@ impl SqliteMoreInner {
// file is rehashed, since in that case we don't have that luxury.

cdt::extent__flush__collect__hashes__start!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

// Rehash any parts of the file that we *may have written* data to since
Expand All @@ -250,7 +285,7 @@ impl SqliteMoreInner {
});

cdt::extent__flush__sqlite__insert__start!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

// We put all of our metadb updates into a single transaction to
Expand All @@ -265,7 +300,7 @@ impl SqliteMoreInner {
)?;

cdt::extent__flush__sqlite__insert__done!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

self.set_flush_number(new_flush, new_gen)?;
Expand All @@ -275,9 +310,7 @@ impl SqliteMoreInner {
// Finally, reset the file's seek offset to 0
self.file.seek(SeekFrom::Start(0))?;

cdt::extent__flush__done!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
});
cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number.0) });
Ok(())
}

Expand Down
36 changes: 8 additions & 28 deletions downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,44 +741,24 @@ pub mod cdt {
fn submit__writeunwritten__done(_: u64) {}
fn submit__write__done(_: u64) {}
fn submit__flush__done(_: u64) {}
fn extent__flush__start(job_id: u64, extent_id: u32, extent_size: u64) {}
fn extent__flush__done(job_id: u64, extent_id: u32, extent_size: u64) {}
fn extent__flush__file__start(
fn extent__flush__start(
job_id: u64,
extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__file__done(
job_id: u64,
extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__collect__hashes__start(
job_id: u64,
extent_id: u32,
num_dirty: u64,
num_dirty_blocks: u64,
) {
}
fn extent__flush__done(job_id: u64, extent_id: u32) {}
fn extent__flush__file__start(job_id: u64, extent_id: u32) {}
fn extent__flush__file__done(job_id: u64, extent_id: u32) {}
fn extent__flush__collect__hashes__start(job_id: u64, extent_id: u32) {}
fn extent__flush__collect__hashes__done(
job_id: u64,
extent_id: u32,
num_rehashed: u64,
) {
}
fn extent__flush__sqlite__insert__start(
job_id: u64,
extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__sqlite__insert__done(
_job_id: u64,
_extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__sqlite__insert__start(job_id: u64, extent_id: u32) {}
fn extent__flush__sqlite__insert__done(job_id: u64, extent_id: u32) {}
fn extent__write__start(job_id: u64, extent_id: u32, n_blocks: u64) {}
fn extent__write__done(job_id: u64, extent_id: u32, n_blocks: u64) {}
fn extent__write__get__hashes__start(
Expand Down
Loading

0 comments on commit 955d5f2

Please sign in to comment.