diff --git a/betree/Cargo.toml b/betree/Cargo.toml
index a93612b2..e634df15 100644
--- a/betree/Cargo.toml
+++ b/betree/Cargo.toml
@@ -70,7 +70,7 @@ criterion = "0.3"
 tempfile = "3.6.0"
 
 [features]
-default = ["init_env_logger", "figment_config"]
+default = ["init_env_logger", "figment_config", "nvm"]
 
 # unlock unstable API for consumption by bectl and other debugging tools
 internal-api = []
diff --git a/betree/include/betree.h b/betree/include/betree.h
index d7131aec..364815f9 100644
--- a/betree/include/betree.h
+++ b/betree/include/betree.h
@@ -1,7 +1,7 @@
 #ifndef betree_h
 #define betree_h
 
-/* Generated with cbindgen:0.24.3 */
+/* Generated with cbindgen:0.24.5 */
 
 /* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */
 
@@ -467,6 +467,12 @@ int betree_object_read_at(struct obj_t *obj,
                           unsigned long *n_read,
                           struct err_t **err);
 
+/**
+ * Return the object's size in bytes. If the size could not be determined,
+ * the object is assumed to be zero-sized.
+ */
+unsigned long betree_object_size(const struct obj_t *obj, struct err_t **err);
+
 /**
  * Try to write `buf_len` bytes from `buf` into `obj`, starting at `offset` bytes into the objects
  * data.
diff --git a/betree/pmem-hashmap/src/allocator.rs b/betree/pmem-hashmap/src/allocator.rs
index 27b462e6..6c50977f 100644
--- a/betree/pmem-hashmap/src/allocator.rs
+++ b/betree/pmem-hashmap/src/allocator.rs
@@ -1,6 +1,9 @@
 use super::*;
 use errno::errno;
 use std::alloc::{AllocError, Allocator};
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::ops::Deref;
 use std::{ffi::c_void, ptr::NonNull, sync::Arc};
 use thiserror::Error;
 
@@ -18,8 +21,8 @@ unsafe impl Send for Pal {}
 impl Drop for Pal {
     fn drop(&mut self) {
         // there should exist no further reference to this resources otherwise we risk some invalid fetches
-        if Arc::strong_count(&self.pool) == 0 && Arc::weak_count(&self.pool) == 0 {
-            self.close()
+        if Arc::strong_count(&self.pool) == 1 && Arc::weak_count(&self.pool) == 0 {
+            // self.close()
         }
     }
 }
@@ -32,11 +35,11 @@ impl Into for PalError {
 
 unsafe impl Allocator for Pal {
     fn allocate(&self, layout: std::alloc::Layout) -> Result<NonNull<[u8]>, AllocError> {
-        let ptr = self.allocate(layout.size()).map_err(|_| AllocError)?;
-        Ok(NonNull::new(unsafe {
-            core::slice::from_raw_parts_mut(ptr.load() as *mut u8, layout.size())
-        })
-        .ok_or_else(|| AllocError)?)
+        let mut ptr: PalPtr<u8> = self.allocate(layout.size()).map_err(|_| AllocError)?;
+        Ok(
+            NonNull::new(unsafe { core::slice::from_raw_parts_mut(ptr.load_mut(), layout.size()) })
+                .ok_or_else(|| AllocError)?,
+        )
     }
 
     unsafe fn deallocate(&self, ptr: NonNull<u8>, _layout: std::alloc::Layout) {
@@ -64,16 +67,69 @@ pub enum PalError {
 
 // A friendly persistent pointer. Useless without the according handle to the
 // original arena.
-#[derive(Debug, Clone, Copy)] -pub struct PalPtr { +pub struct PalPtr { inner: PMEMoid, size: usize, + _phantom: PhantomData, +} + +impl PartialEq for PalPtr { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner && self.size == other.size + } +} + +// impl Deref for PalPtr { +// type Target = T; +// +// fn deref(&self) -> &Self::Target { +// self.load() +// } +// } + +impl Eq for PalPtr {} + +impl Debug for PalPtr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PalPtr") + .field("inner", &self.inner) + .field("size", &self.size) + .finish() + } } -impl PalPtr { +impl Clone for PalPtr { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + size: self.size.clone(), + _phantom: self._phantom.clone(), + } + } +} + +impl Copy for PalPtr {} + +impl PartialEq for PMEMoid { + fn eq(&self, other: &Self) -> bool { + self.pool_uuid_lo == other.pool_uuid_lo && self.off == other.off + } +} + +impl Eq for PMEMoid {} + +impl PalPtr { /// Translate this persistent ptr to a volatile one. - pub fn load(&self) -> *mut c_void { - unsafe { haura_direct(self.inner) } + pub fn load(&self) -> &T { + unsafe { (haura_direct(self.inner) as *const T).as_ref().unwrap() } + } + + pub fn load_mut(&mut self) -> &mut T { + unsafe { (haura_direct(self.inner) as *mut T).as_mut().unwrap() } + } + + pub fn init(&mut self, src: *const T, count: usize) { + unsafe { (haura_direct(self.inner) as *mut T).copy_from(src, count) } } /// Copy a range of bytes behind this pointer to a given buffer. Data is @@ -84,7 +140,7 @@ impl PalPtr { pmemobj_memcpy( arena.pool.as_ptr(), other.as_mut_ptr() as *mut c_void, - self.load(), + haura_direct(self.inner), self.size.min(other.len()), PMEMOBJ_F_MEM_NOFLUSH, ) @@ -97,7 +153,7 @@ impl PalPtr { unsafe { pmemobj_memcpy( arena.pool.as_ptr(), - self.load(), + haura_direct(self.inner), other.as_ptr() as *const c_void, self.size.min(other.len()), PMEMOBJ_F_MEM_NONTEMPORAL, @@ -107,7 +163,7 @@ impl PalPtr { /// Deallocate this object. Required if this value is no longer needed. /// There is *no* automatic deallocation logic. - pub fn free(mut self) { + pub fn free(&mut self) { unsafe { pmemobj_free(&mut self.inner) } } } @@ -161,8 +217,17 @@ impl Pal { unsafe { pmemobj_close(self.pool.as_ptr()) }; } - /// Allocate an area of size in the persistent memory. - pub fn allocate(&self, size: usize) -> Result { + pub fn allocate_variable(&self, v: T) -> Result, PalError> { + let mut ptr = self.allocate(std::mem::size_of_val(&v))?; + assert!(ptr.size < 8192); + ptr.init(&v, std::mem::size_of_val(&v)); + Ok(ptr) + } + + /// Allocate an area of size in the persistent memory. Allocations are + /// always guaranteed to be cache line aligned for Optane PMem (64 bytes). + pub fn allocate(&self, size: usize) -> Result, PalError> { + assert!(size < 8192); let mut oid = std::mem::MaybeUninit::::uninit(); if unsafe { haura_alloc( @@ -187,6 +252,7 @@ impl Pal { Ok(PalPtr { inner: unsafe { oid.assume_init() }, size, + _phantom: PhantomData {}, }) } @@ -196,12 +262,16 @@ impl Pal { /// /// If called with size 0 an existing root object might be opened, if none /// exists EINVAL is returned. 
- pub fn root(&self, size: usize) -> Result { + pub fn root(&self, size: usize) -> Result, PalError> { let oid = unsafe { pmemobj_root(self.pool.as_ptr(), size) }; if oid_is_null(oid) { return Err(PalError::AllocationFailed(format!("{}", errno()))); } - Ok(PalPtr { inner: oid, size }) + Ok(PalPtr { + inner: oid, + size, + _phantom: PhantomData {}, + }) } /// Return the maximum size of the current root object. @@ -269,43 +339,30 @@ mod tests { let file = TestFile::new(); { let mut pal = Pal::create(file.path(), 128 * 1024 * 1024, 0o666).unwrap(); - let mut map: BTreeMap = BTreeMap::new_in(pal.clone()); - let root_ptr = pal + let map: BTreeMap = BTreeMap::new_in(pal.clone()); + let mut root_ptr: PalPtr> = pal .root(std::mem::size_of::>()) .unwrap(); - unsafe { - (root_ptr.load() as *mut BTreeMap) - .copy_from(&map, std::mem::size_of::>()) - }; + root_ptr.init(&map, std::mem::size_of::>()); std::mem::forget(map); - let map: &mut BTreeMap = unsafe { - (root_ptr.load() as *mut BTreeMap) - .as_mut() - .unwrap() - }; + let map: &mut BTreeMap = root_ptr.load_mut(); for id in 0..100 { map.insert(id, id); } for id in 100..0 { assert_eq!(map.get(&id), Some(&id)); } - std::mem::forget(map); pal.close(); } { let mut pal = Pal::open(file.path()).unwrap(); - let root_ptr = pal + let mut root_ptr = pal .root(std::mem::size_of::>()) .unwrap(); - let map: &mut BTreeMap = unsafe { - (root_ptr.load() as *mut BTreeMap) - .as_mut() - .unwrap() - }; + let map: &mut BTreeMap = root_ptr.load_mut(); for id in 100..0 { assert_eq!(map.get(&id), Some(&id)); } - std::mem::forget(map); pal.close(); } } diff --git a/betree/pmem-hashmap/src/bin/bench_pal.rs b/betree/pmem-hashmap/src/bin/bench_pal.rs index 144d2531..f7ea3317 100644 --- a/betree/pmem-hashmap/src/bin/bench_pal.rs +++ b/betree/pmem-hashmap/src/bin/bench_pal.rs @@ -12,8 +12,8 @@ fn bench_pal_sub(rank: usize, barrier: Arc, tx: std::sync::m Arc::new(Pal::create(format!("/home/wuensche/pmem/foobar{rank}"), SIZE, 0o666).unwrap()); enum CMD { - Read(Vec), - Write(Vec), + Read(Vec>), + Write(Vec>), Wait, } diff --git a/betree/src/buffer.rs b/betree/src/buffer.rs index 5a896593..dece75ae 100644 --- a/betree/src/buffer.rs +++ b/betree/src/buffer.rs @@ -56,6 +56,8 @@ fn split_range_at( struct AlignedStorage { ptr: *mut u8, capacity: Block, + // avoid deallocation + is_persistent: bool, } impl Default for AlignedStorage { @@ -63,6 +65,7 @@ impl Default for AlignedStorage { AlignedStorage { ptr: ptr::null_mut(), capacity: Block(0), + is_persistent: false, } } } @@ -127,7 +130,7 @@ impl AlignedStorage { impl Drop for AlignedStorage { fn drop(&mut self) { - if !self.ptr.is_null() { + if !self.ptr.is_null() && !self.is_persistent { unsafe { let layout = Layout::from_size_align_unchecked( self.capacity.to_bytes() as usize, @@ -143,11 +146,12 @@ impl From> for AlignedStorage { fn from(b: Box<[u8]>) -> Self { // It can be useful to re-enable this line to easily locate places where unnecessary // copying takes place, but it's not suited to stay enabled unconditionally. - // assert!(is_aligned(&b)); + assert!(is_aligned(&b)); if is_aligned(&b) { AlignedStorage { capacity: Block::from_bytes(b.len() as u32), ptr: unsafe { (*Box::into_raw(b)).as_mut_ptr() }, + is_persistent: false, } } else { assert!( @@ -411,6 +415,44 @@ impl Buf { }, ) } + + /// Display the entire underlying buffer without respect to the semantic + /// size of the data contained. 
+ pub fn as_slice_with_padding(&self) -> &[u8] { + unsafe { + let buf = &*self.buf.buf.get(); + slice::from_raw_parts(buf.ptr, buf.capacity.to_bytes() as usize) + } + } +} + +#[cfg(feature = "nvm")] +use pmem_hashmap::allocator::PalPtr; + +#[cfg(feature = "nvm")] +impl Buf { + /// Special case transformation from a pointer to persistent memory. Always + /// assumed to be length-aligned to [BLOCK_SIZE]. + /// + /// The alignment works differently and is not as freely configurable as can + /// be done with "normal" rust allocations. Therefore, only the length of + /// the ptr is checked *not* the actual location. These are guaranteed to + /// lie at 64 byte cache line boundaries. + pub fn from_persistent_ptr(mut ptr: PalPtr, size: u32) -> Self { + assert_eq!(size as usize % BLOCK_SIZE, 0); + let padded_size = Block::round_up_from_bytes(size); + let aligned_buf = AlignedBuf { + buf: Arc::new(UnsafeCell::new(AlignedStorage { + ptr: std::ptr::addr_of_mut!(*ptr.load_mut()), + capacity: padded_size, + is_persistent: true, + })), + }; + Self { + range: aligned_buf.full_range(), + buf: aligned_buf, + } + } } impl MutBuf { diff --git a/betree/src/c_interface.rs b/betree/src/c_interface.rs index b4717b08..71b2e1cc 100644 --- a/betree/src/c_interface.rs +++ b/betree/src/c_interface.rs @@ -961,16 +961,17 @@ pub unsafe extern "C" fn betree_object_write_at( .handle_result(err) } -/* -/// Return the objects size in bytes. +/// Return the objects size in bytes. If the size could not be determined +/// it is assumed the object is zero-sized. #[no_mangle] -pub unsafe extern "C" fn betree_object_status(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong { +pub unsafe extern "C" fn betree_object_size(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong { let obj = &(*obj).0; let info = obj.info(); - obj. - obj.size() + info.and_then(|ok_opt| Ok(ok_opt.map(|obj_info| obj_info.size).unwrap_or(0))) + .unwrap_or(0) } +/* /// Returns the last modification timestamp in microseconds since the Unix epoch. #[no_mangle] pub unsafe extern "C" fn betree_object_mtime_us(obj: *const obj_t) -> c_ulong { diff --git a/betree/src/checksum.rs b/betree/src/checksum.rs index fadc9a4a..47b27c19 100644 --- a/betree/src/checksum.rs +++ b/betree/src/checksum.rs @@ -2,12 +2,17 @@ use crate::size::{Size, StaticSize}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{error::Error, fmt, hash::Hasher, iter::once}; +use std::{ + error::Error, + fmt, + hash::{Hash, Hasher}, + iter::once, +}; use twox_hash; /// A checksum to verify data integrity. pub trait Checksum: - Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static + Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static { /// Builds a new `Checksum`. 
type Builder: Builder; @@ -67,7 +72,7 @@ impl Error for ChecksumError { /// `XxHash` contains a digest of `xxHash` /// which is an "extremely fast non-cryptographic hash algorithm" /// () -#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct XxHash(u64); impl StaticSize for XxHash { diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs index 7d9032b1..5de0282c 100644 --- a/betree/src/data_management/dmu.rs +++ b/betree/src/data_management/dmu.rs @@ -5,9 +5,11 @@ use super::{ object_ptr::ObjectPointer, CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference, }; +#[cfg(feature = "nvm")] +use crate::replication::PersistentCache; use crate::{ allocator::{Action, SegmentAllocator, SegmentId}, - buffer::Buf, + buffer::{Buf, BufWrite}, cache::{Cache, ChangeKeyError, RemoveError}, checksum::{Builder, Checksum, State}, compression::CompressionBuilder, @@ -60,6 +62,8 @@ where next_modified_node_id: AtomicU64, next_disk_id: AtomicU64, report_tx: Option>, + #[cfg(feature = "nvm")] + persistent_cache: Option>>>>, } impl Dmu @@ -76,6 +80,9 @@ where alloc_strategy: [[Option; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES], cache: E, handler: Handler>>, + #[cfg(feature = "nvm")] persistent_cache: Option< + PersistentCache>, + >, ) -> Self { let allocation_data = (0..pool.storage_class_count()) .map(|class| { @@ -103,6 +110,8 @@ where next_modified_node_id: AtomicU64::new(1), next_disk_id: AtomicU64::new(0), report_tx: None, + #[cfg(feature = "nvm")] + persistent_cache: persistent_cache.map(|cache| Arc::new(RwLock::new(cache))), } } @@ -162,6 +171,19 @@ where let obj = CacheValueRef::write(entry); if let ObjRef::Unmodified(ptr, ..) = replace(or, ObjRef::Modified(mid, pk)) { + // Deallocate old-region and remove from cache + if let Some(pcache_mtx) = &self.persistent_cache { + let pcache = pcache_mtx.read(); + let res = pcache.get(ptr.offset().clone()).is_ok(); + drop(pcache); + if res { + // FIXME: Offload this lock to a different thread + // This operation is only tangantely important for the + // operation here and not time critical. + let mut pcache = pcache_mtx.write(); + pcache.remove(ptr.offset().clone()).unwrap(); + } + } self.copy_on_write(ptr, CopyOnWriteReason::Steal, or.index().clone()); } Ok(Some(obj)) @@ -226,6 +248,24 @@ where let offset = op.offset(); let generation = op.generation(); + #[cfg(feature = "nvm")] + let compressed_data = { + let mut buf = None; + if let Some(ref pcache_mtx) = self.persistent_cache { + let mut cache = pcache_mtx.read(); + if let Ok(buffer) = cache.get_buf(offset) { + // buf = Some(Buf::from_zero_padded(buffer.to_vec())) + buf = Some(buffer) + } + } + if let Some(b) = buf { + b + } else { + self.pool + .read(op.size(), op.offset(), op.checksum().clone())? + } + }; + #[cfg(not(feature = "nvm"))] let compressed_data = self .pool .read(op.size(), op.offset(), op.checksum().clone())?; @@ -329,7 +369,51 @@ where let mid = match key { ObjectKey::InWriteback(_) => unreachable!(), - ObjectKey::Unmodified { .. 
} => return Ok(()), + ObjectKey::Unmodified { + offset, + generation: _, + } => { + // If data is unmodified still move to pcache, as it might be worth saving (if not already contained) + #[cfg(feature = "nvm")] + if let Some(ref pcache_mtx) = self.persistent_cache { + { + // Encapsulate concurrent read access + let pcache = pcache_mtx.read(); + if pcache.get(offset).is_ok() { + return Ok(()); + } + } + // We need to compress data here to ensure compatability + // with the other branch going through the write back + // procedure. + let compression = &self.default_compression; + let compressed_data = { + let mut state = compression.new_compression()?; + { + object.value().read().pack(&mut state)?; + drop(object); + } + state.finish() + }; + let away = Arc::clone(pcache_mtx); + // Arc to storage pool + let pool = self.pool.clone(); + self.pool.begin_write_offload(offset, move || { + let mut pcache = away.write(); + let _ = pcache.remove(offset); + pcache + .prepare_insert(offset, compressed_data, None) + .insert(|maybe_offset, buf| { + if let Some(offset) = maybe_offset { + pool.begin_write(buf, *offset)?; + } + Ok(()) + }) + .unwrap(); + })?; + } + return Ok(()); + } ObjectKey::Modified(mid) => mid, }; @@ -341,6 +425,8 @@ where drop(cache); let object = CacheValueRef::write(entry); + // Eviction at this points only writes a singular node as all children + // need to be unmodified beforehand. self.handle_write_back(object, mid, true, pk)?; Ok(()) } @@ -412,7 +498,37 @@ where state.finish() }; - self.pool.begin_write(compressed_data, offset)?; + #[cfg(feature = "nvm")] + let skip_write_back = self.persistent_cache.is_some(); + #[cfg(not(feature = "nvm"))] + let skip_write_back = false; + + if !skip_write_back { + self.pool.begin_write(compressed_data, offset)?; + } else { + // Cheap copy + let bar = compressed_data.clone(); + #[cfg(feature = "nvm")] + if let Some(ref pcache_mtx) = self.persistent_cache { + let away = Arc::clone(pcache_mtx); + // Arc to storage pool + let pool = self.pool.clone(); + self.pool.begin_write_offload(offset, move || { + let mut pcache = away.write(); + let _ = pcache.remove(offset); + pcache + .prepare_insert(offset, bar, None) + .insert(|maybe_offset, buf| { + if let Some(offset) = maybe_offset { + pool.begin_write(buf, *offset)?; + } + Ok(()) + }) + .unwrap(); + })?; + } + self.pool.begin_write(compressed_data, offset)?; + } let obj_ptr = ObjectPointer { offset, @@ -499,11 +615,7 @@ where { warn!( "Storage tier {class} does not have enough space remaining. {} blocks of {}", - self.handler - .free_space_tier(class) - .unwrap() - .free - .as_u64(), + self.handler.free_space_tier(class).unwrap().free.as_u64(), size.as_u64() ); continue; diff --git a/betree/src/data_management/errors.rs b/betree/src/data_management/errors.rs index 4bbb7d34..5a0238f0 100644 --- a/betree/src/data_management/errors.rs +++ b/betree/src/data_management/errors.rs @@ -1,10 +1,12 @@ #![allow(missing_docs, unused_doc_comments)] use crate::{storage_pool::DiskOffset, vdev::Block}; +#[cfg(feature = "nvm")] +use pmem_hashmap::PMapError; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("The storage pool encountered an error.")] + #[error("The storage pool encountered an error. `{source}`")] VdevError { #[from] source: crate::vdev::Error, @@ -33,6 +35,12 @@ pub enum Error { CallbackError, #[error("A raw allocation has failed.")] RawAllocationError { at: DiskOffset, size: Block }, + #[cfg(feature = "nvm")] + #[error("A error occured while accessing the persistent cache. 
`{source}`")] + PersistentCacheError { + #[from] + source: PMapError, + }, } // To avoid recursive error types here, define a simple translation from diff --git a/betree/src/data_management/object_ptr.rs b/betree/src/data_management/object_ptr.rs index 0dbbd6d1..a394e1bf 100644 --- a/betree/src/data_management/object_ptr.rs +++ b/betree/src/data_management/object_ptr.rs @@ -9,7 +9,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)] /// A pointer to an on-disk serialized object. pub struct ObjectPointer { pub(super) decompression_tag: DecompressionTag, diff --git a/betree/src/database/errors.rs b/betree/src/database/errors.rs index 4dcee1ca..5863d97a 100644 --- a/betree/src/database/errors.rs +++ b/betree/src/database/errors.rs @@ -17,7 +17,7 @@ pub enum Error { #[from] source: crate::storage_pool::Error, }, - #[error("A tree operation encountered an error. This is likely an internal error.")] + #[error("A tree operation encountered an error. This is likely an internal error. `{source}`")] TreeError { #[from] source: crate::tree::Error, @@ -56,7 +56,7 @@ pub enum Error { InUse, #[error("Message surpasses the maximum length. If you cannot shrink your value, use an object store instead.")] MessageTooLarge, - #[error("Could not serialize the given data. This is an internal error.")] + #[error("Could not serialize the given data. This is an internal error. `{source}`")] SerializeFailed { #[from] source: serde_json::Error, diff --git a/betree/src/database/mod.rs b/betree/src/database/mod.rs index e8aabaa3..a0954eaf 100644 --- a/betree/src/database/mod.rs +++ b/betree/src/database/mod.rs @@ -21,6 +21,10 @@ use crate::{ vdev::Block, StoragePreference, }; + +#[cfg(feature = "nvm")] +use crate::replication::PersistentCache; + use bincode::{deserialize, serialize_into}; use byteorder::{BigEndian, ByteOrder, LittleEndian}; use crossbeam_channel::Sender; @@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration { compression: CompressionConfiguration::None, cache_size: DEFAULT_CACHE_SIZE, #[cfg(feature = "nvm")] - persistent_cache_path: None, + persistent_cache: None, access_mode: AccessMode::OpenIfExists, sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS), metrics: None, @@ -233,6 +237,12 @@ impl DatabaseConfiguration { } } + #[cfg(feature = "nvm")] + let pcache = self.persistent_cache.as_ref().map(|config| { + PersistentCache::create(&config.path, config.bytes) + .unwrap_or_else(|_| PersistentCache::open(&config.path).unwrap()) + }); + Dmu::new( self.compression.to_builder(), XxHashBuilder, @@ -241,6 +251,8 @@ impl DatabaseConfiguration { strategy, ClockCache::new(self.cache_size), handler, + #[cfg(feature = "nvm")] + pcache, ) } diff --git a/betree/src/lib.rs b/betree/src/lib.rs index c25d5c50..3e1d19f8 100644 --- a/betree/src/lib.rs +++ b/betree/src/lib.rs @@ -5,6 +5,8 @@ // code as the alternative would be allowing each single method only provided on // a maybe needed in the future basis. #![allow(dead_code)] +#![feature(allocator_api)] +#![feature(btreemap_alloc)] #[macro_use] extern crate error_chain; diff --git a/betree/src/replication/lru.rs b/betree/src/replication/lru.rs index 6fd217ee..574ee073 100644 --- a/betree/src/replication/lru.rs +++ b/betree/src/replication/lru.rs @@ -1,110 +1,92 @@ -use pmem_hashmap::{PMap, PMapError}; - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -/// Key pointing to an LRU node in a persistent cache. 
-pub struct LruKey(u64); - -impl LruKey { - /// Fetch and cast a pmem pointer to a [PlruNode]. - /// - /// Safety - /// ====== - /// The internals of this method are highly unsafe. Concurrent usage and - /// removal are urgently discouraged. - fn fetch(&self, map: &mut PMap) -> Result<&mut PlruNode, PMapError> { - map.get(self.key()).and_then(|node_raw| unsafe { - Ok(std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>( - node_raw.try_into().unwrap(), - )) - }) - } - - /// Wrap a hashed value in an [LruKey]. - pub fn from(hash: u64) -> Self { - LruKey(hash) - } +use super::{Persistent, PREFIX_LRU}; +use pmem_hashmap::{allocator::PalPtr, PMap, PMapError}; +use std::{marker::PhantomData, mem::size_of, ptr::NonNull}; - fn key(&self) -> [u8; size_of::() + 1] { - let mut key = [0; size_of::() + 1]; - key[0] = PREFIX_LRU; - key[1..].copy_from_slice(&self.0.to_le_bytes()); - key - } -} +/// Fetch and cast a pmem pointer to a [PlruNode]. +/// +/// Safety +/// ====== +/// The internals of this method are highly unsafe. Concurrent usage and +/// removal are urgently discouraged. +// fn fetch(ptr: &PalPtr>) -> Result<&mut PlruNode, PMapError> { +// Ok(unsafe { +// std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>( +// core::slice::from_raw_parts_mut(ptr.load_mut(), PLRU_NODE_SIZE) +// .try_into() +// .unwrap(), +// ) +// }) +// } /// Persistent LRU #[repr(C)] -pub struct Plru { - head: Option, - tail: Option, +pub struct Plru { + head: Option>>, + tail: Option>>, // in Blocks? Return evicted element when full capacity: u64, count: u64, size: u64, + // Satisfy the rust compiler by adding a zero sized phantom here + // Also, avoid the drop checker by using a raw ptr + key_type: PhantomData<*const T>, } -impl Plru { - pub fn create(map: &mut PMap, capacity: u64) -> Result, PMapError> { - let this = Self { +// hack ⛏ +const PLRU_ROOT_LENGTH_READ_COMMENT: usize = size_of::>(); + +impl Plru { + pub fn init(capacity: u64) -> Plru { + Self { head: None, tail: None, capacity, size: 0, count: 0, - }; - map.insert([super::PREFIX_LRU_ROOT], unsafe { - &std::mem::transmute::<_, [u8; size_of::()]>(this) - })?; - Self::open(map) - } - - pub fn open(map: &mut PMap) -> Result, PMapError> { - // Fetch from map, check if valid, transmute to type, and reaffirm - // non-null-ness. The unchecked transition is always correct as we unpack an already ensured - unsafe { - Ok(Persistent(NonNull::new_unchecked(std::mem::transmute( - map.get([super::PREFIX_LRU_ROOT])?.as_mut_ptr(), - )))) + key_type: PhantomData::default(), } } - pub fn touch(&mut self, map: &mut PMap, node_ptr: LruKey) -> Result<(), PMapError> { - if self.head == Some(node_ptr) { + pub fn touch(&mut self, node_ptr: &mut PalPtr>) -> Result<(), PMapError> { + if self.head.as_ref() == Some(node_ptr) { return Ok(()); } - let node = node_ptr.fetch(map).unwrap(); - self.cut_node_and_stitch(map, node)?; + self.cut_node_and_stitch(node_ptr)?; // Fixate new head - let old_head_ptr = self.head.expect("Invalid State"); - let old_head = old_head_ptr.fetch(map).unwrap(); - old_head.fwd = Some(node_ptr); - node.back = self.head; - self.head = Some(node_ptr); + let mut old_head_ptr = self.head.as_mut().expect("Invalid State"); + let old_head: &mut PlruNode = old_head_ptr.load_mut(); + old_head.fwd = Some(node_ptr.clone()); + let node: &mut PlruNode = node_ptr.load_mut(); + node.back = self.head.clone(); + self.head = Some(node_ptr.clone()); Ok(()) } /// Add a new entry into the LRU. Will fail if already present. 
- pub fn insert(&mut self, map: &mut PMap, node_ptr: LruKey, size: u64) -> Result<(), PMapError> { - let node = PlruNode { - fwd: None, - back: self.head, - size, - data: node_ptr.0, - }; - - map.insert(node_ptr.key(), &unsafe { - std::mem::transmute::<_, [u8; PLRU_NODE_SIZE]>(node) - })?; - if let Some(head_ptr) = self.head { - let head = head_ptr.fetch(map)?; - head.fwd = Some(node_ptr); + pub fn insert( + &mut self, + mut node_ptr: PalPtr>, + hash: u64, + size: u64, + baggage: T, + ) -> Result<(), PMapError> { + let new_node = node_ptr.load_mut(); + new_node.fwd = None; + new_node.back = self.head.clone(); + new_node.size = size; + new_node.key = baggage; + new_node.hash = hash; + + if let Some(ref mut head_ptr) = self.head.as_mut() { + let head: &mut PlruNode = head_ptr.load_mut(); + head.fwd = Some(node_ptr.clone()); self.head = Some(node_ptr); } else { // no head existed yet -> newly initialized list - self.head = Some(node_ptr); + self.head = Some(node_ptr.clone()); self.tail = Some(node_ptr); } self.size += size; @@ -114,33 +96,35 @@ impl Plru { /// Checks whether an eviction is necessary and which entry to evict. /// This call does not perform the removal itself. - pub fn evict(&mut self) -> Option { - if let (Some(tail), true) = (self.tail, self.size > self.capacity) { - return Some(DataKey(tail.0)); + pub fn evict(&self, size: u64) -> Result, PMapError> { + if let (Some(ref tail), true) = (self.tail.as_ref(), self.size + size > self.capacity) { + let node = tail.load(); + return Ok(Some((node.hash, &node.key))); } - None + Ok(None) } - fn cut_node_and_stitch( - &mut self, - map: &mut PMap, - node: &mut PlruNode, - ) -> Result<(), PMapError> { - let node_ptr = LruKey(node.data); - if let Some(forward_ptr) = node.fwd { - let forward = forward_ptr.fetch(map)?; - forward.back = node.back; + fn cut_node_and_stitch(&mut self, node_ptr: &mut PalPtr>) -> Result<(), PMapError> { + let node: &mut PlruNode = node_ptr.load_mut(); + if let Some(ref mut forward_ptr) = node.fwd.as_mut() { + let forward: &mut PlruNode = forward_ptr.load_mut(); + forward.back = node.back.clone(); } - if self.tail == Some(node_ptr) { - self.tail = node.fwd; + if let Some(ref mut back_ptr) = node.back.as_mut() { + let back: &mut PlruNode = back_ptr.load_mut(); + back.fwd = node.fwd.clone(); } - if let Some(back_ptr) = node.back { - let back = back_ptr.fetch(map)?; - back.fwd = node.fwd; + drop(node); + let node = node_ptr.load(); + + if self.head.as_ref() == Some(node_ptr) { + self.head = node.back.clone(); } - if self.head == Some(node_ptr) { - self.head = node.back; + if self.tail.as_ref() == Some(node_ptr) { + self.tail = node.fwd.clone(); } + + let node: &mut PlruNode = node_ptr.load_mut(); node.fwd = None; node.back = None; @@ -148,23 +132,15 @@ impl Plru { } /// Remove a node from cache and deallocate. - pub fn remove(&mut self, map: &mut PMap, node_ptr: LruKey) -> Result<(), PMapError> { - let node = node_ptr.fetch(map)?; - self.cut_node_and_stitch(map, node)?; - let size = node.size; - map.remove(node_ptr.key())?; - self.size -= size; + pub fn remove(&mut self, node_ptr: &mut PalPtr>) -> Result<(), PMapError> { + self.cut_node_and_stitch(node_ptr)?; + let node: &PlruNode = node_ptr.load(); + self.size -= node.size; self.count -= 1; Ok(()) } } -use std::{mem::size_of, ptr::NonNull}; - -use super::{DataKey, Persistent, PREFIX_LRU}; - -const PLRU_NODE_SIZE: usize = size_of::(); - /// Ephemeral Wrapper around a byte array for sane access code. 
/// /// Structure @@ -175,22 +151,56 @@ const PLRU_NODE_SIZE: usize = size_of::(); /// .. │ ├────> /// └───┘ back /// +/// Size Constraint +/// =============== +/// This structure allows for a generic member which is to be returned when +/// evictions are happening. The size available to the entire object is 256 +/// bytes of which the custom type can occupy at most 208 bytes. +/// /// Safety /// ====== /// Using this wrapper requires transmuting the underlying byte array, which /// invalidates, when used on references, all borrow checker guarantees. Use /// with extrem caution, and be sure what you are doing. #[repr(C)] -pub struct PlruNode { - fwd: Option, - back: Option, +pub struct PlruNode { + fwd: Option>>, + back: Option>>, size: u64, - data: u64, + hash: u64, + key: T, +} +pub(super) const PLRU_NODE_SIZE: usize = 256; + +impl PlruNode { + const SIZE_CONSTRAINT: () = assert!( + std::mem::size_of::>() < PLRU_NODE_SIZE, + "Size of attached data to LRU entry surpasses size constraint." + ); + + pub fn new( + fwd: Option>>, + back: Option>>, + size: u64, + hash: u64, + key: T, + ) -> Self { + // has to remain to ensure that the code path is evaluated by rustc + let _ = Self::SIZE_CONSTRAINT; + Self { + fwd, + back, + size, + hash, + key, + } + } } #[cfg(test)] mod tests { use super::*; + use pmem_hashmap::allocator::Pal; use std::{path::PathBuf, process::Command}; use tempfile::Builder; @@ -228,66 +238,156 @@ mod tests { #[test] fn new() { let file = TestFile::new(); - let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let _ = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); + let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap(); + let mut root = pal.root(size_of::>()).unwrap(); + root.init( + &Plru::<()>::init(32 * 1024 * 1024), + std::mem::size_of::>(), + ); + let plru = root.load_mut(); } #[test] fn insert() { let file = TestFile::new(); - let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 321).unwrap(); - assert_eq!(lru.head, Some(LruKey(100))); - lru.insert(&mut pmap, LruKey(101), 322).unwrap(); - assert_eq!(lru.head, Some(LruKey(101))); - lru.insert(&mut pmap, LruKey(102), 323).unwrap(); - assert_eq!(lru.head, Some(LruKey(102))); + let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap(); + let mut root = pal.root(size_of::>()).unwrap(); + root.init( + &Plru::<()>::init(32 * 1024 * 1024), + std::mem::size_of::>(), + ); + let plru = root.load_mut(); + + // Insert 3 entries + for id in 0..3 { + let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap(); + plru.insert(node_ptr.clone(), id, 312, ()).unwrap(); + assert_eq!(plru.head, Some(node_ptr)); + } + assert_eq!(plru.count, 3); } #[test] fn touch() { let file = TestFile::new(); - let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 321).unwrap(); - lru.insert(&mut pmap, LruKey(101), 322).unwrap(); - lru.insert(&mut pmap, LruKey(102), 323).unwrap(); + let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap(); + let mut root = pal.root(size_of::>()).unwrap(); + root.init( + &Plru::<()>::init(32 * 1024 * 1024), + std::mem::size_of::>(), + ); + let plru = root.load_mut(); + + // Insert 3 entries + let mut ptr = vec![]; + for id in 0..3 { + let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap(); + ptr.push(node_ptr.clone()); + 
plru.insert(node_ptr.clone(), id, 312, ()).unwrap(); + assert_eq!(plru.head, Some(node_ptr)); + } + assert_eq!(plru.count, 3); + + for ptr in ptr.iter_mut() { + plru.touch(ptr); + assert_eq!(plru.head, Some(ptr).cloned()); + } } #[test] fn evict() { let file = TestFile::new(); - let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024) - .unwrap(); - lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024) - .unwrap(); - lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024).unwrap(); - assert_eq!( - lru.evict().and_then(|opt| Some(LruKey(opt.0))), - Some(LruKey(100)) - ); - lru.remove(&mut pmap, LruKey(100)).unwrap(); - lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024).unwrap(); - assert_eq!(lru.evict().and_then(|opt| Some(LruKey(opt.0))), None); - lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024).unwrap(); - assert_eq!( - lru.evict().and_then(|opt| Some(LruKey(opt.0))), - Some(LruKey(101)) + let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap(); + let mut root = pal.root(size_of::>()).unwrap(); + root.init( + &Plru::<()>::init(32 * 1024 * 1024), + std::mem::size_of::>(), ); + let plru = root.load_mut(); + + // Insert 3 entries + let mut ptr = vec![]; + for id in 0..3 { + let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap(); + ptr.push(node_ptr.clone()); + plru.insert(node_ptr.clone(), id, 15 * 1024 * 1024, ()) + .unwrap(); + assert_eq!(plru.head, Some(node_ptr)); + } + assert_eq!(plru.count, 3); + + assert_eq!(plru.evict(0).unwrap(), Some((0, &()))); + plru.remove(&mut ptr[0]).unwrap(); + plru.insert(ptr[0].clone(), 3, 1 * 1024 * 1024, ()).unwrap(); + assert_eq!(plru.evict(0).unwrap(), None); } #[test] fn remove() { let file = TestFile::new(); - let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024) - .unwrap(); - lru.remove(&mut pmap, LruKey(100)).unwrap(); - assert_eq!(lru.head, None); - assert_eq!(lru.tail, None); + let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap(); + let mut root = pal.root(size_of::>()).unwrap(); + root.init( + &Plru::<()>::init(32 * 1024 * 1024), + std::mem::size_of::>(), + ); + let plru = root.load_mut(); + + // Insert 3 entries + let mut ptr = vec![]; + for id in 0..3 { + let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap(); + ptr.push(node_ptr.clone()); + plru.insert(node_ptr.clone(), id, 15 * 1024 * 1024, ()) + .unwrap(); + assert_eq!(plru.head, Some(node_ptr)); + } + assert_eq!(plru.count, 3); + + for ptr in ptr.iter_mut() { + plru.remove(ptr); + } + assert_eq!(plru.head, None); + assert_eq!(plru.tail, None); + } + + #[test] + fn reinit() { + let file = TestFile::new(); + let mut ptr = vec![]; + { + let mut pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap(); + let mut root = pal.root(size_of::>()).unwrap(); + root.init( + &Plru::<()>::init(32 * 1024 * 1024), + std::mem::size_of::>(), + ); + let plru = root.load_mut(); + + // Insert 3 entries + for id in 0..3 { + let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap(); + ptr.push(node_ptr.clone()); + plru.insert(node_ptr.clone(), id, 15 * 1024 * 1024, ()) + .unwrap(); + assert_eq!(plru.head, Some(node_ptr)); + } + assert_eq!(plru.count, 3); + pal.close(); + } + { + let mut pal = Pal::open(file.path()).unwrap(); + let mut root: PalPtr> = pal.root(size_of::>()).unwrap(); + let plru = root.load_mut(); + + 
assert_eq!(plru.head, Some(ptr.last().unwrap().clone())); + assert_eq!(plru.tail, Some(ptr.first().unwrap().clone())); + for ptr in ptr.iter_mut().rev() { + assert_eq!(plru.head, Some(ptr.clone())); + plru.remove(ptr); + } + assert_eq!(plru.count, 0); + pal.close(); + } } } diff --git a/betree/src/replication/lru_worker.rs b/betree/src/replication/lru_worker.rs new file mode 100644 index 00000000..47a66fca --- /dev/null +++ b/betree/src/replication/lru_worker.rs @@ -0,0 +1,33 @@ +use crossbeam_channel::Receiver; +use pmem_hashmap::allocator::PalPtr; + +use super::{lru::PlruNode, PCacheRoot, Persistent}; + +pub enum Msg { + Touch(PalPtr>), + Remove(PalPtr>), + Insert(PalPtr>, u64, u64, T), + Close, +} + +pub fn main(rx: Receiver>, mut root: Persistent>) { + // TODO: Error handling with return to valid state in the data section.. + while let Ok(msg) = rx.recv() { + match msg { + Msg::Touch(mut ptr) => { + let mut lru = root.lru.write(); + let _ = lru.touch(&mut ptr); + } + Msg::Remove(mut ptr) => { + let mut lru = root.lru.write(); + let _ = lru.remove(&mut ptr); + ptr.free(); + } + Msg::Insert(ptr, hash, size, baggage) => { + let mut lru = root.lru.write(); + let _ = lru.insert(ptr, hash, size, baggage); + } + Msg::Close => break, + } + } +} diff --git a/betree/src/replication/mod.rs b/betree/src/replication/mod.rs index 019e86de..0cffb359 100644 --- a/betree/src/replication/mod.rs +++ b/betree/src/replication/mod.rs @@ -10,17 +10,6 @@ //! 3.3 SLOW //! 3.4 SLOWEST //! -//! -//! ``` -//! 1 -//! / \ -//! 1 1 -//! / \ / \ -//! 1 2 1 2 -//! /|\ | /|\ |\ -//! 321 2 313 22 -//! ``` -//! //! Map Keys //! ======== //! @@ -32,67 +21,95 @@ const PREFIX_KV: u8 = 0; const PREFIX_LRU: u8 = 1; const PREFIX_LRU_ROOT: u8 = 2; +use crossbeam_channel::Sender; +use parking_lot::RwLock; +use pmem_hashmap::{ + allocator::{Pal, PalPtr}, + PMap, PMapError, +}; use std::{ - hash::Hash, + collections::BTreeMap, + hash::{Hash, Hasher}, + marker::PhantomData, + mem::size_of, ops::{Deref, DerefMut}, path::PathBuf, ptr::NonNull, + thread::JoinHandle, }; - -use pmem_hashmap::{PMap, PMapError}; +use twox_hash::XxHash64; +use zstd_safe::WriteBuf; mod lru; +mod lru_worker; use lru::Plru; use serde::{Deserialize, Serialize}; -use self::lru::LruKey; +use crate::buffer::Buf; + +use self::lru::PlruNode; /// A pointer to a region in persistent memory. -pub struct Persistent(NonNull); +pub struct Persistent(PalPtr); // Pointer to persistent memory can be assumed to be non-thread-local unsafe impl Send for Persistent {} impl Deref for Persistent { type Target = T; fn deref(&self) -> &Self::Target { - unsafe { self.0.as_ref() } + unsafe { self.0.load() } } } impl DerefMut for Persistent { fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { self.0.as_mut() } + unsafe { self.0.load_mut() } } } -/// Internally used key to access data store. -pub struct DataKey(u64); - -impl DataKey { - /// Create an internal store from an external key. - pub fn from(key: K, pmap: &mut PMap) -> Self { - DataKey(pmap.hash(key)) +impl Drop for Persistent { + fn drop(&mut self) { + // NO-OP } +} - /// Expose as byte range for FFI use. - pub fn key(&self) -> [u8; 9] { - let mut key = [0; 9]; - key[0] = PREFIX_KV; - key[1..].copy_from_slice(&self.0.to_le_bytes()); - key - } +// TODO: Is this really safe? +unsafe impl Sync for Persistent> {} - /// Convert to an internally used representation for LRU entries. - pub fn to_lru_key(&self) -> LruKey { - LruKey::from(self.0) +/// Persistent byte array cache. 
Optimized for read performance, avoid frequent +/// updates. +pub struct PersistentCache { + pal: Pal, + root: Persistent>, + tx: Sender>, + hndl: Option>, + // Fix key types + key_type: PhantomData, +} + +impl Drop for PersistentCache { + fn drop(&mut self) { + // Spin while the queue needs to be processed + self.tx + .send(lru_worker::Msg::Close) + .expect("Thread panicked."); + self.hndl + .take() + .expect("Thread handle has been empty?") + .join(); + self.pal.close(); } } -/// Persistent byte array cache. Optimized for read performance, avoid frequent -/// updates. -pub struct PersistentCache { - pmap: PMap, - // Persistent Pointer - lru: Persistent, +pub struct PCacheRoot { + map: BTreeMap, Pal>, + lru: RwLock>, +} + +#[derive(Debug)] +pub struct PCacheMapEntry { + size: usize, + lru_node: PalPtr>, + data: PalPtr, } /// Configuration for a persistent cache. @@ -104,51 +121,200 @@ pub struct PersistentCacheConfig { pub bytes: usize, } -impl PersistentCache { +/// Ephemeral struct created in the preparation for an insertion. +pub struct PersistentCacheInsertion<'a, K, T> { + cache: &'a mut PersistentCache, + key: u64, + value: Buf, + baggage: T, +} + +impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> { + /// Performs an execution and calls the given function on each evicted entry + /// from the store. On error the entire insertion is aborted and has to be + /// initiated anew. + pub fn insert(self, f: F) -> Result<(), PMapError> + where + F: Fn(&T, Buf) -> Result<(), crate::vdev::Error>, + { + loop { + let key = { + let lock = self.cache.root.lru.read(); + let res = lock.evict(self.value.len() as u64); + match res { + Ok(Some((key, baggage))) => { + // let data = self.cache.pmap.get(key.key())?; + let entry = self.cache.root.map.get(&key).unwrap(); + let data = unsafe { + core::slice::from_raw_parts(entry.data.load() as *const u8, entry.size) + }; + + let buf = Buf::from_persistent_ptr(entry.data, entry.size as u32); + if f(baggage, buf).is_err() { + return Err(PMapError::ExternalError("Writeback failed".into())); + } + key + } + _ => break, + } + }; + // Finally actually remove the entries + let mut entry = self.cache.root.map.remove(&key).unwrap(); + entry.data.free(); + self.cache.tx.send(lru_worker::Msg::Remove(entry.lru_node)); + // self.cache.root.lru.remove(&mut entry.lru_node)?; + // entry.lru_node.free(); + } + + // while let Ok(Some((key, baggage))) = + // self.cache.root.lru.read().evict(self.value.len() as u64) + // { + // // let data = self.cache.pmap.get(key.key())?; + // let entry = self.cache.root.map.get(&key).unwrap(); + // let data = + // unsafe { core::slice::from_raw_parts(entry.data.load() as *const u8, entry.size) }; + // if f(baggage, data).is_err() { + // return Err(PMapError::ExternalError("Writeback failed".into())); + // } + // // Finally actually remove the entries + // let mut entry = self.cache.root.map.remove(&key).unwrap(); + // entry.data.free(); + // self.cache.tx.send(lru_worker::Msg::Remove(entry.lru_node)); + // // self.cache.root.lru.remove(&mut entry.lru_node)?; + // // entry.lru_node.free(); + // } + let lru_ptr = self.cache.pal.allocate(lru::PLRU_NODE_SIZE).unwrap(); + let data = self.value.as_slice_with_padding(); + let data_ptr = self.cache.pal.allocate(data.len()).unwrap(); + data_ptr.copy_from(data, &self.cache.pal); + self.cache.tx.send(lru_worker::Msg::Insert( + lru_ptr.clone(), + self.key, + data.len() as u64, + self.baggage, + )); + // self.cache.root.lru.insert( + // lru_ptr.clone(), + // self.key, + // self.value.len() as u64, 
+ // self.baggage, + // )?; + let map_entry = PCacheMapEntry { + lru_node: lru_ptr, + data: data_ptr, + size: data.len(), + }; + self.cache.root.map.insert(self.key, map_entry); + Ok(()) + } +} + +impl PersistentCache { /// Open an existent [PersistentCache]. Fails if no cache exist or invalid. pub fn open>(path: P) -> Result { - let mut pmap = PMap::open(path.into())?; + let pal = Pal::open(path.into()).unwrap(); + let root = pal.root(size_of::>()).unwrap(); + let (tx, rx) = crossbeam_channel::unbounded(); + let root_lru = Persistent(root.clone()); + let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru)); + let root = Persistent(root); Ok(Self { - lru: Plru::open(&mut pmap)?, - pmap, + pal, + tx, + root, + hndl: Some(hndl), + key_type: PhantomData::default(), }) } /// Create a new [PersistentCache] in the specified path. Fails if underlying resources are not valid. pub fn create>(path: P, size: usize) -> Result { - let mut pmap = PMap::create(path.into(), size)?; + let mut pal = Pal::create(path.into(), size, 0o666).unwrap(); + let mut root: PalPtr> = pal.root(size_of::>()).unwrap(); + root.init( + &PCacheRoot { + lru: RwLock::new(Plru::init(size as u64)), + map: BTreeMap::new_in(pal.clone()), + }, + std::mem::size_of::>(), + ); + let (tx, rx) = crossbeam_channel::unbounded(); + let root_lru = Persistent(root.clone()); + let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru)); + let mut root = Persistent(root); Ok(Self { - lru: Plru::create(&mut pmap, size as u64)?, - pmap, + pal, + tx, + root, + hndl: Some(hndl), + key_type: PhantomData::default(), }) } /// Fetch an entry from the hashmap. - pub fn get(&mut self, key: K) -> Result<&[u8], PMapError> { - let k = DataKey::from(key, &mut self.pmap); - self.lru.touch(&mut self.pmap, k.to_lru_key())?; - Ok(self.pmap.get(k.key())?) + pub fn get(&self, key: K) -> Result<&[u8], PMapError> { + let mut hasher = XxHash64::default(); + key.hash(&mut hasher); + let hash = hasher.finish(); + let res = self.root.map.get(&hash); + if let Some(entry) = res { + self.tx.send(lru_worker::Msg::Touch(entry.lru_node)); + // self.root.lru.touch(&entry.lru_node)?; + Ok(unsafe { core::slice::from_raw_parts(entry.data.load() as *const u8, entry.size) }) + } else { + Err(PMapError::DoesNotExist) + } } - /// Insert an entry and remove infrequent values. - pub fn insert(&mut self, key: K, value: &[u8]) -> Result<(), PMapError> { - // TODO: Update old values? Convenience - let k = DataKey::from(key, &mut self.pmap); - self.pmap.insert(k.key(), value)?; - self.lru - .insert(&mut self.pmap, k.to_lru_key(), value.len() as u64)?; - while let Some(id) = self.lru.evict() { - self.pmap.remove(id.key())?; - self.lru.remove(&mut self.pmap, id.to_lru_key())?; + /// Return a [Buf]. + /// + /// TODO: We have to pin these entries to ensure that they may not be + /// evicted while in read-only mode. + pub fn get_buf(&self, key: K) -> Result { + let mut hasher = XxHash64::default(); + key.hash(&mut hasher); + let hash = hasher.finish(); + let res = self.root.map.get(&hash); + if let Some(entry) = res { + self.tx.send(lru_worker::Msg::Touch(entry.lru_node)); + // self.root.lru.touch(&entry.lru_node)?; + Ok(Buf::from_persistent_ptr(entry.data, entry.size as u32)) + } else { + Err(PMapError::DoesNotExist) + } + } + + /// Start an insertion. 
An insertion can only be successfully completed if values are properly evicted from the cache + pub fn prepare_insert<'a>( + &'a mut self, + key: K, + value: Buf, + baggage: T, + ) -> PersistentCacheInsertion<'a, K, T> { + let mut hasher = XxHash64::default(); + key.hash(&mut hasher); + let hash = hasher.finish(); + PersistentCacheInsertion { + cache: self, + key: hash, + value, + baggage, } - Ok(()) } /// Remove an entry. - pub fn remove(&mut self, key: K) -> Result<(), PMapError> { - let k = DataKey::from(key, &mut self.pmap); - self.pmap.remove(k.key())?; - self.lru.remove(&mut self.pmap, k.to_lru_key())?; - Ok(()) + pub fn remove(&mut self, key: K) -> Result<(), PMapError> { + let mut hasher = XxHash64::default(); + key.hash(&mut hasher); + let hash = hasher.finish(); + if let Some(mut entry) = self.root.map.remove(&hash) { + self.tx.send(lru_worker::Msg::Remove(entry.lru_node)); + // self.root.lru.remove(&mut entry.lru_node).unwrap(); + // entry.lru_node.free(); + entry.data.free(); + Ok(()) + } else { + Err(PMapError::DoesNotExist) + } } } diff --git a/betree/src/storage_pool/mod.rs b/betree/src/storage_pool/mod.rs index 66bbe0f6..c8c39d0f 100644 --- a/betree/src/storage_pool/mod.rs +++ b/betree/src/storage_pool/mod.rs @@ -58,6 +58,10 @@ pub trait StoragePoolLayer: Clone + Send + Sync + 'static { /// Issues a write request that might happen in the background. fn begin_write(&self, data: Buf, offset: DiskOffset) -> VdevResult<()>; + fn begin_write_offload(&self, offset: DiskOffset, f: F) -> VdevResult<()> + where + F: FnOnce() + Send + 'static; + /// Writes the given `data` at `offset` for every `LeafVdev`. fn write_raw(&self, data: Buf, offset: Block) -> VdevResult<()>; diff --git a/betree/src/storage_pool/unit.rs b/betree/src/storage_pool/unit.rs index 13e7373e..4b53f902 100644 --- a/betree/src/storage_pool/unit.rs +++ b/betree/src/storage_pool/unit.rs @@ -166,8 +166,6 @@ impl StoragePoolLayer for StoragePoolUnit { .write(data, offset.block_offset()) .await; - // TODO: what about multiple writes to same offset? - // NOTE: This is currently covered in the tests and fails as expected inner.write_back_queue.mark_completed(&offset).await; res })?; @@ -183,6 +181,28 @@ impl StoragePoolLayer for StoragePoolUnit { ret } + fn begin_write_offload(&self, offset: DiskOffset, f: F) -> vdev::Result<()> + where + F: FnOnce() + Send + 'static, + { + let inner = self.inner.clone(); + let (enqueue_done, wait_for_enqueue) = futures::channel::oneshot::channel(); + let write = self.inner.pool.spawn_with_handle(async move { + wait_for_enqueue.await.unwrap(); + f(); + + inner.write_back_queue.mark_completed(&offset).await; + Ok(()) + })?; + + let ret = self.inner.write_back_queue.enqueue(offset, Box::pin(write)); + + enqueue_done + .send(()) + .expect("Couldn't unlock enqueued write task"); + ret + } + fn write_raw(&self, data: Buf, offset: Block) -> Result<(), VdevError> { let vec = self .inner diff --git a/betree/src/tree/errors.rs b/betree/src/tree/errors.rs index 79cc9541..f2859350 100644 --- a/betree/src/tree/errors.rs +++ b/betree/src/tree/errors.rs @@ -3,7 +3,7 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("Storage operation could not be performed")] + #[error("Storage operation could not be performed. 
`{source}`")]
     DmuError {
         #[from]
         source: crate::data_management::Error,
diff --git a/docs/src/rfc/2-persistent-cache-replication.md b/docs/src/rfc/2-persistent-cache-replication.md
new file mode 100644
index 00000000..a8a62b6f
--- /dev/null
+++ b/docs/src/rfc/2-persistent-cache-replication.md
@@ -0,0 +1,51 @@
+- Title: Persistent Cache / Data Replication
+- Status: DRAFT
+
+# Description
+
+A persistent cache is a way to utilize a storage device's capabilities without
+integrating it further into the existing storage hierarchy, by keeping it
+semantically in the memory layer. For this we present two approaches; either
+may be used, as they have different drawbacks and advantages.
+
+## Fallback Cache
+
+Once entries are evicted from the volatile cache, we would normally allocate
+resources on the underlying storage media and write them out. To delay this
+step, we write them to the non-volatile cache first; only when they are
+evicted from there do they actually get written to disk. The main problem with
+this approach is the resulting utilization (read + write traffic), which is
+not optimal for some storage devices (*cough* PMem).
+
+Copy-on-write is another issue, as we may need to ensure writes to the storage
+device when modifying an entry, but this can be taken care of in the DMU.
+
+Regardless, it is easy to implement and should remain an option we try.
+
+## Read-heavy cache
+
+Another approach, which promises better device-specific utilization overall,
+is to focus on *read*-heavy data which is seldom modified. Most of the issues
+mentioned above are resolved by this approach, but it depends on a *decision
+maker* which moves data from storage or cache to the device. Most issues in
+this approach stem from that decision, and the actual result depends entirely
+on it. One idea is to reuse the migration policies as done for inter-tier data
+migration, but this requires additional classification. Classification schemes
+that are independent of specific workflows or nodes still need to be
+constructed for this, which is partially an open research topic.
+
+# Purpose
+
+This RFC tries to avoid integrating devices like PMem into the storage
+hierarchy, as non-latency-bound workflows do not really gain an advantage from
+them. Instead, a separate stack is built at the cache level to reduce latency
+on accesses there (if the data fits into storage).
+
+# Drawbacks
+
+
+
+# Alternatives
+
+> Find at least one alternative; this thought process helps prevent getting
+> stuck in certain implementation details
diff --git a/fio-haura/src/fio-engine-haura.c b/fio-haura/src/fio-engine-haura.c
index 5aa16a6e..7053bfe7 100644
--- a/fio-haura/src/fio-engine-haura.c
+++ b/fio-haura/src/fio-engine-haura.c
@@ -304,11 +304,17 @@ static int fio_haura_setup(struct thread_data *td) {
     /* Haura needs some additional space to provide extra data like object
      * pointers and metadata. This is more of a hack, but nonetheless. */
     creat(td->files[idx]->file_name, 0644);
-    if (truncate(td->files[idx]->file_name, td->o.size + (50 * 1024 * 1024))) {
+
+    // FIXME: If only one file is specified, the file_size_high argument is 0.
+    // Fall back to the usual size in this case.
+    if (truncate(td->files[idx]->file_name, td->o.size * 2)) {
       fprintf(
           stderr,
           "Could not retruncate file to provide enough storage for Haura.\n");
     }
+    // Set the already allocated size so that fio skips its internal initial
+    // layout pass.
+    td->files[idx]->real_file_size = td->o.size * 2;
   }
 
   td->io_ops_data = malloc(sizeof(size_t));
@@ -355,6 +361,11 @@ static int fio_haura_setup(struct thread_data *td) {
       ((u_int32_t *)buf)[off] = random();
     }
     while (max_io_size > total_written) {
+      if (betree_object_size(global_data.objs[idx], &error) >=
+          max_io_size) {
+        printf("haura: skipping prepopulation of object %lu\n", idx + 1);
+        break;
+      }
       unsigned long written = 0;
       betree_object_write_at(global_data.objs[idx], buf, block_size,