From f727205d79a6f3ff2ab31dcb79eb04b1cf94d992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCnsche?= Date: Fri, 30 Jun 2023 10:19:57 +0200 Subject: [PATCH] tmp commit --- betree/Cargo.toml | 2 +- betree/include/betree.h | 8 +- betree/pmem-hashmap/src/allocator.rs | 18 +- betree/src/c_interface.rs | 11 +- betree/src/checksum.rs | 11 +- betree/src/data_management/dmu.rs | 91 +++++- betree/src/data_management/errors.rs | 10 +- betree/src/data_management/object_ptr.rs | 2 +- betree/src/database/errors.rs | 4 +- betree/src/database/mod.rs | 14 +- betree/src/lib.rs | 2 + betree/src/replication/lru.rs | 283 ++++++++++-------- betree/src/replication/mod.rs | 214 ++++++++----- betree/src/storage_pool/mod.rs | 4 + betree/src/storage_pool/unit.rs | 21 +- betree/src/tree/errors.rs | 2 +- .../src/rfc/2-persistent-cache-replication.md | 51 ++++ fio-haura/src/fio-engine-haura.c | 12 +- 18 files changed, 526 insertions(+), 234 deletions(-) create mode 100644 docs/src/rfc/2-persistent-cache-replication.md diff --git a/betree/Cargo.toml b/betree/Cargo.toml index a93612b2..e634df15 100644 --- a/betree/Cargo.toml +++ b/betree/Cargo.toml @@ -70,7 +70,7 @@ criterion = "0.3" tempfile = "3.6.0" [features] -default = ["init_env_logger", "figment_config"] +default = ["init_env_logger", "figment_config", "nvm"] # unlock unstable API for consumption by bectl and other debugging tools internal-api = [] diff --git a/betree/include/betree.h b/betree/include/betree.h index d7131aec..364815f9 100644 --- a/betree/include/betree.h +++ b/betree/include/betree.h @@ -1,7 +1,7 @@ #ifndef betree_h #define betree_h -/* Generated with cbindgen:0.24.3 */ +/* Generated with cbindgen:0.24.5 */ /* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */ @@ -467,6 +467,12 @@ int betree_object_read_at(struct obj_t *obj, unsigned long *n_read, struct err_t **err); +/** + * Return the objects size in bytes. If the size could not be determined + * it is assumed the object is zero-sized. + */ +unsigned long betree_object_size(const struct obj_t *obj, struct err_t **err); + /** * Try to write `buf_len` bytes from `buf` into `obj`, starting at `offset` bytes into the objects * data. diff --git a/betree/pmem-hashmap/src/allocator.rs b/betree/pmem-hashmap/src/allocator.rs index 27b462e6..88b763cf 100644 --- a/betree/pmem-hashmap/src/allocator.rs +++ b/betree/pmem-hashmap/src/allocator.rs @@ -64,12 +64,26 @@ pub enum PalError { // A friendly persistent pointer. Useless without the according handle to the // original arena. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PalPtr { inner: PMEMoid, size: usize, } +impl PartialEq for PMEMoid { + fn eq(&self, other: &Self) -> bool { + self.pool_uuid_lo == other.pool_uuid_lo && self.off == other.off + } +} + +impl Eq for PMEMoid {} + +impl Drop for PalPtr { + fn drop(&mut self) { + // self.free() + } +} + impl PalPtr { /// Translate this persistent ptr to a volatile one. pub fn load(&self) -> *mut c_void { @@ -107,7 +121,7 @@ impl PalPtr { /// Deallocate this object. Required if this value is no longer needed. /// There is *no* automatic deallocation logic. 
- pub fn free(mut self) { + pub fn free(&mut self) { unsafe { pmemobj_free(&mut self.inner) } } } diff --git a/betree/src/c_interface.rs b/betree/src/c_interface.rs index b4717b08..71b2e1cc 100644 --- a/betree/src/c_interface.rs +++ b/betree/src/c_interface.rs @@ -961,16 +961,17 @@ pub unsafe extern "C" fn betree_object_write_at( .handle_result(err) } -/* -/// Return the objects size in bytes. +/// Return the objects size in bytes. If the size could not be determined +/// it is assumed the object is zero-sized. #[no_mangle] -pub unsafe extern "C" fn betree_object_status(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong { +pub unsafe extern "C" fn betree_object_size(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong { let obj = &(*obj).0; let info = obj.info(); - obj. - obj.size() + info.and_then(|ok_opt| Ok(ok_opt.map(|obj_info| obj_info.size).unwrap_or(0))) + .unwrap_or(0) } +/* /// Returns the last modification timestamp in microseconds since the Unix epoch. #[no_mangle] pub unsafe extern "C" fn betree_object_mtime_us(obj: *const obj_t) -> c_ulong { diff --git a/betree/src/checksum.rs b/betree/src/checksum.rs index fadc9a4a..47b27c19 100644 --- a/betree/src/checksum.rs +++ b/betree/src/checksum.rs @@ -2,12 +2,17 @@ use crate::size::{Size, StaticSize}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{error::Error, fmt, hash::Hasher, iter::once}; +use std::{ + error::Error, + fmt, + hash::{Hash, Hasher}, + iter::once, +}; use twox_hash; /// A checksum to verify data integrity. pub trait Checksum: - Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static + Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static { /// Builds a new `Checksum`. type Builder: Builder; @@ -67,7 +72,7 @@ impl Error for ChecksumError { /// `XxHash` contains a digest of `xxHash` /// which is an "extremely fast non-cryptographic hash algorithm" /// () -#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct XxHash(u64); impl StaticSize for XxHash { diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs index 7d9032b1..165d86fd 100644 --- a/betree/src/data_management/dmu.rs +++ b/betree/src/data_management/dmu.rs @@ -5,6 +5,8 @@ use super::{ object_ptr::ObjectPointer, CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference, }; +#[cfg(feature = "nvm")] +use crate::replication::PersistentCache; use crate::{ allocator::{Action, SegmentAllocator, SegmentId}, buffer::Buf, @@ -60,6 +62,8 @@ where next_modified_node_id: AtomicU64, next_disk_id: AtomicU64, report_tx: Option>, + #[cfg(feature = "nvm")] + persistent_cache: Option>>>>, } impl Dmu @@ -76,6 +80,9 @@ where alloc_strategy: [[Option; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES], cache: E, handler: Handler>>, + #[cfg(feature = "nvm")] persistent_cache: Option< + PersistentCache>, + >, ) -> Self { let allocation_data = (0..pool.storage_class_count()) .map(|class| { @@ -103,6 +110,8 @@ where next_modified_node_id: AtomicU64::new(1), next_disk_id: AtomicU64::new(0), report_tx: None, + #[cfg(feature = "nvm")] + persistent_cache: persistent_cache.map(|cache| Arc::new(Mutex::new(cache))), } } @@ -226,6 +235,23 @@ where let offset = op.offset(); let generation = op.generation(); + #[cfg(feature = "nvm")] + let compressed_data = { + let mut buf = None; + if let Some(ref pcache_mtx) = self.persistent_cache { + let mut cache = pcache_mtx.lock(); + if let 
Ok(buffer) = cache.get(offset) {
+                    buf = Some(Buf::from_zero_padded(buffer.to_vec()))
+                }
+            }
+            if let Some(b) = buf {
+                b
+            } else {
+                self.pool
+                    .read(op.size(), op.offset(), op.checksum().clone())?
+            }
+        };
+        #[cfg(not(feature = "nvm"))]
         let compressed_data = self
             .pool
             .read(op.size(), op.offset(), op.checksum().clone())?;
@@ -329,7 +355,30 @@ where
 
         let mid = match key {
             ObjectKey::InWriteback(_) => unreachable!(),
-            ObjectKey::Unmodified { .. } => return Ok(()),
+            ObjectKey::Unmodified {
+                offset,
+                generation: _,
+            } => {
+                #[cfg(feature = "nvm")]
+                if let Some(ref pcache_mtx) = self.persistent_cache {
+                    let mut pcache = pcache_mtx.lock();
+                    // TODO: Specify correct constant instead of magic 🪄
+                    let mut vec = Vec::with_capacity(4 * 1024 * 1024);
+                    object.value_mut().get_mut().pack(&mut vec)?;
+                    let _ = pcache.remove(offset);
+                    pcache
+                        .prepare_insert(offset, &vec, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Eventually write back data that was never synced to disk.
+                            if let Some(offset) = maybe_offset {
+                                self.pool
+                                    .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            }
+                            Ok(())
+                        })?;
+                }
+                return Ok(());
+            }
             ObjectKey::Modified(mid) => mid,
         };
 
@@ -341,6 +390,8 @@ where
         drop(cache);
         let object = CacheValueRef::write(entry);
 
+        // Eviction at this point only writes a single node, as all children
+        // must be unmodified beforehand.
         self.handle_write_back(object, mid, true, pk)?;
         Ok(())
     }
@@ -412,7 +463,37 @@ where
             state.finish()
         };
 
-        self.pool.begin_write(compressed_data, offset)?;
+        #[cfg(feature = "nvm")]
+        let skip_write_back = self.persistent_cache.is_some();
+        #[cfg(not(feature = "nvm"))]
+        let skip_write_back = false;
+
+        if !skip_write_back {
+            self.pool.begin_write(compressed_data, offset)?;
+        } else {
+            let bar = compressed_data.clone();
+            #[cfg(feature = "nvm")]
+            if let Some(ref pcache_mtx) = self.persistent_cache {
+                let away = Arc::clone(pcache_mtx);
+                self.pool.begin_foo(offset, move || {
+                    let mut pcache = away.lock();
+                    let _ = pcache.remove(offset);
+                    pcache
+                        .prepare_insert(offset, &compressed_data, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Deduplicate this?
+                            // if let Some(offset) = maybe_offset {
+                            //     self.pool
+                            //         .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            // }
+                            panic!("This should not have happened in the debug run!");
+                            Ok(())
+                        })
+                        .unwrap();
+                })?;
+            }
+            self.pool.begin_write(bar, offset)?;
+        }
 
         let obj_ptr = ObjectPointer {
             offset,
@@ -499,11 +580,7 @@ where
         {
             warn!(
                 "Storage tier {class} does not have enough space remaining. {} blocks of {}",
-                self.handler
-                    .free_space_tier(class)
-                    .unwrap()
-                    .free
-                    .as_u64(),
+                self.handler.free_space_tier(class).unwrap().free.as_u64(),
                 size.as_u64()
            );
            continue;
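The read path introduced above amounts to a plain read-through lookup: consult the persistent cache by offset, and fall back to the checksummed pool read on a miss. A minimal, self-contained sketch of that control flow; `MiniCache` and `MiniPool` are simplified stand-ins, not the crate's actual types:

```rust
use std::collections::HashMap;

/// Simplified stand-in for the persistent cache keyed by disk offset.
struct MiniCache {
    entries: HashMap<u64, Vec<u8>>,
}

/// Simplified stand-in for the storage pool.
struct MiniPool;

impl MiniPool {
    /// Placeholder for the checksummed vdev read.
    fn read(&self, _offset: u64) -> Vec<u8> {
        vec![0; 4096]
    }
}

/// Read-through lookup: prefer the persistent cache, fall back to the pool.
fn read_object(cache: Option<&MiniCache>, pool: &MiniPool, offset: u64) -> Vec<u8> {
    if let Some(cache) = cache {
        if let Some(buf) = cache.entries.get(&offset) {
            return buf.clone();
        }
    }
    pool.read(offset)
}

fn main() {
    let pool = MiniPool;
    let mut cache = MiniCache { entries: HashMap::new() };
    cache.entries.insert(8, vec![1; 4096]);
    assert_eq!(read_object(Some(&cache), &pool, 8)[0], 1); // cache hit
    assert_eq!(read_object(Some(&cache), &pool, 16)[0], 0); // miss, served by pool
}
```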
`{source}`")] VdevError { #[from] source: crate::vdev::Error, @@ -33,6 +35,12 @@ pub enum Error { CallbackError, #[error("A raw allocation has failed.")] RawAllocationError { at: DiskOffset, size: Block }, + #[cfg(feature = "nvm")] + #[error("A error occured while accessing the persistent cache. `{source}`")] + PersistentCacheError { + #[from] + source: PMapError, + }, } // To avoid recursive error types here, define a simple translation from diff --git a/betree/src/data_management/object_ptr.rs b/betree/src/data_management/object_ptr.rs index 0dbbd6d1..a394e1bf 100644 --- a/betree/src/data_management/object_ptr.rs +++ b/betree/src/data_management/object_ptr.rs @@ -9,7 +9,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)] /// A pointer to an on-disk serialized object. pub struct ObjectPointer { pub(super) decompression_tag: DecompressionTag, diff --git a/betree/src/database/errors.rs b/betree/src/database/errors.rs index 4dcee1ca..5863d97a 100644 --- a/betree/src/database/errors.rs +++ b/betree/src/database/errors.rs @@ -17,7 +17,7 @@ pub enum Error { #[from] source: crate::storage_pool::Error, }, - #[error("A tree operation encountered an error. This is likely an internal error.")] + #[error("A tree operation encountered an error. This is likely an internal error. `{source}`")] TreeError { #[from] source: crate::tree::Error, @@ -56,7 +56,7 @@ pub enum Error { InUse, #[error("Message surpasses the maximum length. If you cannot shrink your value, use an object store instead.")] MessageTooLarge, - #[error("Could not serialize the given data. This is an internal error.")] + #[error("Could not serialize the given data. This is an internal error. `{source}`")] SerializeFailed { #[from] source: serde_json::Error, diff --git a/betree/src/database/mod.rs b/betree/src/database/mod.rs index e8aabaa3..a0954eaf 100644 --- a/betree/src/database/mod.rs +++ b/betree/src/database/mod.rs @@ -21,6 +21,10 @@ use crate::{ vdev::Block, StoragePreference, }; + +#[cfg(feature = "nvm")] +use crate::replication::PersistentCache; + use bincode::{deserialize, serialize_into}; use byteorder::{BigEndian, ByteOrder, LittleEndian}; use crossbeam_channel::Sender; @@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration { compression: CompressionConfiguration::None, cache_size: DEFAULT_CACHE_SIZE, #[cfg(feature = "nvm")] - persistent_cache_path: None, + persistent_cache: None, access_mode: AccessMode::OpenIfExists, sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS), metrics: None, @@ -233,6 +237,12 @@ impl DatabaseConfiguration { } } + #[cfg(feature = "nvm")] + let pcache = self.persistent_cache.as_ref().map(|config| { + PersistentCache::create(&config.path, config.bytes) + .unwrap_or_else(|_| PersistentCache::open(&config.path).unwrap()) + }); + Dmu::new( self.compression.to_builder(), XxHashBuilder, @@ -241,6 +251,8 @@ impl DatabaseConfiguration { strategy, ClockCache::new(self.cache_size), handler, + #[cfg(feature = "nvm")] + pcache, ) } diff --git a/betree/src/lib.rs b/betree/src/lib.rs index c25d5c50..3e1d19f8 100644 --- a/betree/src/lib.rs +++ b/betree/src/lib.rs @@ -5,6 +5,8 @@ // code as the alternative would be allowing each single method only provided on // a maybe needed in the future basis. 
#![allow(dead_code)] +#![feature(allocator_api)] +#![feature(btreemap_alloc)] #[macro_use] extern crate error_chain; diff --git a/betree/src/replication/lru.rs b/betree/src/replication/lru.rs index 6fd217ee..ea4a8aa1 100644 --- a/betree/src/replication/lru.rs +++ b/betree/src/replication/lru.rs @@ -1,110 +1,84 @@ -use pmem_hashmap::{PMap, PMapError}; +use super::{Persistent, PREFIX_LRU}; +use pmem_hashmap::{allocator::PalPtr, PMap, PMapError}; +use std::{marker::PhantomData, mem::size_of, ptr::NonNull}; -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -/// Key pointing to an LRU node in a persistent cache. -pub struct LruKey(u64); - -impl LruKey { - /// Fetch and cast a pmem pointer to a [PlruNode]. - /// - /// Safety - /// ====== - /// The internals of this method are highly unsafe. Concurrent usage and - /// removal are urgently discouraged. - fn fetch(&self, map: &mut PMap) -> Result<&mut PlruNode, PMapError> { - map.get(self.key()).and_then(|node_raw| unsafe { - Ok(std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>( - node_raw.try_into().unwrap(), - )) - }) - } - - /// Wrap a hashed value in an [LruKey]. - pub fn from(hash: u64) -> Self { - LruKey(hash) - } - - fn key(&self) -> [u8; size_of::() + 1] { - let mut key = [0; size_of::() + 1]; - key[0] = PREFIX_LRU; - key[1..].copy_from_slice(&self.0.to_le_bytes()); - key - } +/// Fetch and cast a pmem pointer to a [PlruNode]. +/// +/// Safety +/// ====== +/// The internals of this method are highly unsafe. Concurrent usage and +/// removal are urgently discouraged. +fn fetch(ptr: &PalPtr) -> Result<&mut PlruNode, PMapError> { + Ok(unsafe { + std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>( + core::slice::from_raw_parts_mut(ptr.load() as *mut u8, PLRU_NODE_SIZE) + .try_into() + .unwrap(), + ) + }) } /// Persistent LRU #[repr(C)] -pub struct Plru { - head: Option, - tail: Option, +pub struct Plru { + head: Option, + + tail: Option, // in Blocks? Return evicted element when full capacity: u64, count: u64, size: u64, + // Satisfy the rust compiler by adding a zero sized phantom here + // Also, avoid the drop checker by using a raw ptr + key_type: PhantomData<*const T>, } -impl Plru { - pub fn create(map: &mut PMap, capacity: u64) -> Result, PMapError> { - let this = Self { - head: None, - tail: None, - capacity, - size: 0, - count: 0, - }; - map.insert([super::PREFIX_LRU_ROOT], unsafe { - &std::mem::transmute::<_, [u8; size_of::()]>(this) - })?; - Self::open(map) - } +// hack ⛏ +const PLRU_ROOT_LENGTH_READ_COMMENT: usize = size_of::>(); - pub fn open(map: &mut PMap) -> Result, PMapError> { - // Fetch from map, check if valid, transmute to type, and reaffirm - // non-null-ness. 
The unchecked transition is always correct as we unpack an already ensured - unsafe { - Ok(Persistent(NonNull::new_unchecked(std::mem::transmute( - map.get([super::PREFIX_LRU_ROOT])?.as_mut_ptr(), - )))) - } +impl Plru { + pub fn init(&mut self, capacity: u64) { + self.head = None; + self.tail = None; + self.capacity = capacity; + self.size = 0; + self.count = 0; + self.key_type = PhantomData::default(); } - pub fn touch(&mut self, map: &mut PMap, node_ptr: LruKey) -> Result<(), PMapError> { - if self.head == Some(node_ptr) { + pub fn touch(&mut self, node_ptr: &PalPtr) -> Result<(), PMapError> { + if self.head.as_ref() == Some(node_ptr) { return Ok(()); } - let node = node_ptr.fetch(map).unwrap(); - self.cut_node_and_stitch(map, node)?; + self.cut_node_and_stitch(node_ptr)?; // Fixate new head - let old_head_ptr = self.head.expect("Invalid State"); - let old_head = old_head_ptr.fetch(map).unwrap(); - old_head.fwd = Some(node_ptr); - node.back = self.head; - self.head = Some(node_ptr); + let old_head_ptr = self.head.as_ref().expect("Invalid State"); + let old_head: &mut PlruNode = fetch(old_head_ptr).unwrap(); + old_head.fwd = Some(node_ptr.clone()); + let node: &mut PlruNode = fetch(node_ptr).unwrap(); + node.back = self.head.clone(); + self.head = Some(node_ptr.clone()); Ok(()) } /// Add a new entry into the LRU. Will fail if already present. - pub fn insert(&mut self, map: &mut PMap, node_ptr: LruKey, size: u64) -> Result<(), PMapError> { - let node = PlruNode { - fwd: None, - back: self.head, - size, - data: node_ptr.0, - }; + pub fn insert(&mut self, node_ptr: PalPtr, size: u64, baggage: T) -> Result<(), PMapError> { + let new_node = fetch(&node_ptr).unwrap(); + new_node.fwd = None; + new_node.back = self.head.clone(); + new_node.size = size; + new_node.key = baggage; - map.insert(node_ptr.key(), &unsafe { - std::mem::transmute::<_, [u8; PLRU_NODE_SIZE]>(node) - })?; - if let Some(head_ptr) = self.head { - let head = head_ptr.fetch(map)?; - head.fwd = Some(node_ptr); + if let Some(ref mut head_ptr) = self.head.as_mut() { + let head: &mut PlruNode = fetch(head_ptr).unwrap(); + head.fwd = Some(node_ptr.clone()); self.head = Some(node_ptr); } else { // no head existed yet -> newly initialized list - self.head = Some(node_ptr); + self.head = Some(node_ptr.clone()); self.tail = Some(node_ptr); } self.size += size; @@ -114,32 +88,29 @@ impl Plru { /// Checks whether an eviction is necessary and which entry to evict. /// This call does not perform the removal itself. 
- pub fn evict(&mut self) -> Option { - if let (Some(tail), true) = (self.tail, self.size > self.capacity) { - return Some(DataKey(tail.0)); + pub fn evict(&self, size: u64) -> Result, PMapError> { + if let (Some(ref tail), true) = (self.tail.as_ref(), self.size + size > self.capacity) { + let node = fetch(tail).unwrap(); + return Ok(Some((node.hash, node.key))); } - None + Ok(None) } - fn cut_node_and_stitch( - &mut self, - map: &mut PMap, - node: &mut PlruNode, - ) -> Result<(), PMapError> { - let node_ptr = LruKey(node.data); - if let Some(forward_ptr) = node.fwd { - let forward = forward_ptr.fetch(map)?; - forward.back = node.back; + fn cut_node_and_stitch(&mut self, node_ptr: &PalPtr) -> Result<(), PMapError> { + let node: &mut PlruNode = fetch(node_ptr).unwrap(); + if let Some(ref mut forward_ptr) = node.fwd.as_mut() { + let forward: &mut PlruNode = fetch(forward_ptr).unwrap(); + forward.back = node.back.clone(); } - if self.tail == Some(node_ptr) { - self.tail = node.fwd; + if self.tail.as_ref() == Some(node_ptr) { + self.tail = node.fwd.clone(); } - if let Some(back_ptr) = node.back { - let back = back_ptr.fetch(map)?; - back.fwd = node.fwd; + if let Some(ref mut back_ptr) = node.back.as_mut() { + let back: &mut PlruNode = fetch(back_ptr).unwrap(); + back.fwd = node.fwd.clone(); } - if self.head == Some(node_ptr) { - self.head = node.back; + if self.head.as_ref() == Some(node_ptr) { + self.head = node.back.clone(); } node.fwd = None; node.back = None; @@ -148,23 +119,15 @@ impl Plru { } /// Remove a node from cache and deallocate. - pub fn remove(&mut self, map: &mut PMap, node_ptr: LruKey) -> Result<(), PMapError> { - let node = node_ptr.fetch(map)?; - self.cut_node_and_stitch(map, node)?; - let size = node.size; - map.remove(node_ptr.key())?; - self.size -= size; + pub fn remove(&mut self, node_ptr: &mut PalPtr) -> Result<(), PMapError> { + self.cut_node_and_stitch(node_ptr)?; + let node: &mut PlruNode = fetch(node_ptr).unwrap(); + self.size -= node.size; self.count -= 1; Ok(()) } } -use std::{mem::size_of, ptr::NonNull}; - -use super::{DataKey, Persistent, PREFIX_LRU}; - -const PLRU_NODE_SIZE: usize = size_of::(); - /// Ephemeral Wrapper around a byte array for sane access code. /// /// Structure @@ -175,17 +138,44 @@ const PLRU_NODE_SIZE: usize = size_of::(); /// .. │ ├────> /// └───┘ back /// +/// Size Constraint +/// =============== +/// This structure allows for a generic member which is to be returned when +/// evictions are happening. The size available to the entire object is 256 +/// bytes of which the custom type can occupy at most 208 bytes. +/// /// Safety /// ====== /// Using this wrapper requires transmuting the underlying byte array, which /// invalidates, when used on references, all borrow checker guarantees. Use /// with extrem caution, and be sure what you are doing. #[repr(C)] -pub struct PlruNode { - fwd: Option, - back: Option, +pub struct PlruNode { + fwd: Option, + back: Option, size: u64, - data: u64, + hash: u64, + key: T, +} +pub(super) const PLRU_NODE_SIZE: usize = 256; + +impl PlruNode { + const SIZE_CONSTRAINT: () = assert!( + std::mem::size_of::>() < PLRU_NODE_SIZE, + "Size of attached data to LRU entry surpasses size constraint." 
+ ); + + pub fn new(fwd: Option, back: Option, size: u64, hash: u64, key: T) -> Self { + // has to remain to ensure that the code path is evaluated by rustc + let _ = Self::SIZE_CONSTRAINT; + Self { + fwd, + back, + size, + hash, + key, + } + } } #[cfg(test)] @@ -229,7 +219,7 @@ mod tests { fn new() { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let _ = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); + let _ = Plru::<()>::create(&mut pmap, 32 * 1024 * 1024).unwrap(); } #[test] @@ -237,11 +227,11 @@ mod tests { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 321).unwrap(); + lru.insert(&mut pmap, LruKey(100), 321, ()).unwrap(); assert_eq!(lru.head, Some(LruKey(100))); - lru.insert(&mut pmap, LruKey(101), 322).unwrap(); + lru.insert(&mut pmap, LruKey(101), 322, ()).unwrap(); assert_eq!(lru.head, Some(LruKey(101))); - lru.insert(&mut pmap, LruKey(102), 323).unwrap(); + lru.insert(&mut pmap, LruKey(102), 323, ()).unwrap(); assert_eq!(lru.head, Some(LruKey(102))); } @@ -250,9 +240,9 @@ mod tests { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 321).unwrap(); - lru.insert(&mut pmap, LruKey(101), 322).unwrap(); - lru.insert(&mut pmap, LruKey(102), 323).unwrap(); + lru.insert(&mut pmap, LruKey(100), 321, ()).unwrap(); + lru.insert(&mut pmap, LruKey(101), 322, ()).unwrap(); + lru.insert(&mut pmap, LruKey(102), 323, ()).unwrap(); } #[test] @@ -260,21 +250,33 @@ mod tests { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024) + lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ()) + .unwrap(); + lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024, ()) .unwrap(); - lru.insert(&mut pmap, LruKey(101), 15 * 1024 * 1024) + lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024, ()) .unwrap(); - lru.insert(&mut pmap, LruKey(102), 8 * 1024 * 1024).unwrap(); assert_eq!( - lru.evict().and_then(|opt| Some(LruKey(opt.0))), + lru.evict(&mut pmap, 0) + .unwrap() + .and_then(|opt| Some(LruKey(opt.0 .0))), Some(LruKey(100)) ); lru.remove(&mut pmap, LruKey(100)).unwrap(); - lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024).unwrap(); - assert_eq!(lru.evict().and_then(|opt| Some(LruKey(opt.0))), None); - lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024).unwrap(); + lru.insert(&mut pmap, LruKey(100), 1 * 1024 * 1024, ()) + .unwrap(); + assert_eq!( + lru.evict(&mut pmap, 0) + .unwrap() + .and_then(|opt| Some(LruKey(opt.0 .0))), + None + ); + lru.insert(&mut pmap, LruKey(103), 9 * 1024 * 1024, ()) + .unwrap(); assert_eq!( - lru.evict().and_then(|opt| Some(LruKey(opt.0))), + lru.evict(&mut pmap, 0) + .unwrap() + .and_then(|opt| Some(LruKey(opt.0 .0))), Some(LruKey(101)) ); } @@ -283,11 +285,28 @@ mod tests { fn remove() { let file = TestFile::new(); let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap(); - let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); - lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024) + let mut lru: Persistent> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap(); + lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ()) .unwrap(); lru.remove(&mut 
pmap, LruKey(100)).unwrap();
         assert_eq!(lru.head, None);
         assert_eq!(lru.tail, None);
     }
+
+    #[test]
+    fn reinit() {
+        let file = TestFile::new();
+        {
+            let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
+            let mut lru: Persistent<Plru<()>> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
+            lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024, ())
+                .unwrap();
+        }
+        {
+            let mut pmap = PMap::open(file.path()).unwrap();
+            let lru: Persistent<Plru<()>> = Plru::open(&mut pmap).unwrap();
+            assert_eq!(lru.head, Some(LruKey(100)));
+            assert_eq!(lru.tail, Some(LruKey(100)));
+        }
+    }
 }
diff --git a/betree/src/replication/mod.rs b/betree/src/replication/mod.rs
index 019e86de..019272c0 100644
--- a/betree/src/replication/mod.rs
+++ b/betree/src/replication/mod.rs
@@ -10,17 +10,6 @@
 //! 3.3 SLOW
 //! 3.4 SLOWEST
 //!
-//!
-//! ```
-//! 1
-//! / \
-//! 1 1
-//! / \ / \
-//! 1 2 1 2
-//! /|\ | /|\ |\
-//! 321 2 313 22
-//! ```
-//!
 //! Map Keys
 //! ========
 //!
@@ -32,21 +21,25 @@ const PREFIX_KV: u8 = 0;
 const PREFIX_LRU: u8 = 1;
 const PREFIX_LRU_ROOT: u8 = 2;
 
+use pmem_hashmap::{
+    allocator::{Pal, PalPtr},
+    PMap, PMapError,
+};
 use std::{
-    hash::Hash,
+    collections::BTreeMap,
+    hash::{Hash, Hasher},
+    marker::PhantomData,
+    mem::size_of,
     ops::{Deref, DerefMut},
     path::PathBuf,
     ptr::NonNull,
 };
-
-use pmem_hashmap::{PMap, PMapError};
+use twox_hash::XxHash64;
 
 mod lru;
 use lru::Plru;
 use serde::{Deserialize, Serialize};
 
-use self::lru::LruKey;
-
 /// A pointer to a region in persistent memory.
 pub struct Persistent<T>(NonNull<T>);
 // Pointer to persistent memory can be assumed to be non-thread-local
@@ -64,35 +57,25 @@ impl<T> DerefMut for Persistent<T> {
     }
 }
 
-/// Internally used key to access data store.
-pub struct DataKey(u64);
-
-impl DataKey {
-    /// Create an internal store from an external key.
-    pub fn from<K: Hash>(key: K, pmap: &mut PMap) -> Self {
-        DataKey(pmap.hash(key))
-    }
-
-    /// Expose as byte range for FFI use.
-    pub fn key(&self) -> [u8; 9] {
-        let mut key = [0; 9];
-        key[0] = PREFIX_KV;
-        key[1..].copy_from_slice(&self.0.to_le_bytes());
-        key
-    }
+/// Persistent byte array cache. Optimized for read performance, avoid frequent
+/// updates.
+pub struct PersistentCache<K, T> {
+    pal: Pal,
+    root: Persistent<PCacheRoot<T>>,
+    // Fix key types
+    key_type: PhantomData<K>,
+}
 
-    /// Convert to an internally used representation for LRU entries.
-    pub fn to_lru_key(&self) -> LruKey {
-        LruKey::from(self.0)
-    }
+pub struct PCacheRoot<T> {
+    map: BTreeMap<u64, PCacheMapEntry, Pal>,
+    lru: Plru<T>,
 }
 
-/// Persistent byte array cache. Optimized for read performance, avoid frequent
-/// updates.
-pub struct PersistentCache {
-    pmap: PMap,
-    // Persistent Pointer
-    lru: Persistent<Plru>,
+#[derive(Debug, Clone)]
+pub struct PCacheMapEntry {
+    size: usize,
+    lru_node: PalPtr,
+    data: PalPtr,
 }
 
 /// Configuration for a persistent cache.
@@ -104,51 +87,136 @@ pub struct PersistentCacheConfig {
     pub bytes: usize,
 }
 
-impl<K: Hash> PersistentCache<K> {
+/// Ephemeral struct created in the preparation for an insertion.
+pub struct PersistentCacheInsertion<'a, K, T> {
+    cache: &'a mut PersistentCache<K, T>,
+    key: u64,
+    value: &'a [u8],
+    baggage: T,
+}
+
+impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> {
+    /// Performs the insertion and calls the given function on each entry
+    /// evicted from the store. On error the entire insertion is aborted and
+    /// has to be initiated anew.
+    pub fn insert<F>(self, f: F) -> Result<(), PMapError>
+    where
+        F: Fn(&T, &[u8]) -> Result<(), crate::vdev::Error>,
+    {
+        while let Ok(Some((key, baggage))) = self.cache.root.lru.evict(self.value.len() as u64) {
+            // let data = self.cache.pmap.get(key.key())?;
+            let entry = self.cache.root.map.get(&key).unwrap();
+            let data =
+                unsafe { core::slice::from_raw_parts(entry.data.load() as *const u8, entry.size) };
+            if f(&baggage, data).is_err() {
+                return Err(PMapError::ExternalError("Writeback failed".into()));
+            }
+            // Finally actually remove the entries
+            let mut entry = self.cache.root.map.remove(&key).unwrap();
+            entry.data.free();
+            self.cache.root.lru.remove(&mut entry.lru_node)?;
+            entry.lru_node.free();
+        }
+        let lru_ptr = self.cache.pal.allocate(lru::PLRU_NODE_SIZE).unwrap();
+        let data_ptr = self.cache.pal.allocate(self.value.len()).unwrap();
+        data_ptr.copy_from(self.value, &self.cache.pal);
+        // Register the new entry in the index so that `get` can find it later.
+        self.cache.root.map.insert(
+            self.key,
+            PCacheMapEntry {
+                size: self.value.len(),
+                lru_node: lru_ptr.clone(),
+                data: data_ptr,
+            },
+        );
+        self.cache
+            .root
+            .lru
+            .insert(lru_ptr, self.value.len() as u64, self.baggage)?;
+
+        Ok(())
+    }
+}
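The two-phase API above splits an insertion into a preparation step and a commit that must first drain evictions, so the caller decides what happens to every evicted entry. A hypothetical call site mirroring the DMU usage in this patch; keys and baggage are plain `u64` offsets here, and `write_back_to_disk` is an assumed helper, not part of the patch:

```rust
fn cache_compressed_node(
    cache: &mut PersistentCache<u64, Option<u64>>,
    offset: u64,
    bytes: &[u8],
) -> Result<(), PMapError> {
    // Drop a stale copy if one exists; a missing entry is fine here.
    let _ = cache.remove(offset);
    cache
        .prepare_insert(offset, bytes, Some(offset))
        .insert(|evicted_offset, evicted_bytes| {
            // Called once per entry evicted to make room: persist the bytes
            // to their final on-disk location before they vanish.
            if let Some(off) = evicted_offset {
                write_back_to_disk(*off, evicted_bytes)?;
            }
            Ok(())
        })
}
```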
+
+impl<K: Hash, T> PersistentCache<K, T> {
     /// Open an existent [PersistentCache]. Fails if no cache exist or invalid.
-    pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, PMapError> {
-        let mut pmap = PMap::open(path.into())?;
-        Ok(Self {
-            lru: Plru::open(&mut pmap)?,
-            pmap,
-        })
+    pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, PMapError> {
+        let pal = Pal::open(path.into()).unwrap();
+        let root = pal.root(size_of::<PCacheRoot<T>>()).unwrap();
+        assert!(!root.load().is_null());
+        if let Some(root) = NonNull::new(root.load() as *mut PCacheRoot<T>) {
+            let root = Persistent(root);
+            Ok(Self {
+                pal,
+                root,
+                key_type: PhantomData::default(),
+            })
+        } else {
+            Err(PMapError::DoesNotExist)
+        }
     }
 
     /// Create a new [PersistentCache] in the specified path. Fails if underlying resources are not valid.
     pub fn create<P: Into<PathBuf>>(path: P, size: usize) -> Result<Self, PMapError> {
-        let mut pmap = PMap::create(path.into(), size)?;
-        Ok(Self {
-            lru: Plru::create(&mut pmap, size as u64)?,
-            pmap,
-        })
+        let mut pal = Pal::create(path.into(), size, 0o666).unwrap();
+        let root = pal.root(size_of::<PCacheRoot<T>>()).unwrap();
+        assert!(!root.load().is_null());
+        if let Some(root) = NonNull::new(root.load() as *mut PCacheRoot<T>) {
+            let mut root = Persistent(root);
+            root.lru.init(size as u64);
+            root.map = BTreeMap::new_in(pal.clone());
+            Ok(Self {
+                pal,
+                root,
+                key_type: PhantomData::default(),
+            })
+        } else {
+            Err(PMapError::DoesNotExist)
+        }
+        // Ok(Self {
+        //     lru: Plru::create(&mut pmap, size as u64)?,
+        //     pmap,
+        //     key_type: PhantomData::default(),
+        // })
     }
 
     /// Fetch an entry from the hashmap.
-    pub fn get(&mut self, key: K) -> Result<&[u8], PMapError> {
-        let k = DataKey::from(key, &mut self.pmap);
-        self.lru.touch(&mut self.pmap, k.to_lru_key())?;
-        Ok(self.pmap.get(k.key())?)
+    pub fn get(&mut self, key: K) -> Result<&[u8], PMapError> {
+        let mut hasher = XxHash64::default();
+        key.hash(&mut hasher);
+        let hash = hasher.finish();
+        let res = self.root.map.get(&hash).cloned();
+        if let Some(entry) = res {
+            self.root.lru.touch(&entry.lru_node)?;
+            Ok(unsafe { core::slice::from_raw_parts(entry.data.load() as *const u8, entry.size) })
+        } else {
+            Err(PMapError::DoesNotExist)
+        }
     }
 
-    /// Insert an entry and remove infrequent values.
-    pub fn insert(&mut self, key: K, value: &[u8]) -> Result<(), PMapError> {
-        // TODO: Update old values? Convenience
-        let k = DataKey::from(key, &mut self.pmap);
-        self.pmap.insert(k.key(), value)?;
-        self.lru
-            .insert(&mut self.pmap, k.to_lru_key(), value.len() as u64)?;
-        while let Some(id) = self.lru.evict() {
-            self.pmap.remove(id.key())?;
-            self.lru.remove(&mut self.pmap, id.to_lru_key())?;
-        }
-        Ok(())
-    }
+    /// Start an insertion. An insertion can only be completed successfully
+    /// once the values it displaces have been properly evicted from the cache.
+    pub fn prepare_insert<'a>(
+        &'a mut self,
+        key: K,
+        value: &'a [u8],
+        baggage: T,
+    ) -> PersistentCacheInsertion<'a, K, T> {
+        let mut hasher = XxHash64::default();
+        key.hash(&mut hasher);
+        let hash = hasher.finish();
+        PersistentCacheInsertion {
+            cache: self,
+            key: hash,
+            value,
+            baggage,
+        }
+    }
 
     /// Remove an entry.
-    pub fn remove(&mut self, key: K) -> Result<(), PMapError> {
-        let k = DataKey::from(key, &mut self.pmap);
-        self.pmap.remove(k.key())?;
-        self.lru.remove(&mut self.pmap, k.to_lru_key())?;
-        Ok(())
+    pub fn remove(&mut self, key: K) -> Result<(), PMapError> {
+        let mut hasher = XxHash64::default();
+        key.hash(&mut hasher);
+        let hash = hasher.finish();
+        if let Some(mut entry) = self.root.map.remove(&hash) {
+            self.root.lru.remove(&mut entry.lru_node).unwrap();
+            entry.lru_node.free();
+            entry.data.free();
+            Ok(())
+        } else {
+            Err(PMapError::DoesNotExist)
+        }
+        // self.root.lru.remove(&mut self.pmap, k.to_lru_key())?;
+        // Ok(())
     }
 }
diff --git a/betree/src/storage_pool/mod.rs b/betree/src/storage_pool/mod.rs
index 66bbe0f6..581a4b94 100644
--- a/betree/src/storage_pool/mod.rs
+++ b/betree/src/storage_pool/mod.rs
@@ -58,6 +58,10 @@ pub trait StoragePoolLayer: Clone + Send + Sync + 'static {
     /// Issues a write request that might happen in the background.
     fn begin_write(&self, data: Buf, offset: DiskOffset) -> VdevResult<()>;
 
+    fn begin_foo<F>(&self, offset: DiskOffset, f: F) -> VdevResult<()>
+    where
+        F: FnOnce() + Send + 'static;
+
     /// Writes the given `data` at `offset` for every `LeafVdev`.
     fn write_raw(&self, data: Buf, offset: Block) -> VdevResult<()>;
 
diff --git a/betree/src/storage_pool/unit.rs b/betree/src/storage_pool/unit.rs
index 13e7373e..99272f24 100644
--- a/betree/src/storage_pool/unit.rs
+++ b/betree/src/storage_pool/unit.rs
@@ -166,8 +166,6 @@ impl StoragePoolLayer for StoragePoolUnit {
                 .write(data, offset.block_offset())
                 .await;
 
-            // TODO: what about multiple writes to same offset?
-            // NOTE: This is currently covered in the tests and fails as expected
             inner.write_back_queue.mark_completed(&offset).await;
             res
         })?;
@@ -183,6 +181,25 @@ impl StoragePoolLayer for StoragePoolUnit {
         ret
     }
 
+    fn begin_foo<F>(&self, offset: DiskOffset, f: F) -> vdev::Result<()>
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        let (enqueue_done, wait_for_enqueue) = futures::channel::oneshot::channel();
+        let write = self.inner.pool.spawn_with_handle(async move {
+            wait_for_enqueue.await.unwrap();
+            f();
+            Ok(())
+        })?;
+
+        let ret = self.inner.write_back_queue.enqueue(offset, Box::pin(write));
+
+        enqueue_done
+            .send(())
+            .expect("Couldn't unlock enqueued write task");
+        ret
+    }
+
     fn write_raw(&self, data: Buf, offset: Block) -> Result<(), VdevError> {
         let vec = self
             .inner
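`begin_foo` registers the spawned future in the write-back queue before letting it run; the oneshot channel enforces that ordering so a concurrent sync cannot miss the in-flight write. The pattern in isolation, as a sketch using only the `futures` crate (with its `thread-pool` feature); names are illustrative:

```rust
use futures::{channel::oneshot, executor::ThreadPool, task::SpawnExt};

fn main() {
    let pool = ThreadPool::new().unwrap();
    let (enqueue_done, wait_for_enqueue) = oneshot::channel::<()>();

    // The task is spawned first, but blocks until it is released below.
    let task = pool
        .spawn_with_handle(async move {
            wait_for_enqueue.await.unwrap();
            // ... run the deferred work, e.g. persist data to the cache ...
        })
        .unwrap();

    // ... register `task` in a queue keyed by offset here ...

    // Only now may the task proceed; its registration is guaranteed visible.
    enqueue_done.send(()).unwrap();
    futures::executor::block_on(task);
}
```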
`{source}`")] DmuError { #[from] source: crate::data_management::Error, diff --git a/docs/src/rfc/2-persistent-cache-replication.md b/docs/src/rfc/2-persistent-cache-replication.md new file mode 100644 index 00000000..a8a62b6f --- /dev/null +++ b/docs/src/rfc/2-persistent-cache-replication.md @@ -0,0 +1,51 @@ +- Title: Persistent Cache / Data Replication +- Status: DRAFT + +# Description + +A persistent cache is a way to utilize a storage device capabilities without +integrating it further into the existing storage hierarchy by keeping it +essentially in the memory layer semantically. For this we present two +approaches, either may be used as they have different drawbacks and advantages. + +## Fallback Cache + +Once entries are evicted from the volatile cache we remove them by eviction and +allocate resources on the underlying storage media. To prolong this process we +skip this part and write them first to the non-volatile cache and eventually +when they are evicted from there they actually get written to disk. Problems +with this approach are mostly the actual utilization (read + write traffic) +which is not optimal for some storage devices (*cough* PMem). + +Copy-on-Write is another issue as we may need to ensure writes to the storage +device when modifying the entry, but this can be taken care of in the DMU. + +Regardless, it is easy to implement and should remain an option we try. + +## Read-heavy-cache + +Another approach which promises overall a better specific device utilization is +the focus on *read* heavy data which seldomly gets modified. Most of the issues +mentioned above are offloaded are resolved by this approach by it does depend on +a *decision maker* which moves data from storage or cache to the device. Most +issues in this approach are produced by this decision, with the actual result +being completely depending on this. An idea is to use the migration policies as +done with inter-tier data migration, but for this additional classification is +required. Possible workflow node independent classification schemes are still in +need to be constructed for this, which is partially an open topic for research. + +# Purpose + +This RFC tries to avoid to integrate stuff like PMem into the storage hierarchy +as non-latency bound workflows do not really gain an advantage with them, rather +a separate stack is build in the cache level to reduce latency on accesses there +(if the data fits into storage). + +# Drawbacks + + + +# Alternatives + +> Find atleast one alternative, this thought process helps preventing being +> stuck in certain implemenation details diff --git a/fio-haura/src/fio-engine-haura.c b/fio-haura/src/fio-engine-haura.c index 5aa16a6e..678186f3 100644 --- a/fio-haura/src/fio-engine-haura.c +++ b/fio-haura/src/fio-engine-haura.c @@ -304,11 +304,14 @@ static int fio_haura_setup(struct thread_data *td) { /* Haura needs some additional space to provide extra data like object * pointers and metadata. This is more of a hack, but nonetheless. 
diff --git a/fio-haura/src/fio-engine-haura.c b/fio-haura/src/fio-engine-haura.c
index 5aa16a6e..678186f3 100644
--- a/fio-haura/src/fio-engine-haura.c
+++ b/fio-haura/src/fio-engine-haura.c
@@ -304,11 +304,14 @@ static int fio_haura_setup(struct thread_data *td) {
     /* Haura needs some additional space to provide extra data like object
      * pointers and metadata. This is more of a hack, but nonetheless. */
     creat(td->files[idx]->file_name, 0644);
-    if (truncate(td->files[idx]->file_name, td->o.size + (50 * 1024 * 1024))) {
+    if (truncate(td->files[idx]->file_name, td->o.file_size_high)) {
       fprintf(
           stderr,
           "Could not retruncate file to provide enough storage for Haura.\n");
     }
+    // Record the already allocated size so that fio skips its internal
+    // initial layout pass.
+    td->files[idx]->real_file_size = td->o.file_size_high;
   }
 
   td->io_ops_data = malloc(sizeof(size_t));
@@ -324,7 +327,7 @@ static int fio_haura_setup(struct thread_data *td) {
     return bail(error);
   }
   fio_haura_translate(td, cfg);
-  if ((global_data.db = betree_create_db(cfg, &error)) == NULL) {
+  if ((global_data.db = betree_open_or_create_db(cfg, &error)) == NULL) {
     return bail(error);
   }
   if ((global_data.obj_s = betree_create_object_store(
@@ -355,6 +358,11 @@ static int fio_haura_setup(struct thread_data *td) {
       ((u_int32_t *)buf)[off] = random();
     }
     while (max_io_size > total_written) {
+      if (betree_object_size(global_data.objs[idx], &error) >=
+          max_io_size) {
+        printf("haura: skipping prepopulation of object %lu\n", idx + 1);
+        break;
+      }
       unsigned long written = 0;
       betree_object_write_at(global_data.objs[idx], buf, block_size,
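The prepopulation check above is the first consumer of the new `betree_object_size` export; outside of fio it can be used the same way. A short sketch in C (object and store setup elided; note that a return value of 0 can mean either an empty object or that the size could not be determined):

```c
#include "betree.h"
#include <stdio.h>

/* Sketch: query an object's size through the new C API. `obj` is assumed to
 * have been obtained from the object store beforehand; setup elided. */
static void print_object_size(struct obj_t *obj) {
  struct err_t *err = NULL;
  unsigned long size = betree_object_size(obj, &err);
  if (err != NULL) {
    /* Inspect and free `err` with the betree error helpers here. */
    return;
  }
  printf("object size: %lu bytes\n", size);
}
```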