diff --git a/betree/Cargo.toml b/betree/Cargo.toml index a93612b2..e634df15 100644 --- a/betree/Cargo.toml +++ b/betree/Cargo.toml @@ -70,7 +70,7 @@ criterion = "0.3" tempfile = "3.6.0" [features] -default = ["init_env_logger", "figment_config"] +default = ["init_env_logger", "figment_config", "nvm"] # unlock unstable API for consumption by bectl and other debugging tools internal-api = [] diff --git a/betree/include/betree.h b/betree/include/betree.h index d7131aec..364815f9 100644 --- a/betree/include/betree.h +++ b/betree/include/betree.h @@ -1,7 +1,7 @@ #ifndef betree_h #define betree_h -/* Generated with cbindgen:0.24.3 */ +/* Generated with cbindgen:0.24.5 */ /* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */ @@ -467,6 +467,12 @@ int betree_object_read_at(struct obj_t *obj, unsigned long *n_read, struct err_t **err); +/** + * Return the objects size in bytes. If the size could not be determined + * it is assumed the object is zero-sized. + */ +unsigned long betree_object_size(const struct obj_t *obj, struct err_t **err); + /** * Try to write `buf_len` bytes from `buf` into `obj`, starting at `offset` bytes into the objects * data. diff --git a/betree/pmem-hashmap/Cargo.toml b/betree/pmem-hashmap/Cargo.toml index 59bf49d1..7aa043af 100644 --- a/betree/pmem-hashmap/Cargo.toml +++ b/betree/pmem-hashmap/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" bindgen = "0.65" [dependencies] +core_affinity = "0.8.0" errno = "0.3.1" thiserror = "1.0.40" twox-hash = "1.6.3" diff --git a/betree/pmem-hashmap/c_impl/hashmap_tx.c b/betree/pmem-hashmap/c_impl/hashmap_tx.c index 9e17f92c..f883e0c6 100644 --- a/betree/pmem-hashmap/c_impl/hashmap_tx.c +++ b/betree/pmem-hashmap/c_impl/hashmap_tx.c @@ -410,3 +410,12 @@ int hm_tx_cmd(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, unsigned cmd, return -EINVAL; } } + +int empty_constr(PMEMobjpool *pop, void *ptr, void *arg) { return 0; } + +int haura_alloc(PMEMobjpool *pop, PMEMoid *oidp, size_t size, uint64_t type_num, + void *arg) { + return pmemobj_alloc(pop, oidp, size, type_num, &empty_constr, arg); +} + +void *haura_direct(PMEMoid oid) { return pmemobj_direct(oid); } diff --git a/betree/pmem-hashmap/c_impl/hashmap_tx.h b/betree/pmem-hashmap/c_impl/hashmap_tx.h index 8b6f6b87..0124bdfa 100644 --- a/betree/pmem-hashmap/c_impl/hashmap_tx.h +++ b/betree/pmem-hashmap/c_impl/hashmap_tx.h @@ -13,6 +13,12 @@ #define HASHMAP_TX_TYPE_OFFSET 1004 #endif +int empty_constr(PMEMobjpool *pop, void *ptr, void *arg); + +int haura_alloc(PMEMobjpool *pop, PMEMoid *oidp, size_t size, uint64_t type_num, + void *arg); +void *haura_direct(PMEMoid oid); + struct hashmap_tx; TOID_DECLARE(struct hashmap_tx, HASHMAP_TX_TYPE_OFFSET + 0); diff --git a/betree/pmem-hashmap/src/allocator.rs b/betree/pmem-hashmap/src/allocator.rs new file mode 100644 index 00000000..27b462e6 --- /dev/null +++ b/betree/pmem-hashmap/src/allocator.rs @@ -0,0 +1,312 @@ +use super::*; +use errno::errno; +use std::alloc::{AllocError, Allocator}; +use std::{ffi::c_void, ptr::NonNull, sync::Arc}; +use thiserror::Error; + +// A friendly persistent memory allocator. +#[derive(Clone, Debug)] +pub struct Pal { + pool: Arc>, +} + +// Parallel access should be fine, but can be exploited. TODO: Add safeguard for this? +unsafe impl Sync for Pal {} +// No thread-local data +unsafe impl Send for Pal {} + +impl Drop for Pal { + fn drop(&mut self) { + // there should exist no further reference to this resources otherwise we risk some invalid fetches + if Arc::strong_count(&self.pool) == 0 && Arc::weak_count(&self.pool) == 0 { + self.close() + } + } +} + +impl Into for PalError { + fn into(self) -> AllocError { + AllocError + } +} + +unsafe impl Allocator for Pal { + fn allocate(&self, layout: std::alloc::Layout) -> Result, AllocError> { + let ptr = self.allocate(layout.size()).map_err(|_| AllocError)?; + Ok(NonNull::new(unsafe { + core::slice::from_raw_parts_mut(ptr.load() as *mut u8, layout.size()) + }) + .ok_or_else(|| AllocError)?) + } + + unsafe fn deallocate(&self, ptr: NonNull, _layout: std::alloc::Layout) { + let mut oid = unsafe { pmemobj_oid(ptr.as_ptr() as *const c_void) }; + unsafe { pmemobj_free(&mut oid) } + } +} + +#[inline] +fn oid_is_null(oid: PMEMoid) -> bool { + oid.off == 0 +} + +#[derive(Debug, Error)] +pub enum PalError { + #[error("Opening failed (`{0}`). This error originates in libpmemobj.")] + OpenFailed(String), + #[error("Allocation failed (`{0}`). This error originates in libpmemobj.")] + AllocationFailed(String), + #[error("Null")] + NullEncountered, + #[error("Could not deduce path: `{0}`")] + InvalidPath(#[from] std::ffi::NulError), +} + +// A friendly persistent pointer. Useless without the according handle to the +// original arena. +#[derive(Debug, Clone, Copy)] +pub struct PalPtr { + inner: PMEMoid, + size: usize, +} + +impl PalPtr { + /// Translate this persistent ptr to a volatile one. + pub fn load(&self) -> *mut c_void { + unsafe { haura_direct(self.inner) } + } + + /// Copy a range of bytes behind this pointer to a given buffer. Data is + /// copied until the allocated area is read entirely or the given buffer is + /// filled entirely. + pub fn copy_to(&self, other: &mut [u8], arena: &Pal) { + unsafe { + pmemobj_memcpy( + arena.pool.as_ptr(), + other.as_mut_ptr() as *mut c_void, + self.load(), + self.size.min(other.len()), + PMEMOBJ_F_MEM_NOFLUSH, + ) + }; + } + + /// Copy a range of bytes to the location of this pointer. Data is copied + /// until the allocated area is filled or the given buffer ends. + pub fn copy_from(&self, other: &[u8], arena: &Pal) { + unsafe { + pmemobj_memcpy( + arena.pool.as_ptr(), + self.load(), + other.as_ptr() as *const c_void, + self.size.min(other.len()), + PMEMOBJ_F_MEM_NONTEMPORAL, + ); + }; + } + + /// Deallocate this object. Required if this value is no longer needed. + /// There is *no* automatic deallocation logic. + pub fn free(mut self) { + unsafe { pmemobj_free(&mut self.inner) } + } +} + +// TODO: Impl Deref with typization? + +impl Pal { + /// Open an existing file representing an arena. + pub fn open>(path: P) -> Result { + let pobjpool = { + let path = + std::ffi::CString::new(path.into().to_string_lossy().into_owned())?.into_raw(); + unsafe { pmemobj_open(path, std::ptr::null()) } + }; + Self::new(pobjpool) + } + + /// Create a new arena on persistent memory. + pub fn create>( + path: P, + bytes: usize, + permissions: u32, + ) -> Result { + let pobjpool = { + let path = + std::ffi::CString::new(path.into().to_string_lossy().into_owned())?.into_raw(); + unsafe { pmemobj_create(path, std::ptr::null(), bytes, permissions) } + }; + Self::new(pobjpool) + } + + fn new(pool: *mut pmemobjpool) -> Result { + NonNull::new(pool) + .map(|valid| Pal { + pool: Arc::new(valid), + }) + .ok_or_else(|| { + let err = unsafe { std::ffi::CString::from_raw(pmemobj_errormsg() as *mut i8) }; + let err_msg = format!( + "Failed to create memory pool. filepath: {}", + err.to_string_lossy() + ); + PalError::OpenFailed(err_msg) + }) + } + + /// Close the allocation arena. Only do this when all values you want to be + /// lost are dropped and all values you want to be remembered are + /// (ironically) std::mem::forget(ten). + pub fn close(&mut self) { + unsafe { pmemobj_close(self.pool.as_ptr()) }; + } + + /// Allocate an area of size in the persistent memory. + pub fn allocate(&self, size: usize) -> Result { + let mut oid = std::mem::MaybeUninit::::uninit(); + if unsafe { + haura_alloc( + self.pool.as_ptr(), + oid.as_mut_ptr(), + size, + 0, // BOGUS + std::ptr::null_mut(), + ) != 0 + } { + let err = unsafe { std::ffi::CString::from_raw(pmemobj_errormsg() as *mut i8) }; + let err_msg = format!( + "Failed to create memory pool. filepath: {}", + err.to_string_lossy() + ); + return Err(PalError::AllocationFailed(err_msg)); + } + + if unsafe { oid_is_null(oid.assume_init_read()) } { + return Err(PalError::NullEncountered); + } + Ok(PalPtr { + inner: unsafe { oid.assume_init() }, + size, + }) + } + + /// Access and allocate the root object if needed. The root object may be + /// extended by calling this function again with a larger value. It may + /// never be shrunken. + /// + /// If called with size 0 an existing root object might be opened, if none + /// exists EINVAL is returned. + pub fn root(&self, size: usize) -> Result { + let oid = unsafe { pmemobj_root(self.pool.as_ptr(), size) }; + if oid_is_null(oid) { + return Err(PalError::AllocationFailed(format!("{}", errno()))); + } + Ok(PalPtr { inner: oid, size }) + } + + /// Return the maximum size of the current root object. + pub fn root_size(&self) -> usize { + unsafe { pmemobj_root_size(self.pool.as_ptr()) } + } +} + +#[cfg(test)] +mod tests { + + extern crate alloc; + + use std::{collections::BTreeMap, path::PathBuf, process::Command}; + + use alloc::collections::vec_deque::VecDeque; + use tempfile::Builder; + + use super::*; + + struct TestFile(PathBuf); + + impl TestFile { + pub fn new() -> Self { + TestFile( + Builder::new() + .tempfile() + .expect("Could not get tmpfile") + .path() + .to_path_buf(), + ) + } + + pub fn path(&self) -> &PathBuf { + &self.0 + } + } + impl Drop for TestFile { + fn drop(&mut self) { + if !Command::new("rm") + .arg(self.0.to_str().expect("Could not pass tmpfile")) + .output() + .expect("Could not delete") + .status + .success() + { + eprintln!("Could not delete tmpfile"); + } + } + } + + #[test] + fn alloc_vec_deque() { + let file = TestFile::new(); + const SIZE: usize = 64 * 1024 * 1024; + let pal = Pal::create(file.path(), 128 * 1024 * 1024, 0o666).unwrap(); + let mut list: VecDeque = VecDeque::with_capacity_in(SIZE, pal); + for _ in 0..SIZE { + list.push_back(0); + } + } + + #[test] + fn alloc_btree_map() { + let file = TestFile::new(); + { + let mut pal = Pal::create(file.path(), 128 * 1024 * 1024, 0o666).unwrap(); + let mut map: BTreeMap = BTreeMap::new_in(pal.clone()); + let root_ptr = pal + .root(std::mem::size_of::>()) + .unwrap(); + unsafe { + (root_ptr.load() as *mut BTreeMap) + .copy_from(&map, std::mem::size_of::>()) + }; + std::mem::forget(map); + let map: &mut BTreeMap = unsafe { + (root_ptr.load() as *mut BTreeMap) + .as_mut() + .unwrap() + }; + for id in 0..100 { + map.insert(id, id); + } + for id in 100..0 { + assert_eq!(map.get(&id), Some(&id)); + } + std::mem::forget(map); + pal.close(); + } + { + let mut pal = Pal::open(file.path()).unwrap(); + let root_ptr = pal + .root(std::mem::size_of::>()) + .unwrap(); + let map: &mut BTreeMap = unsafe { + (root_ptr.load() as *mut BTreeMap) + .as_mut() + .unwrap() + }; + for id in 100..0 { + assert_eq!(map.get(&id), Some(&id)); + } + std::mem::forget(map); + pal.close(); + } + } +} diff --git a/betree/pmem-hashmap/src/bin/bench_pal.rs b/betree/pmem-hashmap/src/bin/bench_pal.rs new file mode 100644 index 00000000..144d2531 --- /dev/null +++ b/betree/pmem-hashmap/src/bin/bench_pal.rs @@ -0,0 +1,132 @@ +use std::{process::Command, sync::Arc}; + +use pmem_hashmap::allocator::{Pal, PalPtr}; + +fn bench_pal_sub(rank: usize, barrier: Arc, tx: std::sync::mpsc::Sender) { + Command::new("rm") + .arg(format!("/home/wuensche/pmem/foobar{rank}")) + .status() + .unwrap(); + + let pal = + Arc::new(Pal::create(format!("/home/wuensche/pmem/foobar{rank}"), SIZE, 0o666).unwrap()); + + enum CMD { + Read(Vec), + Write(Vec), + Wait, + } + + // Initiate workers + let channels = (0..WORKERS).map(|_| std::sync::mpsc::sync_channel::(0)); + + let mut threads: Vec<_> = channels + .enumerate() + .map(|(id, (tx, rx))| { + let hdl = Arc::clone(&pal); + ( + vec![], + tx, + std::thread::spawn(move || { + assert!(core_affinity::set_for_current(core_affinity::CoreId { id })); + let bar = hdl; + let mut buf = vec![42u8; BS]; + while let Ok(cmd) = rx.recv() { + match cmd { + CMD::Write(ptrs) => { + for ptr in ptrs.iter() { + ptr.copy_from(&buf, &bar); + } + } + CMD::Read(ptrs) => { + for ptr in ptrs.iter() { + ptr.copy_to(&mut buf, &bar); + } + } + CMD::Wait => { + // NO-OP + } + } + } + }), + ) + }) + .collect(); + + // Create all allocations. + for id in 0..ITER { + let ptr = pal.allocate(BS).unwrap(); + threads[id % WORKERS].0.push(ptr.clone()); + } + for id in 0..WORKERS { + threads[id % WORKERS] + .1 + .send(CMD::Write(threads[id % WORKERS].0.clone())) + .unwrap(); + } + + barrier.wait(); + let start = std::time::Instant::now(); + + for (_, tx, _) in threads.iter() { + tx.send(CMD::Wait).unwrap(); + } + tx.send( + (ITER as f32 * BS as f32 / 1024f32 / 1024f32 / 1024f32) / start.elapsed().as_secs_f32(), + ) + .unwrap(); + + barrier.wait(); + let start = std::time::Instant::now(); + for id in 0..WORKERS { + threads[id % WORKERS] + .1 + .send(CMD::Read(threads[id].0.clone())) + .unwrap(); + } + for (_, tx, _) in threads.iter() { + tx.send(CMD::Wait).unwrap(); + } + tx.send( + (ITER as f32 * BS as f32 / 1024f32 / 1024f32 / 1024f32) / start.elapsed().as_secs_f32(), + ) + .unwrap(); + barrier.wait(); + for (_, s, thread) in threads.into_iter() { + drop(s); + thread.join().unwrap(); + } +} + +const JOBS: usize = 1; +const BS: usize = 4 * 1024 * 1024; +const SIZE: usize = 64 * 1024 * 1024 * 1024; +const EFFECTIVE_SIZE: usize = 32 * 1024 * 1024 * 1024; +const ITER: usize = EFFECTIVE_SIZE / BS; +const WORKERS: usize = 2; + +fn main() { + let (tx, rx) = std::sync::mpsc::channel(); + let barrier = Arc::new(std::sync::Barrier::new(JOBS)); + let jobs: Vec<_> = (0..JOBS) + .map(|rank| { + let foo = Arc::clone(&barrier); + let bar = tx.clone(); + std::thread::spawn(move || bench_pal_sub(rank, foo, bar)) + }) + .collect(); + let mut bw: f32 = 0f32; + for _ in 0..JOBS { + bw += rx.recv().unwrap(); + } + println!("Write: Achieved {} GiB/s", bw,); + + let mut bw: f32 = 0f32; + for _ in 0..JOBS { + bw += rx.recv().unwrap(); + } + println!("Read: Achieved {} GiB/s", bw,); + for thread in jobs.into_iter() { + thread.join().unwrap(); + } +} diff --git a/betree/pmem-hashmap/src/bin/bench_pmap.rs b/betree/pmem-hashmap/src/bin/bench_pmap.rs new file mode 100644 index 00000000..36b1615c --- /dev/null +++ b/betree/pmem-hashmap/src/bin/bench_pmap.rs @@ -0,0 +1,66 @@ +use std::process::Command; + +use pmem_hashmap::PMap; + +const BS: usize = 4 * 1024 * 1024; +const SIZE: usize = 64 * 1024 * 1024 * 1024; +const EFFECTIVE_SIZE: usize = 32 * 1024 * 1024 * 1024; +const ITER: usize = EFFECTIVE_SIZE / BS; +const WORKERS: usize = 2; + +fn main() { + let _ = Command::new("rm").arg("/home/wuensche/pmem/bar").status(); + let mut pmap = PMap::create("/home/wuensche/pmem/bar", SIZE).unwrap(); + + enum CMD { + Read(usize), + Wait, + } + // Initiate workers + let channels = (0..WORKERS).map(|_| std::sync::mpsc::sync_channel::(0)); + + let start = std::time::Instant::now(); + let buf = vec![42u8; BS]; + for id in 0..ITER { + pmap.insert(id, &buf).unwrap(); + } + println!( + "Write: Achieved {} GiB/s", + (ITER as f32 * BS as f32 / 1024f32 / 1024f32 / 1024f32) / start.elapsed().as_secs_f32() + ); + + let pmap = std::sync::Arc::new(pmap); + + let threads: Vec<_> = channels + .map(|(tx, rx)| { + let foo = pmap.clone(); + ( + tx, + std::thread::spawn(move || { + let mut buf = vec![42u8; BS]; + while let Ok(msg) = rx.recv() { + match msg { + CMD::Read(id) => buf.copy_from_slice(foo.get(id).unwrap()), + CMD::Wait => {} + } + } + }), + ) + }) + .collect(); + let start = std::time::Instant::now(); + for id in 0..ITER { + threads[id % WORKERS].0.send(CMD::Read(id)).unwrap(); + } + for id in 0..WORKERS { + threads[id % WORKERS].0.send(CMD::Wait).unwrap(); + } + for (s, thread) in threads.into_iter() { + drop(s); + thread.join().unwrap(); + } + println!( + "Read: Achieved {} GiB/s", + (ITER as f32 * BS as f32 / 1024f32 / 1024f32 / 1024f32) / start.elapsed().as_secs_f32() + ); +} diff --git a/betree/pmem-hashmap/src/bin/bench_pmemobj.rs b/betree/pmem-hashmap/src/bin/bench_pmemobj.rs new file mode 100644 index 00000000..05856457 --- /dev/null +++ b/betree/pmem-hashmap/src/bin/bench_pmemobj.rs @@ -0,0 +1,126 @@ +use std::{mem::MaybeUninit, process::Command}; + +use pmem_hashmap::{ + haura_alloc, haura_direct, pmemobj_create, pmemobj_memcpy, pmemobj_memcpy_persist, PMEMoid, +}; + +const BS: usize = 4 * 1024 * 1024; +const SIZE: usize = 64 * 1024 * 1024 * 1024; +const EFFECTIVE_SIZE: usize = 32 * 1024 * 1024 * 1024; +const ITER: usize = EFFECTIVE_SIZE / BS; +const WORKERS: usize = 2; + +struct Taco(T); + +unsafe impl Send for Taco {} +unsafe impl Sync for Taco {} + +fn main() { + Command::new("rm") + .arg("/home/wuensche/pmem/baz") + .status() + .unwrap(); + let pobjpool = { + let path = std::ffi::CString::new("/home/wuensche/pmem/baz") + .unwrap() + .into_raw(); + unsafe { pmemobj_create(path, std::ptr::null(), SIZE, 0o666) } + }; + + enum CMD { + Read(MaybeUninit), + Write(MaybeUninit), + Wait, + } + + // Initiate workers + let channels = (0..WORKERS).map(|_| std::sync::mpsc::sync_channel::(0)); + + let threads: Vec<_> = channels + .enumerate() + .map(|(_id, (tx, rx))| { + let hdl = Taco(pobjpool.clone()); + ( + tx, + std::thread::spawn(move || { + let bar = hdl; + let mut buf = vec![42u8; BS]; + while let Ok(cmd) = rx.recv() { + match cmd { + CMD::Read(mut oid) => { + let foo = unsafe { haura_direct(*oid.as_mut_ptr()) }; + unsafe { + pmemobj_memcpy( + bar.0, + buf.as_mut_ptr() as *mut std::ffi::c_void, + foo, + BS, + 0, + ) + }; + } + CMD::Write(mut oid) => { + let foo = unsafe { haura_direct(*oid.as_mut_ptr()) }; + unsafe { + pmemobj_memcpy_persist( + bar.0, + foo, + buf.as_ptr() as *const std::ffi::c_void, + BS, + ) + }; + } + CMD::Wait => { + // NO-OP + } + } + } + }), + ) + }) + .collect(); + + let mut oids = Vec::with_capacity(ITER); + let start = std::time::Instant::now(); + for id in 0..ITER { + let mut oid = std::mem::MaybeUninit::::uninit(); + if unsafe { + haura_alloc( + pobjpool, + oid.as_mut_ptr(), + BS, + 0, // BOGUS + std::ptr::null_mut(), + ) != 0 + } { + panic!("Oh no, something went wrong.."); + } + assert!(unsafe { oid.assume_init_read().off != 0 }); + oids.push(oid.clone()); + threads[id % WORKERS].0.send(CMD::Write(oid)).unwrap(); + } + + for (tx, _) in threads.iter() { + tx.send(CMD::Wait).unwrap(); + } + println!( + "Write: Achieved {} GiB/s", + (ITER as f32 * BS as f32 / 1024f32 / 1024f32 / 1024f32) / start.elapsed().as_secs_f32() + ); + + let start = std::time::Instant::now(); + for id in 0..ITER { + threads[id % WORKERS].0.send(CMD::Read(oids[id])).unwrap(); + } + for (tx, _) in threads.iter() { + tx.send(CMD::Wait).unwrap(); + } + println!( + "Read: Achieved {} GiB/s", + (ITER as f32 * BS as f32 / 1024f32 / 1024f32 / 1024f32) / start.elapsed().as_secs_f32() + ); + for (s, thread) in threads.into_iter() { + drop(s); + thread.join().unwrap(); + } +} diff --git a/betree/pmem-hashmap/src/lib.rs b/betree/pmem-hashmap/src/lib.rs index 6a76b053..0b05d921 100644 --- a/betree/pmem-hashmap/src/lib.rs +++ b/betree/pmem-hashmap/src/lib.rs @@ -23,9 +23,13 @@ #![allow(non_upper_case_globals)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] +#![feature(allocator_api)] +#![feature(btreemap_alloc)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +pub mod allocator; + use std::{ ffi::c_int, hash::{Hash, Hasher}, @@ -75,6 +79,7 @@ pub struct PMap { /// We can guarantee that no thread-local shaenanigans are done within our /// library. unsafe impl Send for PMap {} +unsafe impl Sync for PMap {} impl PMap { /// Open an existing hashmap. Will fail if no hashmap has been created before. @@ -158,7 +163,7 @@ impl PMap { return Err(PMapError::AllocationError(format!("{}", errno::errno()))); } - let mut mv = unsafe { access_map_value(oid.assume_init()) }; + let mv = unsafe { access_map_value(oid.assume_init()) }; unsafe { (*mv).len = val.len() as u64; (*mv).buf.as_mut_slice(val.len()).copy_from_slice(val); @@ -189,7 +194,7 @@ impl PMap { } /// Raw "pre-hashed" access, which skips the first hashing round. - pub fn get_hashed(&mut self, k: u64) -> Result<&mut [u8], PMapError> { + pub fn get_hashed(&self, k: u64) -> Result<&mut [u8], PMapError> { let val = unsafe { hm_tx_get(self.pobjpool.as_ptr(), self.inner, k) }; if val.off == 0 { return Err(PMapError::DoesNotExist); @@ -200,7 +205,7 @@ impl PMap { } /// Return a given value from the hashmap. The key has to be valid - pub fn get(&mut self, key: K) -> Result<&mut [u8], PMapError> { + pub fn get(&self, key: K) -> Result<&mut [u8], PMapError> { let k = self.hash(key); self.get_hashed(k) } diff --git a/betree/pmem/Cargo.toml b/betree/pmem/Cargo.toml index e40e9a06..72910042 100644 --- a/betree/pmem/Cargo.toml +++ b/betree/pmem/Cargo.toml @@ -10,3 +10,6 @@ authors = ["Sajad Karim ", "Johannes WΓΌnsche Result<(), std::io::Error> { - let _ = pmem_memcpy_persist( + let _ = pmem_memcpy( self.ptr.as_ptr().add(offset), data.as_ptr() as *mut c_void, len, + PMEM_F_MEM_NONTEMPORAL, ); Ok(()) } diff --git a/betree/pmem/src/main.rs b/betree/pmem/src/main.rs index 676c01ae..071e13d7 100644 --- a/betree/pmem/src/main.rs +++ b/betree/pmem/src/main.rs @@ -1,93 +1,97 @@ +use std::sync::Arc; + use pmem::PMem; -const BUFFER_SIZE: usize = 4096; -const DEST_FILEPATH: &str = "/pmem0/pmempool0\0"; -const TEXT: &str = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. -Donec dictum, massa sit amet tempus blandit, mi purus suscipit arcu, a egestas -erat orci et ipsum. Phasellus vel urna non urna cursus imperdiet. Aliquam turpis -ex, maximus id tortor eget, tincidunt feugiat metus. Ut ultrices auctor massa, -quis convallis lectus vulputate et. Maecenas at mi orci. Donec id leo vitae -risus tempus imperdiet ut a elit. Mauris quis dolor urna. Mauris dictum enim vel -turpis aliquam tincidunt. Pellentesque et eros ac quam lobortis hendrerit non ut -nulla. Quisque maximus magna tristique risus lacinia, et facilisis erat -molestie. -Morbi eget sapien accumsan, rhoncus metus in, interdum libero. Nam gravida mi et -viverra porttitor. Sed malesuada odio semper sapien bibendum ornare. Curabitur -scelerisque lacinia ex, a rhoncus magna viverra eu. Maecenas sed libero vel ex -dictum congue at sed nulla. Lorem ipsum dolor sit amet, consectetur adipiscing -elit. Aliquam erat volutpat. Proin condimentum augue eu nulla consequat -efficitur. Vivamus sodales pretium erat, id iaculis risus pellentesque sit amet. -Integer tempus porta diam ac facilisis. Duis ex eros, mattis nec ultrices vel, -varius vel lectus. Proin varius sapien est, nec euismod ex varius nec. Quisque -in sem sit amet metus scelerisque ornare at a nisi. Maecenas ac scelerisque -metus. In ut velit placerat, fringilla eros non, semper risus. Cras sed ante -maximus, vestibulum nunc nec, rutrum leo. \0"; -const TEXT2: &str = "hello world!"; +const BUFFER_SIZE: usize = 4 * 1024; +const SIZE: usize = 64 * 1024 * 1024 * 1024; +const ITER: usize = SIZE / BUFFER_SIZE; +const JOBS: usize = 8; +const OPS_PER_JOB: usize = ITER / JOBS; +const REM_OPS: usize = ITER % JOBS; +enum CMD { + READ(usize), + WRITE(usize), + WAIT, +} fn basic_read_write_test() -> Result<(), std::io::Error> { - let pmem = match PMem::create(&DEST_FILEPATH, 64 * 1024 * 1024) { + let pmem = Arc::new(match PMem::create("/home/wuensche/pmem/foo", SIZE) { Ok(value) => value, - Err(_) => PMem::open(&DEST_FILEPATH)?, - }; - - // Writing the long text (TEXT1) - let mut text_array = [0u8; BUFFER_SIZE]; - TEXT.bytes() - .zip(text_array.iter_mut()) - .for_each(|(b, ptr)| *ptr = b); - unsafe { pmem.write(0, &text_array, TEXT.chars().count())? }; - - // Writing the short text (TEXT2) - TEXT2 - .bytes() - .zip(text_array.iter_mut()) - .for_each(|(b, ptr)| *ptr = b); - unsafe { pmem.write(TEXT.chars().count(), &text_array, TEXT2.chars().count())? }; - - // Reading the long text (TEXT1) - let mut buffer = vec![0; TEXT.chars().count()]; - pmem.read(0, &mut buffer, TEXT.chars().count())?; - - // Reading the short text (TEXT2) - let mut buffer2 = vec![0; TEXT2.chars().count()]; - pmem.read(TEXT.chars().count(), &mut buffer2, TEXT2.chars().count())?; - - // Writing the long text (TEXT1) starting offset 1000 - TEXT.bytes() - .zip(text_array.iter_mut()) - .for_each(|(b, ptr)| *ptr = b); - unsafe { pmem.write(1000, &text_array, TEXT.chars().count())? }; - - // Reading the recently text - let mut buffer3 = vec![0; TEXT.chars().count()]; - pmem.read(1000, &mut buffer3, TEXT.chars().count())?; - - drop(pmem); - - // Comparing the read text with the actual one - let read_string = match std::str::from_utf8(&buffer) { - Ok(string) => string, - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - - assert_eq!(TEXT, read_string); - - let read_string2 = match std::str::from_utf8(&buffer2) { - Ok(string) => string, - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - - assert_eq!(TEXT2, read_string2); - - let read_string3 = match std::str::from_utf8(&buffer3) { - Ok(string) => string, - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - - assert_eq!(TEXT, read_string3); + Err(_) => PMem::open("/home/wuensche/pmem/foo")?, + }); + + let threads: Vec<_> = (0..JOBS) + .map(|id| { + let p = Arc::clone(&pmem); + let (tx, rx) = std::sync::mpsc::sync_channel::(0); + ( + tx, + std::thread::spawn(move || { + assert!(core_affinity::set_for_current(core_affinity::CoreId { id: id })); + let mut buf = vec![0u8; BUFFER_SIZE]; + while let Ok(msg) = rx.recv() { + match msg { + CMD::READ(offset) => + { + for it in 0..OPS_PER_JOB { + p.read((it * BUFFER_SIZE) + (id * BUFFER_SIZE), &mut buf, BUFFER_SIZE).unwrap() + } + if (id < REM_OPS) { + p.read(JOBS * OPS_PER_JOB * BUFFER_SIZE + (id * BUFFER_SIZE), &mut buf, BUFFER_SIZE).unwrap() + } + + }, + CMD::WRITE(_) => unsafe { + for it in 0..OPS_PER_JOB { + p.write((it * BUFFER_SIZE) + (id * BUFFER_SIZE), &buf, BUFFER_SIZE).unwrap() + } + if (id < REM_OPS) { + p.write(JOBS * OPS_PER_JOB * BUFFER_SIZE + (id * BUFFER_SIZE), &buf, BUFFER_SIZE).unwrap() + } + }, + CMD::WAIT => {} + } + } + }), + ) + }) + .collect(); + + // Write + let start = std::time::Instant::now(); + for id in 0..JOBS { + threads[id] + .0 + .send(CMD::WRITE(0)) + .unwrap(); + } + for id in 0..JOBS { + threads[id % JOBS].0.send(CMD::WAIT).unwrap(); + } + + println!( + "Write: Achieved {} GiB/s", + SIZE as f32 / 1024f32 / 1024f32 / 1024f32 / start.elapsed().as_secs_f32() + ); + + // Read + let start = std::time::Instant::now(); + for id in 0..JOBS { + threads[id % JOBS] + .0 + .send(CMD::READ(0)) + .unwrap(); + } + for id in 0..JOBS { + threads[id % JOBS].0.send(CMD::WAIT).unwrap(); + } + + println!( + "Read: Achieved {} GiB/s", + SIZE as f32 / 1024f32 / 1024f32 / 1024f32 / start.elapsed().as_secs_f32() + ); - println!("Successfully written and read text to/from PMDK!"); Ok(()) } diff --git a/betree/src/c_interface.rs b/betree/src/c_interface.rs index b4717b08..71b2e1cc 100644 --- a/betree/src/c_interface.rs +++ b/betree/src/c_interface.rs @@ -961,16 +961,17 @@ pub unsafe extern "C" fn betree_object_write_at( .handle_result(err) } -/* -/// Return the objects size in bytes. +/// Return the objects size in bytes. If the size could not be determined +/// it is assumed the object is zero-sized. #[no_mangle] -pub unsafe extern "C" fn betree_object_status(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong { +pub unsafe extern "C" fn betree_object_size(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong { let obj = &(*obj).0; let info = obj.info(); - obj. - obj.size() + info.and_then(|ok_opt| Ok(ok_opt.map(|obj_info| obj_info.size).unwrap_or(0))) + .unwrap_or(0) } +/* /// Returns the last modification timestamp in microseconds since the Unix epoch. #[no_mangle] pub unsafe extern "C" fn betree_object_mtime_us(obj: *const obj_t) -> c_ulong { diff --git a/betree/src/checksum.rs b/betree/src/checksum.rs index fadc9a4a..47b27c19 100644 --- a/betree/src/checksum.rs +++ b/betree/src/checksum.rs @@ -2,12 +2,17 @@ use crate::size::{Size, StaticSize}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{error::Error, fmt, hash::Hasher, iter::once}; +use std::{ + error::Error, + fmt, + hash::{Hash, Hasher}, + iter::once, +}; use twox_hash; /// A checksum to verify data integrity. pub trait Checksum: - Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static + Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static { /// Builds a new `Checksum`. type Builder: Builder; @@ -67,7 +72,7 @@ impl Error for ChecksumError { /// `XxHash` contains a digest of `xxHash` /// which is an "extremely fast non-cryptographic hash algorithm" /// () -#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct XxHash(u64); impl StaticSize for XxHash { diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs index 7d9032b1..165d86fd 100644 --- a/betree/src/data_management/dmu.rs +++ b/betree/src/data_management/dmu.rs @@ -5,6 +5,8 @@ use super::{ object_ptr::ObjectPointer, CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference, }; +#[cfg(feature = "nvm")] +use crate::replication::PersistentCache; use crate::{ allocator::{Action, SegmentAllocator, SegmentId}, buffer::Buf, @@ -60,6 +62,8 @@ where next_modified_node_id: AtomicU64, next_disk_id: AtomicU64, report_tx: Option>, + #[cfg(feature = "nvm")] + persistent_cache: Option>>>>, } impl Dmu @@ -76,6 +80,9 @@ where alloc_strategy: [[Option; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES], cache: E, handler: Handler>>, + #[cfg(feature = "nvm")] persistent_cache: Option< + PersistentCache>, + >, ) -> Self { let allocation_data = (0..pool.storage_class_count()) .map(|class| { @@ -103,6 +110,8 @@ where next_modified_node_id: AtomicU64::new(1), next_disk_id: AtomicU64::new(0), report_tx: None, + #[cfg(feature = "nvm")] + persistent_cache: persistent_cache.map(|cache| Arc::new(Mutex::new(cache))), } } @@ -226,6 +235,23 @@ where let offset = op.offset(); let generation = op.generation(); + #[cfg(feature = "nvm")] + let compressed_data = { + let mut buf = None; + if let Some(ref pcache_mtx) = self.persistent_cache { + let mut cache = pcache_mtx.lock(); + if let Ok(buffer) = cache.get(offset) { + buf = Some(Buf::from_zero_padded(buffer.to_vec())) + } + } + if let Some(b) = buf { + b + } else { + self.pool + .read(op.size(), op.offset(), op.checksum().clone())? + } + }; + #[cfg(not(feature = "nvm"))] let compressed_data = self .pool .read(op.size(), op.offset(), op.checksum().clone())?; @@ -329,7 +355,30 @@ where let mid = match key { ObjectKey::InWriteback(_) => unreachable!(), - ObjectKey::Unmodified { .. } => return Ok(()), + ObjectKey::Unmodified { + offset, + generation: _, + } => { + #[cfg(feature = "nvm")] + if let Some(ref pcache_mtx) = self.persistent_cache { + let mut pcache = pcache_mtx.lock(); + // TODO: Specify correct constant instead of magic πŸͺ„ + let mut vec = Vec::with_capacity(4 * 1024 * 1024); + object.value_mut().get_mut().pack(&mut vec)?; + let _ = pcache.remove(offset); + pcache + .prepare_insert(offset, &vec, None) + .insert(|maybe_offset, data| { + // TODO: Write eventually not synced data to disk finally. + if let Some(offset) = maybe_offset { + self.pool + .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?; + } + Ok(()) + })?; + } + return Ok(()); + } ObjectKey::Modified(mid) => mid, }; @@ -341,6 +390,8 @@ where drop(cache); let object = CacheValueRef::write(entry); + // Eviction at this points only writes a singular node as all children + // need to be unmodified beforehand. self.handle_write_back(object, mid, true, pk)?; Ok(()) } @@ -412,7 +463,37 @@ where state.finish() }; - self.pool.begin_write(compressed_data, offset)?; + #[cfg(feature = "nvm")] + let skip_write_back = self.persistent_cache.is_some(); + #[cfg(not(feature = "nvm"))] + let skip_write_back = false; + + if !skip_write_back { + self.pool.begin_write(compressed_data, offset)?; + } else { + let bar = compressed_data.clone(); + #[cfg(feature = "nvm")] + if let Some(ref pcache_mtx) = self.persistent_cache { + let away = Arc::clone(pcache_mtx); + self.pool.begin_foo(offset, move || { + let mut pcache = away.lock(); + let _ = pcache.remove(offset); + pcache + .prepare_insert(offset, &compressed_data, None) + .insert(|maybe_offset, data| { + // TODO: Deduplicate this? + // if let Some(offset) = maybe_offset { + // self.pool + // .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?; + // } + panic!("This should not have happnened in the debug run!"); + Ok(()) + }) + .unwrap(); + })?; + } + self.pool.begin_write(bar, offset)?; + } let obj_ptr = ObjectPointer { offset, @@ -499,11 +580,7 @@ where { warn!( "Storage tier {class} does not have enough space remaining. {} blocks of {}", - self.handler - .free_space_tier(class) - .unwrap() - .free - .as_u64(), + self.handler.free_space_tier(class).unwrap().free.as_u64(), size.as_u64() ); continue; diff --git a/betree/src/data_management/errors.rs b/betree/src/data_management/errors.rs index 4bbb7d34..5a0238f0 100644 --- a/betree/src/data_management/errors.rs +++ b/betree/src/data_management/errors.rs @@ -1,10 +1,12 @@ #![allow(missing_docs, unused_doc_comments)] use crate::{storage_pool::DiskOffset, vdev::Block}; +#[cfg(feature = "nvm")] +use pmem_hashmap::PMapError; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("The storage pool encountered an error.")] + #[error("The storage pool encountered an error. `{source}`")] VdevError { #[from] source: crate::vdev::Error, @@ -33,6 +35,12 @@ pub enum Error { CallbackError, #[error("A raw allocation has failed.")] RawAllocationError { at: DiskOffset, size: Block }, + #[cfg(feature = "nvm")] + #[error("A error occured while accessing the persistent cache. `{source}`")] + PersistentCacheError { + #[from] + source: PMapError, + }, } // To avoid recursive error types here, define a simple translation from diff --git a/betree/src/data_management/object_ptr.rs b/betree/src/data_management/object_ptr.rs index 0dbbd6d1..a394e1bf 100644 --- a/betree/src/data_management/object_ptr.rs +++ b/betree/src/data_management/object_ptr.rs @@ -9,7 +9,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)] /// A pointer to an on-disk serialized object. pub struct ObjectPointer { pub(super) decompression_tag: DecompressionTag, diff --git a/betree/src/database/errors.rs b/betree/src/database/errors.rs index 4dcee1ca..5863d97a 100644 --- a/betree/src/database/errors.rs +++ b/betree/src/database/errors.rs @@ -17,7 +17,7 @@ pub enum Error { #[from] source: crate::storage_pool::Error, }, - #[error("A tree operation encountered an error. This is likely an internal error.")] + #[error("A tree operation encountered an error. This is likely an internal error. `{source}`")] TreeError { #[from] source: crate::tree::Error, @@ -56,7 +56,7 @@ pub enum Error { InUse, #[error("Message surpasses the maximum length. If you cannot shrink your value, use an object store instead.")] MessageTooLarge, - #[error("Could not serialize the given data. This is an internal error.")] + #[error("Could not serialize the given data. This is an internal error. `{source}`")] SerializeFailed { #[from] source: serde_json::Error, diff --git a/betree/src/database/mod.rs b/betree/src/database/mod.rs index e8aabaa3..a0954eaf 100644 --- a/betree/src/database/mod.rs +++ b/betree/src/database/mod.rs @@ -21,6 +21,10 @@ use crate::{ vdev::Block, StoragePreference, }; + +#[cfg(feature = "nvm")] +use crate::replication::PersistentCache; + use bincode::{deserialize, serialize_into}; use byteorder::{BigEndian, ByteOrder, LittleEndian}; use crossbeam_channel::Sender; @@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration { compression: CompressionConfiguration::None, cache_size: DEFAULT_CACHE_SIZE, #[cfg(feature = "nvm")] - persistent_cache_path: None, + persistent_cache: None, access_mode: AccessMode::OpenIfExists, sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS), metrics: None, @@ -233,6 +237,12 @@ impl DatabaseConfiguration { } } + #[cfg(feature = "nvm")] + let pcache = self.persistent_cache.as_ref().map(|config| { + PersistentCache::create(&config.path, config.bytes) + .unwrap_or_else(|_| PersistentCache::open(&config.path).unwrap()) + }); + Dmu::new( self.compression.to_builder(), XxHashBuilder, @@ -241,6 +251,8 @@ impl DatabaseConfiguration { strategy, ClockCache::new(self.cache_size), handler, + #[cfg(feature = "nvm")] + pcache, ) } diff --git a/betree/src/replication/lru.rs b/betree/src/replication/lru.rs index 6fd217ee..d32ee188 100644 --- a/betree/src/replication/lru.rs +++ b/betree/src/replication/lru.rs @@ -1,9 +1,10 @@ +use super::{DataKey, Persistent, PREFIX_LRU}; use pmem_hashmap::{PMap, PMapError}; +use std::{marker::PhantomData, mem::size_of, ptr::NonNull}; #[derive(Copy, Clone, Debug, PartialEq, Eq)] /// Key pointing to an LRU node in a persistent cache. pub struct LruKey(u64); - impl LruKey { /// Fetch and cast a pmem pointer to a [PlruNode]. /// @@ -11,7 +12,7 @@ impl LruKey { /// ====== /// The internals of this method are highly unsafe. Concurrent usage and /// removal are urgently discouraged. - fn fetch(&self, map: &mut PMap) -> Result<&mut PlruNode, PMapError> { + fn fetch(&self, map: &mut PMap) -> Result<&mut PlruNode, PMapError> { map.get(self.key()).and_then(|node_raw| unsafe { Ok(std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>( node_raw.try_into().unwrap(), @@ -34,16 +35,22 @@ impl LruKey { /// Persistent LRU #[repr(C)] -pub struct Plru { +pub struct Plru { head: Option, tail: Option, // in Blocks? Return evicted element when full capacity: u64, count: u64, size: u64, + // Satisfy the rust compiler by adding a zero sized phantom here + // Also, avoid the drop checker by using a raw ptr + key_type: PhantomData<*const T>, } -impl Plru { +// hack ⛏ +const PLRU_ROOT_LENGTH_READ_COMMENT: usize = size_of::>(); + +impl Plru { pub fn create(map: &mut PMap, capacity: u64) -> Result, PMapError> { let this = Self { head: None, @@ -51,9 +58,20 @@ impl Plru { capacity, size: 0, count: 0, + key_type: PhantomData::default(), }; map.insert([super::PREFIX_LRU_ROOT], unsafe { - &std::mem::transmute::<_, [u8; size_of::()]>(this) + // This does not work as of + // https://github.com/rust-lang/rust/issues/43408 + // + // &std::mem::transmute::<_, [u8; size_of::>()]>(this) + // + // While annoying, we can avoid this problem by specifying a certain + // type to the struct and use the result from this constant for + // every other type as the type parameter is only a phantom. πŸ‘» + // + // instead let's do this: + &std::mem::transmute::<_, [u8; PLRU_ROOT_LENGTH_READ_COMMENT]>(this) })?; Self::open(map) } @@ -78,7 +96,7 @@ impl Plru { // Fixate new head let old_head_ptr = self.head.expect("Invalid State"); - let old_head = old_head_ptr.fetch(map).unwrap(); + let old_head: &mut PlruNode = old_head_ptr.fetch(map).unwrap(); old_head.fwd = Some(node_ptr); node.back = self.head; self.head = Some(node_ptr); @@ -87,19 +105,26 @@ impl Plru { } /// Add a new entry into the LRU. Will fail if already present. - pub fn insert(&mut self, map: &mut PMap, node_ptr: LruKey, size: u64) -> Result<(), PMapError> { + pub fn insert( + &mut self, + map: &mut PMap, + node_ptr: LruKey, + size: u64, + baggage: T, + ) -> Result<(), PMapError> { let node = PlruNode { fwd: None, back: self.head, size, data: node_ptr.0, + key: baggage, }; - map.insert(node_ptr.key(), &unsafe { - std::mem::transmute::<_, [u8; PLRU_NODE_SIZE]>(node) + map.insert(node_ptr.key(), unsafe { + std::mem::transmute::<_, &[u8; PLRU_NODE_SIZE]>(&node) })?; if let Some(head_ptr) = self.head { - let head = head_ptr.fetch(map)?; + let head: &mut PlruNode = head_ptr.fetch(map)?; head.fwd = Some(node_ptr); self.head = Some(node_ptr); } else { @@ -114,28 +139,29 @@ impl Plru { /// Checks whether an eviction is necessary and which entry to evict. /// This call does not perform the removal itself. - pub fn evict(&mut self) -> Option { - if let (Some(tail), true) = (self.tail, self.size > self.capacity) { - return Some(DataKey(tail.0)); + pub fn evict(&self, map: &mut PMap, size: u64) -> Result, PMapError> { + if let (Some(tail), true) = (self.tail, self.size + size > self.capacity) { + let node = tail.fetch(map)?; + return Ok(Some((DataKey(tail.0), node.key))); } - None + Ok(None) } fn cut_node_and_stitch( &mut self, map: &mut PMap, - node: &mut PlruNode, + node: &mut PlruNode, ) -> Result<(), PMapError> { let node_ptr = LruKey(node.data); if let Some(forward_ptr) = node.fwd { - let forward = forward_ptr.fetch(map)?; + let forward: &mut PlruNode = forward_ptr.fetch(map)?; forward.back = node.back; } if self.tail == Some(node_ptr) { self.tail = node.fwd; } if let Some(back_ptr) = node.back { - let back = back_ptr.fetch(map)?; + let back: &mut PlruNode = back_ptr.fetch(map)?; back.fwd = node.fwd; } if self.head == Some(node_ptr) { @@ -159,12 +185,6 @@ impl Plru { } } -use std::{mem::size_of, ptr::NonNull}; - -use super::{DataKey, Persistent, PREFIX_LRU}; - -const PLRU_NODE_SIZE: usize = size_of::(); - /// Ephemeral Wrapper around a byte array for sane access code. /// /// Structure @@ -175,17 +195,44 @@ const PLRU_NODE_SIZE: usize = size_of::(); /// .. β”‚ β”œβ”€β”€β”€β”€> /// β””β”€β”€β”€β”˜ back /// +/// Size Constraint +/// =============== +/// This structure allows for a generic member which is to be returned when +/// evictions are happening. The size available to the entire object is 256 +/// bytes of which the custom type can occupy at most 208 bytes. +/// /// Safety /// ====== /// Using this wrapper requires transmuting the underlying byte array, which /// invalidates, when used on references, all borrow checker guarantees. Use /// with extrem caution, and be sure what you are doing. #[repr(C)] -pub struct PlruNode { +pub struct PlruNode { fwd: Option, back: Option, size: u64, data: u64, + key: T, +} +const PLRU_NODE_SIZE: usize = 256; + +impl PlruNode { + const SIZE_CONSTRAINT: () = assert!( + std::mem::size_of::>() < PLRU_NODE_SIZE, + "Size of attached data to LRU entry surpasses size constraint." + ); + + pub fn new(fwd: Option, back: Option, size: u64, data: u64, key: T) -> Self { + // has to remain to ensure that the code path is evaluated by rustc + let _ = Self::SIZE_CONSTRAINT; + Self { + fwd, + back, + size, + data, + key, + } + } } #[cfg(test)] @@ -229,7 +276,7 @@ mod tests { fn new() { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let _ = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); + let _ = Plru::<()>::create(&mut pmap, 32 * 1024 * 1024).unwrap(); } #[test] @@ -237,11 +284,11 @@ mod tests { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 321).unwrap(); + lru.insert(&mut pmap, LruKey(100), 321, ()).unwrap(); assert_eq!(lru.head, Some(LruKey(100))); - lru.insert(&mut pmap, LruKey(101), 322).unwrap(); + lru.insert(&mut pmap, LruKey(101), 322, ()).unwrap(); assert_eq!(lru.head, Some(LruKey(101))); - lru.insert(&mut pmap, LruKey(102), 323).unwrap(); + lru.insert(&mut pmap, LruKey(102), 323, ()).unwrap(); assert_eq!(lru.head, Some(LruKey(102))); } @@ -250,9 +297,9 @@ mod tests { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 321).unwrap(); - lru.insert(&mut pmap, LruKey(101), 322).unwrap(); - lru.insert(&mut pmap, LruKey(102), 323).unwrap(); + lru.insert(&mut pmap, LruKey(100), 321, ()).unwrap(); + lru.insert(&mut pmap, LruKey(101), 322, ()).unwrap(); + lru.insert(&mut pmap, LruKey(102), 323, ()).unwrap(); } #[test] @@ -260,21 +307,33 @@ mod tests { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024) + lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ()) .unwrap(); - lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024) + lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024, ()) + .unwrap(); + lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024, ()) .unwrap(); - lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024).unwrap(); assert_eq!( - lru.evict().and_then(|opt| Some(LruKey(opt.0))), + lru.evict(&mut pmap, 0) + .unwrap() + .and_then(|opt| Some(LruKey(opt.0 .0))), Some(LruKey(100)) ); lru.remove(&mut pmap, LruKey(100)).unwrap(); - lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024).unwrap(); - assert_eq!(lru.evict().and_then(|opt| Some(LruKey(opt.0))), None); - lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024).unwrap(); + lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024, ()) + .unwrap(); + assert_eq!( + lru.evict(&mut pmap, 0) + .unwrap() + .and_then(|opt| Some(LruKey(opt.0 .0))), + None + ); + lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024, ()) + .unwrap(); assert_eq!( - lru.evict().and_then(|opt| Some(LruKey(opt.0))), + lru.evict(&mut pmap, 0) + .unwrap() + .and_then(|opt| Some(LruKey(opt.0 .0))), Some(LruKey(101)) ); } @@ -283,11 +342,28 @@ mod tests { fn remove() { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024) + let mut lru: Persistent> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); + lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ()) .unwrap(); lru.remove(&mut pmap, LruKey(100)).unwrap(); assert_eq!(lru.head, None); assert_eq!(lru.tail, None); } + + #[test] + fn reinit() { + let file = TestFile::new(); + { + let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); + let mut lru: Persistent> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); + lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ()) + .unwrap(); + } + { + let mut pmap = PMap::open(file.path()).unwrap(); + let lru: Persistent> = Plru::open(&mut pmap).unwrap(); + assert_eq!(lru.head, Some(LruKey(100))); + assert_eq!(lru.tail, Some(LruKey(100))); + } + } } diff --git a/betree/src/replication/mod.rs b/betree/src/replication/mod.rs index 019e86de..a552e842 100644 --- a/betree/src/replication/mod.rs +++ b/betree/src/replication/mod.rs @@ -10,17 +10,6 @@ //! 3.3 SLOW //! 3.4 SLOWEST //! -//! -//! ``` -//! 1 -//! / \ -//! 1 1 -//! / \ / \ -//! 1 2 1 2 -//! /|\ | /|\ |\ -//! 321 2 313 22 -//! ``` -//! //! Map Keys //! ======== //! @@ -34,6 +23,7 @@ const PREFIX_LRU_ROOT: u8 = 2; use std::{ hash::Hash, + marker::PhantomData, ops::{Deref, DerefMut}, path::PathBuf, ptr::NonNull, @@ -89,10 +79,12 @@ impl DataKey { /// Persistent byte array cache. Optimized for read performance, avoid frequent /// updates. -pub struct PersistentCache { +pub struct PersistentCache { pmap: PMap, // Persistent Pointer - lru: Persistent, + lru: Persistent>, + // Fix key types + key_type: PhantomData, } /// Configuration for a persistent cache. @@ -104,13 +96,58 @@ pub struct PersistentCacheConfig { pub bytes: usize, } -impl PersistentCache { +/// Ephemeral struct created in the preparation for an insertion. +pub struct PersistentCacheInsertion<'a, K, T> { + cache: &'a mut PersistentCache, + key: DataKey, + value: &'a [u8], + baggage: T, +} + +impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> { + /// Performs an execution and calls the given function on each evicted entry + /// from the store. On error the entire insertion is aborted and has to be + /// initiated anew. + pub fn insert(self, f: F) -> Result<(), PMapError> + where + F: Fn(&T, &[u8]) -> Result<(), crate::vdev::Error>, + { + while let Ok(Some((key, baggage))) = self + .cache + .lru + .evict(&mut self.cache.pmap, self.value.len() as u64) + { + let data = self.cache.pmap.get(key.key())?; + if f(baggage, data).is_err() { + return Err(PMapError::ExternalError("Writeback failed".into())); + } + // Finally actually remove the entries + self.cache.pmap.remove(key.key())?; + self.cache + .lru + .remove(&mut self.cache.pmap, key.to_lru_key())?; + } + + self.cache.pmap.insert(self.key.key(), self.value)?; + self.cache.lru.insert( + &mut self.cache.pmap, + self.key.to_lru_key(), + self.value.len() as u64, + self.baggage, + )?; + + Ok(()) + } +} + +impl PersistentCache { /// Open an existent [PersistentCache]. Fails if no cache exist or invalid. pub fn open>(path: P) -> Result { let mut pmap = PMap::open(path.into())?; Ok(Self { lru: Plru::open(&mut pmap)?, pmap, + key_type: PhantomData::default(), }) } @@ -120,32 +157,35 @@ impl PersistentCache { Ok(Self { lru: Plru::create(&mut pmap, size as u64)?, pmap, + key_type: PhantomData::default(), }) } /// Fetch an entry from the hashmap. - pub fn get(&mut self, key: K) -> Result<&[u8], PMapError> { + pub fn get(&mut self, key: K) -> Result<&[u8], PMapError> { let k = DataKey::from(key, &mut self.pmap); self.lru.touch(&mut self.pmap, k.to_lru_key())?; Ok(self.pmap.get(k.key())?) } - /// Insert an entry and remove infrequent values. - pub fn insert(&mut self, key: K, value: &[u8]) -> Result<(), PMapError> { - // TODO: Update old values? Convenience + /// Start an insertion. An insertion can only be successfully completed if values are properly evicted from the cache + pub fn prepare_insert<'a>( + &'a mut self, + key: K, + value: &'a [u8], + baggage: T, + ) -> PersistentCacheInsertion<'a, K, T> { let k = DataKey::from(key, &mut self.pmap); - self.pmap.insert(k.key(), value)?; - self.lru - .insert(&mut self.pmap, k.to_lru_key(), value.len() as u64)?; - while let Some(id) = self.lru.evict() { - self.pmap.remove(id.key())?; - self.lru.remove(&mut self.pmap, id.to_lru_key())?; + PersistentCacheInsertion { + cache: self, + key: k, + value, + baggage, } - Ok(()) } /// Remove an entry. - pub fn remove(&mut self, key: K) -> Result<(), PMapError> { + pub fn remove(&mut self, key: K) -> Result<(), PMapError> { let k = DataKey::from(key, &mut self.pmap); self.pmap.remove(k.key())?; self.lru.remove(&mut self.pmap, k.to_lru_key())?; diff --git a/betree/src/storage_pool/mod.rs b/betree/src/storage_pool/mod.rs index 66bbe0f6..581a4b94 100644 --- a/betree/src/storage_pool/mod.rs +++ b/betree/src/storage_pool/mod.rs @@ -58,6 +58,10 @@ pub trait StoragePoolLayer: Clone + Send + Sync + 'static { /// Issues a write request that might happen in the background. fn begin_write(&self, data: Buf, offset: DiskOffset) -> VdevResult<()>; + fn begin_foo(&self, offset: DiskOffset, f: F) -> VdevResult<()> + where + F: FnOnce() + Send + 'static; + /// Writes the given `data` at `offset` for every `LeafVdev`. fn write_raw(&self, data: Buf, offset: Block) -> VdevResult<()>; diff --git a/betree/src/storage_pool/unit.rs b/betree/src/storage_pool/unit.rs index 13e7373e..99272f24 100644 --- a/betree/src/storage_pool/unit.rs +++ b/betree/src/storage_pool/unit.rs @@ -166,8 +166,6 @@ impl StoragePoolLayer for StoragePoolUnit { .write(data, offset.block_offset()) .await; - // TODO: what about multiple writes to same offset? - // NOTE: This is currently covered in the tests and fails as expected inner.write_back_queue.mark_completed(&offset).await; res })?; @@ -183,6 +181,25 @@ impl StoragePoolLayer for StoragePoolUnit { ret } + fn begin_foo(&self, offset: DiskOffset, f: F) -> vdev::Result<()> + where + F: FnOnce() + Send + 'static, + { + let (enqueue_done, wait_for_enqueue) = futures::channel::oneshot::channel(); + let write = self.inner.pool.spawn_with_handle(async move { + wait_for_enqueue.await.unwrap(); + f(); + Ok(()) + })?; + + let ret = self.inner.write_back_queue.enqueue(offset, Box::pin(write)); + + enqueue_done + .send(()) + .expect("Couldn't unlock enqueued write task"); + ret + } + fn write_raw(&self, data: Buf, offset: Block) -> Result<(), VdevError> { let vec = self .inner diff --git a/betree/src/tree/errors.rs b/betree/src/tree/errors.rs index 79cc9541..f2859350 100644 --- a/betree/src/tree/errors.rs +++ b/betree/src/tree/errors.rs @@ -3,7 +3,7 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("Storage operation could not be performed")] + #[error("Storage operation could not be performed. `{source}`")] DmuError { #[from] source: crate::data_management::Error, diff --git a/docs/src/rfc/2-persistent-cache-replication.md b/docs/src/rfc/2-persistent-cache-replication.md new file mode 100644 index 00000000..a8a62b6f --- /dev/null +++ b/docs/src/rfc/2-persistent-cache-replication.md @@ -0,0 +1,51 @@ +- Title: Persistent Cache / Data Replication +- Status: DRAFT + +# Description + +A persistent cache is a way to utilize a storage device capabilities without +integrating it further into the existing storage hierarchy by keeping it +essentially in the memory layer semantically. For this we present two +approaches, either may be used as they have different drawbacks and advantages. + +## Fallback Cache + +Once entries are evicted from the volatile cache we remove them by eviction and +allocate resources on the underlying storage media. To prolong this process we +skip this part and write them first to the non-volatile cache and eventually +when they are evicted from there they actually get written to disk. Problems +with this approach are mostly the actual utilization (read + write traffic) +which is not optimal for some storage devices (*cough* PMem). + +Copy-on-Write is another issue as we may need to ensure writes to the storage +device when modifying the entry, but this can be taken care of in the DMU. + +Regardless, it is easy to implement and should remain an option we try. + +## Read-heavy-cache + +Another approach which promises overall a better specific device utilization is +the focus on *read* heavy data which seldomly gets modified. Most of the issues +mentioned above are offloaded are resolved by this approach by it does depend on +a *decision maker* which moves data from storage or cache to the device. Most +issues in this approach are produced by this decision, with the actual result +being completely depending on this. An idea is to use the migration policies as +done with inter-tier data migration, but for this additional classification is +required. Possible workflow node independent classification schemes are still in +need to be constructed for this, which is partially an open topic for research. + +# Purpose + +This RFC tries to avoid to integrate stuff like PMem into the storage hierarchy +as non-latency bound workflows do not really gain an advantage with them, rather +a separate stack is build in the cache level to reduce latency on accesses there +(if the data fits into storage). + +# Drawbacks + + + +# Alternatives + +> Find atleast one alternative, this thought process helps preventing being +> stuck in certain implemenation details diff --git a/fio-haura/src/fio-engine-haura.c b/fio-haura/src/fio-engine-haura.c index 5aa16a6e..678186f3 100644 --- a/fio-haura/src/fio-engine-haura.c +++ b/fio-haura/src/fio-engine-haura.c @@ -304,11 +304,14 @@ static int fio_haura_setup(struct thread_data *td) { /* Haura needs some additional space to provide extra data like object * pointers and metadata. This is more of a hack, but nonetheless. */ creat(td->files[idx]->file_name, 0644); - if (truncate(td->files[idx]->file_name, td->o.size + (50 * 1024 * 1024))) { + if (truncate(td->files[idx]->file_name, td->o.file_size_high)) { fprintf( stderr, "Could not retruncate file to provide enough storage for Haura.\n"); } + // Set the already allocated size so that fio avoids the internal initial + // layouting + td->files[idx]->real_file_size = td->o.file_size_high; } td->io_ops_data = malloc(sizeof(size_t)); @@ -324,7 +327,7 @@ static int fio_haura_setup(struct thread_data *td) { return bail(error); } fio_haura_translate(td, cfg); - if ((global_data.db = betree_create_db(cfg, &error)) == NULL) { + if ((global_data.db = betree_open_or_create_db(cfg, &error)) == NULL) { return bail(error); } if ((global_data.obj_s = betree_create_object_store( @@ -355,6 +358,11 @@ static int fio_haura_setup(struct thread_data *td) { ((u_int32_t *)buf)[off] = random(); } while (max_io_size > total_written) { + if (betree_object_size(global_data.objs[idx], &error) >= + max_io_size) { + printf("haura: skipping prepopulation of object %lu", idx + 1); + break; + } unsigned long written = 0; betree_object_write_at(global_data.objs[idx], buf, block_size,