diff --git a/betree/Cargo.toml b/betree/Cargo.toml
index a93612b2..e634df15 100644
--- a/betree/Cargo.toml
+++ b/betree/Cargo.toml
@@ -70,7 +70,7 @@ criterion = "0.3"
 tempfile = "3.6.0"
 
 [features]
-default = ["init_env_logger", "figment_config"]
+default = ["init_env_logger", "figment_config", "nvm"]
 # unlock unstable API for consumption by bectl and other debugging tools
 internal-api = []
 
diff --git a/betree/include/betree.h b/betree/include/betree.h
index d7131aec..364815f9 100644
--- a/betree/include/betree.h
+++ b/betree/include/betree.h
@@ -1,7 +1,7 @@
 #ifndef betree_h
 #define betree_h
 
-/* Generated with cbindgen:0.24.3 */
+/* Generated with cbindgen:0.24.5 */
 
 /* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */
 
@@ -467,6 +467,12 @@ int betree_object_read_at(struct obj_t *obj,
                           unsigned long *n_read,
                           struct err_t **err);
 
+/**
+ * Return the object's size in bytes. If the size could not be determined,
+ * the object is assumed to be zero-sized.
+ */
+unsigned long betree_object_size(const struct obj_t *obj, struct err_t **err);
+
 /**
  * Try to write `buf_len` bytes from `buf` into `obj`, starting at `offset` bytes into the objects
  * data.
diff --git a/betree/src/c_interface.rs b/betree/src/c_interface.rs
index b4717b08..71b2e1cc 100644
--- a/betree/src/c_interface.rs
+++ b/betree/src/c_interface.rs
@@ -961,16 +961,17 @@ pub unsafe extern "C" fn betree_object_write_at(
     .handle_result(err)
 }
 
-/*
-/// Return the objects size in bytes.
+/// Return the object's size in bytes. If the size could not be determined,
+/// the object is assumed to be zero-sized.
 #[no_mangle]
-pub unsafe extern "C" fn betree_object_status(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong {
+pub unsafe extern "C" fn betree_object_size(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong {
     let obj = &(*obj).0;
     let info = obj.info();
-    obj.
-    obj.size()
+    info.map(|ok_opt| ok_opt.map(|obj_info| obj_info.size).unwrap_or(0))
+        .unwrap_or(0)
 }
 
+/*
 /// Returns the last modification timestamp in microseconds since the Unix epoch.
 #[no_mangle]
 pub unsafe extern "C" fn betree_object_mtime_us(obj: *const obj_t) -> c_ulong {
diff --git a/betree/src/checksum.rs b/betree/src/checksum.rs
index fadc9a4a..47b27c19 100644
--- a/betree/src/checksum.rs
+++ b/betree/src/checksum.rs
@@ -2,12 +2,17 @@
 
 use crate::size::{Size, StaticSize};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::{error::Error, fmt, hash::Hasher, iter::once};
+use std::{
+    error::Error,
+    fmt,
+    hash::{Hash, Hasher},
+    iter::once,
+};
 use twox_hash;
 
 /// A checksum to verify data integrity.
 pub trait Checksum:
-    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
+    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static
 {
     /// Builds a new `Checksum`.
    type Builder: Builder<Self>;
@@ -67,7 +72,7 @@ impl Error for ChecksumError {
 /// `XxHash` contains a digest of `xxHash`
 /// which is an "extremely fast non-cryptographic hash algorithm"
 /// (<https://github.com/Cyan4973/xxHash>)
-#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
 pub struct XxHash(u64);
 
 impl StaticSize for XxHash {
diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs
index 7d9032b1..165d86fd 100644
--- a/betree/src/data_management/dmu.rs
+++ b/betree/src/data_management/dmu.rs
@@ -5,6 +5,8 @@ use super::{
     object_ptr::ObjectPointer, CopyOnWriteEvent, Dml, HasStoragePreference, Object,
     ObjectReference,
 };
+#[cfg(feature = "nvm")]
+use crate::replication::PersistentCache;
 use crate::{
     allocator::{Action, SegmentAllocator, SegmentId},
     buffer::Buf,
@@ -60,6 +62,8 @@ where
     next_modified_node_id: AtomicU64,
     next_disk_id: AtomicU64,
     report_tx: Option>,
+    #[cfg(feature = "nvm")]
+    persistent_cache: Option<Arc<Mutex<PersistentCache<DiskOffset, Option<DiskOffset>>>>>,
 }
 
 impl Dmu
 where
@@ -76,6 +80,9 @@
         alloc_strategy: [[Option; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES],
         cache: E,
         handler: Handler>>,
+        #[cfg(feature = "nvm")] persistent_cache: Option<
+            PersistentCache<DiskOffset, Option<DiskOffset>>,
+        >,
     ) -> Self {
         let allocation_data = (0..pool.storage_class_count())
             .map(|class| {
@@ -103,6 +110,8 @@
             next_modified_node_id: AtomicU64::new(1),
             next_disk_id: AtomicU64::new(0),
             report_tx: None,
+            #[cfg(feature = "nvm")]
+            persistent_cache: persistent_cache.map(|cache| Arc::new(Mutex::new(cache))),
         }
     }
 
@@ -226,6 +235,23 @@
         let offset = op.offset();
         let generation = op.generation();
 
+        #[cfg(feature = "nvm")]
+        let compressed_data = {
+            let mut buf = None;
+            if let Some(ref pcache_mtx) = self.persistent_cache {
+                let mut cache = pcache_mtx.lock();
+                if let Ok(buffer) = cache.get(offset) {
+                    buf = Some(Buf::from_zero_padded(buffer.to_vec()))
+                }
+            }
+            if let Some(b) = buf {
+                b
+            } else {
+                self.pool
+                    .read(op.size(), op.offset(), op.checksum().clone())?
+            }
+        };
+        #[cfg(not(feature = "nvm"))]
         let compressed_data = self
             .pool
            .read(op.size(), op.offset(), op.checksum().clone())?;
@@ -329,7 +355,30 @@ where
         let mid = match key {
             ObjectKey::InWriteback(_) => unreachable!(),
-            ObjectKey::Unmodified { .. } => return Ok(()),
+            ObjectKey::Unmodified {
+                offset,
+                generation: _,
+            } => {
+                #[cfg(feature = "nvm")]
+                if let Some(ref pcache_mtx) = self.persistent_cache {
+                    let mut pcache = pcache_mtx.lock();
+                    // TODO: Specify a correct constant instead of this magic number 🪄
+                    let mut vec = Vec::with_capacity(4 * 1024 * 1024);
+                    object.value_mut().get_mut().pack(&mut vec)?;
+                    let _ = pcache.remove(offset);
+                    pcache
+                        .prepare_insert(offset, &vec, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Eventually write not-yet-synced data back to disk.
+                            if let Some(offset) = maybe_offset {
+                                self.pool
+                                    .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            }
+                            Ok(())
+                        })?;
+                }
+                return Ok(());
+            }
             ObjectKey::Modified(mid) => mid,
         };
 
@@ -341,6 +390,8 @@
         drop(cache);
 
         let object = CacheValueRef::write(entry);
+        // Eviction at this point only writes a single node, as all children
+        // need to be unmodified beforehand.
        self.handle_write_back(object, mid, true, pk)?;
         Ok(())
     }
@@ -412,7 +463,37 @@
             state.finish()
         };
 
-        self.pool.begin_write(compressed_data, offset)?;
+        #[cfg(feature = "nvm")]
+        let skip_write_back = self.persistent_cache.is_some();
+        #[cfg(not(feature = "nvm"))]
+        let skip_write_back = false;
+
+        if !skip_write_back {
+            self.pool.begin_write(compressed_data, offset)?;
+        } else {
+            let bar = compressed_data.clone();
+            #[cfg(feature = "nvm")]
+            if let Some(ref pcache_mtx) = self.persistent_cache {
+                let away = Arc::clone(pcache_mtx);
+                self.pool.begin_foo(offset, move || {
+                    let mut pcache = away.lock();
+                    let _ = pcache.remove(offset);
+                    pcache
+                        .prepare_insert(offset, &compressed_data, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Deduplicate this?
+                            // if let Some(offset) = maybe_offset {
+                            //     self.pool
+                            //         .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            // }
+                            panic!("This should not have happened in the debug run!");
+                            Ok(())
+                        })
+                        .unwrap();
+                })?;
+            }
+            self.pool.begin_write(bar, offset)?;
+        }
 
         let obj_ptr = ObjectPointer {
             offset,
@@ -499,11 +580,7 @@
         {
             warn!(
                 "Storage tier {class} does not have enough space remaining. {} blocks of {}",
-                self.handler
-                    .free_space_tier(class)
-                    .unwrap()
-                    .free
-                    .as_u64(),
+                self.handler.free_space_tier(class).unwrap().free.as_u64(),
                 size.as_u64()
             );
             continue;
diff --git a/betree/src/data_management/errors.rs b/betree/src/data_management/errors.rs
index 4bbb7d34..5a0238f0 100644
--- a/betree/src/data_management/errors.rs
+++ b/betree/src/data_management/errors.rs
@@ -1,10 +1,12 @@
 #![allow(missing_docs, unused_doc_comments)]
 use crate::{storage_pool::DiskOffset, vdev::Block};
+#[cfg(feature = "nvm")]
+use pmem_hashmap::PMapError;
 use thiserror::Error;
 
 #[derive(Error, Debug)]
 pub enum Error {
-    #[error("The storage pool encountered an error.")]
+    #[error("The storage pool encountered an error. `{source}`")]
     VdevError {
         #[from]
         source: crate::vdev::Error,
@@ -33,6 +35,12 @@ pub enum Error {
     CallbackError,
     #[error("A raw allocation has failed.")]
     RawAllocationError { at: DiskOffset, size: Block },
+    #[cfg(feature = "nvm")]
+    #[error("An error occurred while accessing the persistent cache. `{source}`")]
+    PersistentCacheError {
+        #[from]
+        source: PMapError,
+    },
 }
 
 // To avoid recursive error types here, define a simple translation from
diff --git a/betree/src/data_management/object_ptr.rs b/betree/src/data_management/object_ptr.rs
index 0dbbd6d1..a394e1bf 100644
--- a/betree/src/data_management/object_ptr.rs
+++ b/betree/src/data_management/object_ptr.rs
@@ -9,7 +9,7 @@ use crate::{
 };
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)]
 /// A pointer to an on-disk serialized object.
 pub struct ObjectPointer {
     pub(super) decompression_tag: DecompressionTag,
diff --git a/betree/src/database/errors.rs b/betree/src/database/errors.rs
index 4dcee1ca..5863d97a 100644
--- a/betree/src/database/errors.rs
+++ b/betree/src/database/errors.rs
@@ -17,7 +17,7 @@ pub enum Error {
         #[from]
         source: crate::storage_pool::Error,
     },
-    #[error("A tree operation encountered an error. This is likely an internal error.")]
+    #[error("A tree operation encountered an error. This is likely an internal error. `{source}`")]
     TreeError {
         #[from]
         source: crate::tree::Error,
     },
@@ -56,7 +56,7 @@ pub enum Error {
     InUse,
     #[error("Message surpasses the maximum length. If you cannot shrink your value, use an object store instead.")]
    MessageTooLarge,
-    #[error("Could not serialize the given data. This is an internal error.")]
+    #[error("Could not serialize the given data. This is an internal error. `{source}`")]
     SerializeFailed {
         #[from]
         source: serde_json::Error,
diff --git a/betree/src/database/mod.rs b/betree/src/database/mod.rs
index e8aabaa3..a0954eaf 100644
--- a/betree/src/database/mod.rs
+++ b/betree/src/database/mod.rs
@@ -21,6 +21,10 @@ use crate::{
     vdev::Block,
     StoragePreference,
 };
+
+#[cfg(feature = "nvm")]
+use crate::replication::PersistentCache;
+
 use bincode::{deserialize, serialize_into};
 use byteorder::{BigEndian, ByteOrder, LittleEndian};
 use crossbeam_channel::Sender;
@@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration {
             compression: CompressionConfiguration::None,
             cache_size: DEFAULT_CACHE_SIZE,
             #[cfg(feature = "nvm")]
-            persistent_cache_path: None,
+            persistent_cache: None,
             access_mode: AccessMode::OpenIfExists,
             sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS),
             metrics: None,
@@ -233,6 +237,12 @@ impl DatabaseConfiguration {
             }
         }
 
+        #[cfg(feature = "nvm")]
+        let pcache = self.persistent_cache.as_ref().map(|config| {
+            PersistentCache::create(&config.path, config.bytes)
+                .unwrap_or_else(|_| PersistentCache::open(&config.path).unwrap())
+        });
+
         Dmu::new(
             self.compression.to_builder(),
             XxHashBuilder,
@@ -241,6 +251,8 @@ impl DatabaseConfiguration {
             strategy,
             ClockCache::new(self.cache_size),
             handler,
+            #[cfg(feature = "nvm")]
+            pcache,
         )
     }
diff --git a/betree/src/replication/lru.rs b/betree/src/replication/lru.rs
index 6fd217ee..d32ee188 100644
--- a/betree/src/replication/lru.rs
+++ b/betree/src/replication/lru.rs
@@ -1,9 +1,10 @@
+use super::{DataKey, Persistent, PREFIX_LRU};
 use pmem_hashmap::{PMap, PMapError};
+use std::{marker::PhantomData, mem::size_of, ptr::NonNull};
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
 /// Key pointing to an LRU node in a persistent cache.
 pub struct LruKey(u64);
-
 impl LruKey {
     /// Fetch and cast a pmem pointer to a [PlruNode].
     ///
@@ -11,7 +12,7 @@ impl LruKey {
     /// ======
     /// The internals of this method are highly unsafe. Concurrent usage and
     /// removal are urgently discouraged.
-    fn fetch(&self, map: &mut PMap) -> Result<&mut PlruNode, PMapError> {
+    fn fetch<T>(&self, map: &mut PMap) -> Result<&mut PlruNode<T>, PMapError> {
         map.get(self.key()).and_then(|node_raw| unsafe {
             Ok(std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>(
                 node_raw.try_into().unwrap(),
             ))
@@ -34,16 +35,22 @@ impl LruKey {
 
 /// Persistent LRU
 #[repr(C)]
-pub struct Plru {
+pub struct Plru<T> {
     head: Option<LruKey>,
     tail: Option<LruKey>,
     // in Blocks? Return evicted element when full
    capacity: u64,
     count: u64,
     size: u64,
+    // Satisfy the rust compiler by adding a zero sized phantom here
+    // Also, avoid the drop checker by using a raw ptr
+    key_type: PhantomData<*const T>,
 }
 
-impl Plru {
+// hack ⛏
+const PLRU_ROOT_LENGTH_READ_COMMENT: usize = size_of::<Plru<()>>();
+
+impl<T> Plru<T> {
     pub fn create(map: &mut PMap, capacity: u64) -> Result<Persistent<Self>, PMapError> {
         let this = Self {
             head: None,
             tail: None,
@@ -51,9 +58,20 @@
             capacity,
             size: 0,
             count: 0,
+            key_type: PhantomData::default(),
         };
         map.insert([super::PREFIX_LRU_ROOT], unsafe {
-            &std::mem::transmute::<_, [u8; size_of::<Self>()]>(this)
+            // This does not work as of
+            // https://github.com/rust-lang/rust/issues/43408
+            //
+            //     &std::mem::transmute::<_, [u8; size_of::<Plru<T>>()]>(this)
+            //
+            // While annoying, we can avoid this problem by specifying a
+            // certain type for the struct and using the result of this
+            // constant for every other instantiation, as the type parameter
+            // is only a phantom. 👻
+            //
+            // instead let's do this:
+            &std::mem::transmute::<_, [u8; PLRU_ROOT_LENGTH_READ_COMMENT]>(this)
         })?;
         Self::open(map)
     }
@@ -78,7 +96,7 @@
 
         // Fixate new head
         let old_head_ptr = self.head.expect("Invalid State");
-        let old_head = old_head_ptr.fetch(map).unwrap();
+        let old_head: &mut PlruNode<T> = old_head_ptr.fetch(map).unwrap();
         old_head.fwd = Some(node_ptr);
         node.back = self.head;
         self.head = Some(node_ptr);
@@ -87,19 +105,26 @@
     }
 
     /// Add a new entry into the LRU. Will fail if already present.
-    pub fn insert(&mut self, map: &mut PMap, node_ptr: LruKey, size: u64) -> Result<(), PMapError> {
+    pub fn insert(
+        &mut self,
+        map: &mut PMap,
+        node_ptr: LruKey,
+        size: u64,
+        baggage: T,
+    ) -> Result<(), PMapError> {
         let node = PlruNode {
             fwd: None,
             back: self.head,
             size,
             data: node_ptr.0,
+            key: baggage,
         };
-        map.insert(node_ptr.key(), &unsafe {
-            std::mem::transmute::<_, [u8; PLRU_NODE_SIZE]>(node)
+        map.insert(node_ptr.key(), unsafe {
+            std::mem::transmute::<_, &[u8; PLRU_NODE_SIZE]>(&node)
         })?;
         if let Some(head_ptr) = self.head {
-            let head = head_ptr.fetch(map)?;
+            let head: &mut PlruNode<T> = head_ptr.fetch(map)?;
             head.fwd = Some(node_ptr);
             self.head = Some(node_ptr);
         } else {
@@ -114,28 +139,29 @@
 
     /// Checks whether an eviction is necessary and which entry to evict.
     /// This call does not perform the removal itself.
-    pub fn evict(&mut self) -> Option<DataKey> {
-        if let (Some(tail), true) = (self.tail, self.size > self.capacity) {
-            return Some(DataKey(tail.0));
+    pub fn evict(&self, map: &mut PMap, size: u64) -> Result<Option<(DataKey, T)>, PMapError>
+    where
+        T: Clone,
+    {
+        if let (Some(tail), true) = (self.tail, self.size + size > self.capacity) {
+            let node = tail.fetch(map)?;
+            return Ok(Some((DataKey(tail.0), node.key.clone())));
         }
-        None
+        Ok(None)
     }
 
     fn cut_node_and_stitch(
         &mut self,
         map: &mut PMap,
-        node: &mut PlruNode,
+        node: &mut PlruNode<T>,
     ) -> Result<(), PMapError> {
         let node_ptr = LruKey(node.data);
         if let Some(forward_ptr) = node.fwd {
-            let forward = forward_ptr.fetch(map)?;
+            let forward: &mut PlruNode<T> = forward_ptr.fetch(map)?;
             forward.back = node.back;
         }
         if self.tail == Some(node_ptr) {
             self.tail = node.fwd;
         }
         if let Some(back_ptr) = node.back {
-            let back = back_ptr.fetch(map)?;
+            let back: &mut PlruNode<T> = back_ptr.fetch(map)?;
             back.fwd = node.fwd;
         }
         if self.head == Some(node_ptr) {
@@ -159,12 +185,6 @@
         }
     }
 }
 
-use std::{mem::size_of, ptr::NonNull};
-
-use super::{DataKey, Persistent, PREFIX_LRU};
-
-const PLRU_NODE_SIZE: usize = size_of::<PlruNode>();
-
 /// Ephemeral Wrapper around a byte array for sane access code.
 ///
 /// Structure
 /// =========
 /// ┌───┐ fwd
 /// │   ├────>
 /// .. │   ├────>
 /// └───┘ back
 ///
+/// Size Constraint
+/// ===============
+/// This structure allows for a generic member which is to be returned when
+/// evictions are happening. The size available to the entire object is 256
+/// bytes, of which the custom type can occupy at most 208 bytes.
+///
 /// Safety
 /// ======
 /// Using this wrapper requires transmuting the underlying byte array, which
 /// invalidates, when used on references, all borrow checker guarantees. Use
 /// with extreme caution, and be sure you know what you are doing.
 #[repr(C)]
-pub struct PlruNode {
+pub struct PlruNode<T> {
     fwd: Option<LruKey>,
     back: Option<LruKey>,
     size: u64,
     data: u64,
+    key: T,
 }
+const PLRU_NODE_SIZE: usize = 256;
+
+impl<T> PlruNode<T> {
+    const SIZE_CONSTRAINT: () = assert!(
+        std::mem::size_of::<PlruNode<T>>() < PLRU_NODE_SIZE,
+        "Size of attached data to LRU entry surpasses size constraint."
+    );
+
+    pub fn new(fwd: Option<LruKey>, back: Option<LruKey>, size: u64, data: u64, key: T) -> Self {
+        // has to remain to ensure that the code path is evaluated by rustc
+        let _ = Self::SIZE_CONSTRAINT;
+        Self {
+            fwd,
+            back,
+            size,
+            data,
+            key,
+        }
+    }
+}
 
 #[cfg(test)]
 mod tests {
@@ -229,7 +276,7 @@ mod tests {
     fn new() {
         let file = TestFile::new();
         let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
-        let _ = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
+        let _ = Plru::<()>::create(&mut pmap, 32 * 1024 * 1024).unwrap();
     }
 
     #[test]
@@ -237,11 +284,11 @@ mod tests {
         let file = TestFile::new();
         let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
         let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
-        lru.insert(&mut pmap, LruKey(100), 321).unwrap();
+        lru.insert(&mut pmap, LruKey(100), 321, ()).unwrap();
         assert_eq!(lru.head, Some(LruKey(100)));
-        lru.insert(&mut pmap, LruKey(101), 322).unwrap();
+        lru.insert(&mut pmap, LruKey(101), 322, ()).unwrap();
         assert_eq!(lru.head, Some(LruKey(101)));
-        lru.insert(&mut pmap, LruKey(102), 323).unwrap();
+        lru.insert(&mut pmap, LruKey(102), 323, ()).unwrap();
         assert_eq!(lru.head, Some(LruKey(102)));
     }
 
     #[test]
@@ -250,9 +297,9 @@ mod tests {
         let file = TestFile::new();
         let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
         let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
-        lru.insert(&mut pmap, LruKey(100), 321).unwrap();
-        lru.insert(&mut pmap, LruKey(101), 322).unwrap();
-        lru.insert(&mut pmap, LruKey(102), 323).unwrap();
+        lru.insert(&mut pmap, LruKey(100), 321, ()).unwrap();
+        lru.insert(&mut pmap, LruKey(101), 322, ()).unwrap();
+        lru.insert(&mut pmap, LruKey(102), 323, ()).unwrap();
     }
 
     #[test]
@@ -260,21 +307,33 @@ mod tests {
         let file = TestFile::new();
         let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
         let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
-        lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024)
+        lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ())
             .unwrap();
-        lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024)
+        lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024, ())
+            .unwrap();
+        lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024, ())
             .unwrap();
-        lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024).unwrap();
         assert_eq!(
-            lru.evict().and_then(|opt| Some(LruKey(opt.0))),
+            lru.evict(&mut pmap, 0)
+                .unwrap()
+                .and_then(|opt| Some(LruKey(opt.0 .0))),
             Some(LruKey(100))
         );
         lru.remove(&mut pmap, LruKey(100)).unwrap();
-        lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024).unwrap();
-        assert_eq!(lru.evict().and_then(|opt| Some(LruKey(opt.0))), None);
-        lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024).unwrap();
+        lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024, ())
+            .unwrap();
+        assert_eq!(
+            lru.evict(&mut pmap, 0)
+                .unwrap()
+                .and_then(|opt| Some(LruKey(opt.0 .0))),
+            None
+        );
+        lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024, ())
+            .unwrap();
         assert_eq!(
-            lru.evict().and_then(|opt| Some(LruKey(opt.0))),
+            lru.evict(&mut pmap, 0)
+                .unwrap()
+                .and_then(|opt| Some(LruKey(opt.0 .0))),
             Some(LruKey(101))
         );
     }
 
     #[test]
@@ -283,11 +342,28 @@ mod tests {
     fn remove() {
         let file = TestFile::new();
         let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
-        let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
-        lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024)
+        let mut lru: Persistent<Plru<()>> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
+        lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ())
             .unwrap();
         lru.remove(&mut pmap, LruKey(100)).unwrap();
         assert_eq!(lru.head, None);
         assert_eq!(lru.tail, None);
     }
+
+    #[test]
+    fn reinit() {
+        let file = TestFile::new();
+        {
+            let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
+            let mut lru: Persistent<Plru<()>> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
+            lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ())
+                .unwrap();
+        }
+        {
+            let mut pmap = PMap::open(file.path()).unwrap();
+            let lru: Persistent<Plru<()>> = Plru::open(&mut pmap).unwrap();
+            assert_eq!(lru.head, Some(LruKey(100)));
+            assert_eq!(lru.tail, Some(LruKey(100)));
+        }
+    }
 }
diff --git a/betree/src/replication/mod.rs b/betree/src/replication/mod.rs
index 019e86de..a552e842 100644
--- a/betree/src/replication/mod.rs
+++ b/betree/src/replication/mod.rs
@@ -10,17 +10,6 @@
 //! 3.3 SLOW
 //! 3.4 SLOWEST
 //!
-//!
-//! ```
-//! 1
-//! / \
-//! 1 1
-//! / \ / \
-//! 1 2 1 2
-//! /|\ | /|\ |\
-//! 321 2 313 22
-//! ```
-//!
 //! Map Keys
 //! ========
 //!
@@ -34,6 +23,7 @@ const PREFIX_LRU_ROOT: u8 = 2;
 
 use std::{
     hash::Hash,
+    marker::PhantomData,
     ops::{Deref, DerefMut},
     path::PathBuf,
     ptr::NonNull,
@@ -89,10 +79,12 @@ impl DataKey {
 
 /// Persistent byte array cache. Optimized for read performance, avoid frequent
 /// updates.
-pub struct PersistentCache {
+pub struct PersistentCache<K, T> {
     pmap: PMap,
     // Persistent Pointer
-    lru: Persistent<Plru>,
+    lru: Persistent<Plru<T>>,
+    // Fix key types
+    key_type: PhantomData<K>,
 }
 
 /// Configuration for a persistent cache.
@@ -104,13 +96,58 @@ pub struct PersistentCacheConfig {
     pub bytes: usize,
 }
 
-impl PersistentCache {
+/// Ephemeral struct created in preparation for an insertion.
+pub struct PersistentCacheInsertion<'a, K, T> {
+    cache: &'a mut PersistentCache<K, T>,
+    key: DataKey,
+    value: &'a [u8],
+    baggage: T,
+}
+
+impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> {
+    /// Performs the insertion and calls the given function on each entry
+    /// evicted from the store. On error the entire insertion is aborted and
+    /// has to be initiated anew.
+    pub fn insert<F>(self, f: F) -> Result<(), PMapError>
+    where
+        F: Fn(&T, &[u8]) -> Result<(), crate::vdev::Error>,
+    {
+        while let Ok(Some((key, baggage))) = self
+            .cache
+            .lru
+            .evict(&mut self.cache.pmap, self.value.len() as u64)
+        {
+            let data = self.cache.pmap.get(key.key())?;
+            if f(&baggage, data).is_err() {
+                return Err(PMapError::ExternalError("Writeback failed".into()));
+            }
+            // Finally actually remove the entries
+            self.cache.pmap.remove(key.key())?;
+            self.cache
+                .lru
+                .remove(&mut self.cache.pmap, key.to_lru_key())?;
+        }
+
+        self.cache.pmap.insert(self.key.key(), self.value)?;
+        self.cache.lru.insert(
+            &mut self.cache.pmap,
+            self.key.to_lru_key(),
+            self.value.len() as u64,
+            self.baggage,
+        )?;
+
+        Ok(())
+    }
+}
+
+impl<K: Hash, T> PersistentCache<K, T> {
     /// Open an existent [PersistentCache]. Fails if no cache exists or it is invalid.
     pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, PMapError> {
         let mut pmap = PMap::open(path.into())?;
         Ok(Self {
             lru: Plru::open(&mut pmap)?,
             pmap,
+            key_type: PhantomData::default(),
         })
     }
 
@@ -120,32 +157,35 @@ impl PersistentCache {
         Ok(Self {
             lru: Plru::create(&mut pmap, size as u64)?,
             pmap,
+            key_type: PhantomData::default(),
         })
     }
 
     /// Fetch an entry from the hashmap.
-    pub fn get<K: Hash>(&mut self, key: K) -> Result<&[u8], PMapError> {
+    pub fn get(&mut self, key: K) -> Result<&[u8], PMapError> {
         let k = DataKey::from(key, &mut self.pmap);
         self.lru.touch(&mut self.pmap, k.to_lru_key())?;
         Ok(self.pmap.get(k.key())?)
     }
 
-    /// Insert an entry and remove infrequent values.
-    pub fn insert<K: Hash>(&mut self, key: K, value: &[u8]) -> Result<(), PMapError> {
-        // TODO: Update old values? Convenience
+    /// Start an insertion. An insertion can only be completed successfully
+    /// if evicted values are properly handled by the provided callback.
+    pub fn prepare_insert<'a>(
+        &'a mut self,
+        key: K,
+        value: &'a [u8],
+        baggage: T,
+    ) -> PersistentCacheInsertion<'a, K, T> {
         let k = DataKey::from(key, &mut self.pmap);
-        self.pmap.insert(k.key(), value)?;
-        self.lru
-            .insert(&mut self.pmap, k.to_lru_key(), value.len() as u64)?;
-        while let Some(id) = self.lru.evict() {
-            self.pmap.remove(id.key())?;
-            self.lru.remove(&mut self.pmap, id.to_lru_key())?;
+        PersistentCacheInsertion {
+            cache: self,
+            key: k,
+            value,
+            baggage,
         }
-        Ok(())
     }
 
     /// Remove an entry.
-    pub fn remove<K: Hash>(&mut self, key: K) -> Result<(), PMapError> {
+    pub fn remove(&mut self, key: K) -> Result<(), PMapError> {
         let k = DataKey::from(key, &mut self.pmap);
         self.pmap.remove(k.key())?;
         self.lru.remove(&mut self.pmap, k.to_lru_key())?;
diff --git a/betree/src/storage_pool/mod.rs b/betree/src/storage_pool/mod.rs
index 66bbe0f6..581a4b94 100644
--- a/betree/src/storage_pool/mod.rs
+++ b/betree/src/storage_pool/mod.rs
@@ -58,6 +58,10 @@ pub trait StoragePoolLayer: Clone + Send + Sync + 'static {
     /// Issues a write request that might happen in the background.
     fn begin_write(&self, data: Buf, offset: DiskOffset) -> VdevResult<()>;
 
+    /// Enqueues a callback into the write-back queue at `offset` without
+    /// writing any data itself.
+    fn begin_foo<F>(&self, offset: DiskOffset, f: F) -> VdevResult<()>
+    where
+        F: FnOnce() + Send + 'static;
+
     /// Writes the given `data` at `offset` for every `LeafVdev`.
     fn write_raw(&self, data: Buf, offset: Block) -> VdevResult<()>;
 
diff --git a/betree/src/storage_pool/unit.rs b/betree/src/storage_pool/unit.rs
index 13e7373e..99272f24 100644
--- a/betree/src/storage_pool/unit.rs
+++ b/betree/src/storage_pool/unit.rs
@@ -166,8 +166,6 @@ impl StoragePoolLayer for StoragePoolUnit {
                 .write(data, offset.block_offset())
                 .await;
 
-            // TODO: what about multiple writes to same offset?
-            // NOTE: This is currently covered in the tests and fails as expected
             inner.write_back_queue.mark_completed(&offset).await;
             res
         })?;
@@ -183,6 +181,25 @@ impl StoragePoolLayer for StoragePoolUnit {
         ret
     }
 
+    fn begin_foo<F>(&self, offset: DiskOffset, f: F) -> vdev::Result<()>
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        let (enqueue_done, wait_for_enqueue) = futures::channel::oneshot::channel();
+        let write = self.inner.pool.spawn_with_handle(async move {
+            wait_for_enqueue.await.unwrap();
+            f();
+            Ok(())
+        })?;
+
+        let ret = self.inner.write_back_queue.enqueue(offset, Box::pin(write));
+
+        enqueue_done
+            .send(())
+            .expect("Couldn't unlock enqueued write task");
+        ret
+    }
+
     fn write_raw(&self, data: Buf, offset: Block) -> Result<(), VdevError> {
         let vec = self
             .inner
diff --git a/betree/src/tree/errors.rs b/betree/src/tree/errors.rs
index 79cc9541..f2859350 100644
--- a/betree/src/tree/errors.rs
+++ b/betree/src/tree/errors.rs
@@ -3,7 +3,7 @@ use thiserror::Error;
 
 #[derive(Error, Debug)]
 pub enum Error {
-    #[error("Storage operation could not be performed")]
+    #[error("Storage operation could not be performed. `{source}`")]
     DmuError {
         #[from]
         source: crate::data_management::Error,
diff --git a/docs/src/rfc/2-persistent-cache-replication.md b/docs/src/rfc/2-persistent-cache-replication.md
new file mode 100644
index 00000000..a8a62b6f
--- /dev/null
+++ b/docs/src/rfc/2-persistent-cache-replication.md
@@ -0,0 +1,51 @@
+- Title: Persistent Cache / Data Replication
+- Status: DRAFT
+
+# Description
+
+A persistent cache is a way to utilize a storage device's capabilities without
+integrating it further into the existing storage hierarchy; semantically, the
+device remains part of the memory layer. We present two approaches; either may
+be used, as they come with different advantages and drawbacks.
+
+## Fallback Cache
+
+Normally, once entries are evicted from the volatile cache, we allocate
+resources on the underlying storage media and write them out. To prolong this
+process we write evicted entries to the non-volatile cache first; only when
+they are eventually evicted from there do they actually get written to disk. A
+sketch of this write path is shown after the next section. The main problem
+with this approach is the resulting device utilization (read + write traffic),
+which is not optimal for some storage devices (*cough* PMem).
+
+Copy-on-Write is another issue, as we may need to ensure writes to the storage
+device when modifying an entry, but this can be taken care of in the DMU.
+
+Regardless, it is easy to implement and should remain an option we try.
+
+## Read-heavy cache
+
+Another approach, which promises better utilization for specific devices, is to
+focus on *read*-heavy data which is seldom modified. Most of the issues
+mentioned above are resolved by this approach, but it depends on a *decision
+maker* which moves data from storage or cache to the device; most issues here
+are produced by this decision, and the actual result depends entirely on its
+quality. An idea is to use the migration policies as done with inter-tier data
+migration, but this requires additional classification. Possible
+workflow-independent node classification schemes still need to be constructed
+for this, which is partially an open research topic.
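+To make the fallback approach above concrete, the following is a minimal
+sketch of the intended write path in terms of the `PersistentCache` API
+introduced alongside this RFC (`prepare_insert` plus an eviction callback). It
+assumes the `replication` and `vdev` modules are publicly reachable and that
+`pmem_hashmap` is available as a dependency; `write_to_disk` and
+`insert_with_write_back` are hypothetical helpers for illustration only:
+
+```rust
+use betree_storage_stack::{replication::PersistentCache, vdev};
+
+// Hypothetical sink for evicted entries; stands in for the final write to
+// the storage pool (e.g. `StoragePoolLayer::begin_write`).
+fn write_to_disk(_offset: u64, _data: &[u8]) -> Result<(), vdev::Error> {
+    unimplemented!("illustration only")
+}
+
+// Keys are disk offsets here; the baggage stored with each entry is the
+// location the data has to be written back to once it is evicted.
+fn insert_with_write_back(
+    cache: &mut PersistentCache<u64, Option<u64>>,
+    offset: u64,
+    data: &[u8],
+) -> Result<(), pmem_hashmap::PMapError> {
+    cache
+        .prepare_insert(offset, data, Some(offset))
+        .insert(|target, evicted| {
+            // Entries leaving the persistent cache are only now persisted
+            // to their final on-disk location.
+            if let Some(target) = target {
+                write_to_disk(*target, evicted)?;
+            }
+            Ok(())
+        })
+}
+```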
+# Purpose
+
+This RFC tries to avoid integrating devices like PMem into the storage
+hierarchy, as non-latency-bound workflows do not really gain an advantage from
+them; instead, a separate stack is built at the cache level to reduce the
+latency of accesses there (if the data fits into storage).
+
+# Drawbacks
+
+
+
+# Alternatives
+
+> Find at least one alternative; this thought process helps prevent getting
+> stuck in certain implementation details.
diff --git a/fio-haura/src/fio-engine-haura.c b/fio-haura/src/fio-engine-haura.c
index 5aa16a6e..678186f3 100644
--- a/fio-haura/src/fio-engine-haura.c
+++ b/fio-haura/src/fio-engine-haura.c
@@ -304,11 +304,14 @@ static int fio_haura_setup(struct thread_data *td) {
     /* Haura needs some additional space to provide extra data like object
      * pointers and metadata. This is more of a hack, but nonetheless. */
     creat(td->files[idx]->file_name, 0644);
-    if (truncate(td->files[idx]->file_name, td->o.size + (50 * 1024 * 1024))) {
+    if (truncate(td->files[idx]->file_name, td->o.file_size_high)) {
      fprintf(
           stderr,
           "Could not retruncate file to provide enough storage for Haura.\n");
     }
+    // Set the already allocated size so that fio skips its internal initial
+    // layout pass.
+    td->files[idx]->real_file_size = td->o.file_size_high;
   }
 
   td->io_ops_data = malloc(sizeof(size_t));
@@ -324,7 +327,7 @@ static int fio_haura_setup(struct thread_data *td) {
     return bail(error);
   }
   fio_haura_translate(td, cfg);
-  if ((global_data.db = betree_create_db(cfg, &error)) == NULL) {
+  if ((global_data.db = betree_open_or_create_db(cfg, &error)) == NULL) {
     return bail(error);
   }
   if ((global_data.obj_s = betree_create_object_store(
@@ -355,6 +358,11 @@ static int fio_haura_setup(struct thread_data *td) {
       ((u_int32_t *)buf)[off] = random();
     }
     while (max_io_size > total_written) {
+      if (betree_object_size(global_data.objs[idx], &error) >= max_io_size) {
+        printf("haura: skipping prepopulation of object %lu\n", idx + 1);
+        break;
+      }
       unsigned long written = 0;
       betree_object_write_at(global_data.objs[idx], buf, block_size,