From 4d5ea597831ffc7bbccf42d4464950bd354b6aaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCnsche?= Date: Thu, 13 Jul 2023 09:45:17 +0200 Subject: [PATCH] tmp commit 3 --- betree/pmem-hashmap/src/allocator.rs | 3 +- betree/src/buffer.rs | 46 ++++++++++++++++++++++- betree/src/data_management/dmu.rs | 53 ++++++++++++++++---------- betree/src/replication/lru.rs | 4 +- betree/src/replication/lru_worker.rs | 8 ++-- betree/src/replication/mod.rs | 56 ++++++++++++++++++++++++---- fio-haura/src/fio-engine-haura.c | 7 +++- 7 files changed, 140 insertions(+), 37 deletions(-) diff --git a/betree/pmem-hashmap/src/allocator.rs b/betree/pmem-hashmap/src/allocator.rs index 8a1baad8..4ddc1fb6 100644 --- a/betree/pmem-hashmap/src/allocator.rs +++ b/betree/pmem-hashmap/src/allocator.rs @@ -175,7 +175,8 @@ impl Pal { unsafe { pmemobj_close(self.pool.as_ptr()) }; } - /// Allocate an area of size in the persistent memory. + /// Allocate an area of size in the persistent memory. Allocations are + /// always guaranteed to be cache line aligned for Optane PMem (64 bytes). pub fn allocate(&self, size: usize) -> Result { let mut oid = std::mem::MaybeUninit::::uninit(); if unsafe { diff --git a/betree/src/buffer.rs b/betree/src/buffer.rs index 5a896593..45de3fb3 100644 --- a/betree/src/buffer.rs +++ b/betree/src/buffer.rs @@ -56,6 +56,8 @@ fn split_range_at( struct AlignedStorage { ptr: *mut u8, capacity: Block, + // avoid deallocation + is_persistent: bool, } impl Default for AlignedStorage { @@ -63,6 +65,7 @@ impl Default for AlignedStorage { AlignedStorage { ptr: ptr::null_mut(), capacity: Block(0), + is_persistent: false, } } } @@ -127,7 +130,7 @@ impl AlignedStorage { impl Drop for AlignedStorage { fn drop(&mut self) { - if !self.ptr.is_null() { + if !self.ptr.is_null() && !self.is_persistent { unsafe { let layout = Layout::from_size_align_unchecked( self.capacity.to_bytes() as usize, @@ -143,11 +146,12 @@ impl From> for AlignedStorage { fn from(b: Box<[u8]>) -> Self { // It can be useful to re-enable this line to easily locate places where unnecessary // copying takes place, but it's not suited to stay enabled unconditionally. - // assert!(is_aligned(&b)); + assert!(is_aligned(&b)); if is_aligned(&b) { AlignedStorage { capacity: Block::from_bytes(b.len() as u32), ptr: unsafe { (*Box::into_raw(b)).as_mut_ptr() }, + is_persistent: false, } } else { assert!( @@ -411,6 +415,44 @@ impl Buf { }, ) } + + /// Display the entire underlying buffer without respect to the semantic + /// size of the data contained. + pub fn as_slice_with_padding(&self) -> &[u8] { + unsafe { + let buf = &*self.buf.buf.get(); + slice::from_raw_parts(buf.ptr, buf.capacity.to_bytes() as usize) + } + } +} + +#[cfg(feature = "nvm")] +use pmem_hashmap::allocator::PalPtr; + +#[cfg(feature = "nvm")] +impl Buf { + /// Special case transformation from a pointer to persistent memory. Always + /// assumed to be length-aligned to [BLOCK_SIZE]. + /// + /// The alignment works differently and is not as freely configurable as can + /// be done with "normal" rust allocations. Therefore, only the length of + /// the ptr is checked *not* the actual location. These are guaranteed to + /// lie at 64 byte cache line boundaries. + pub fn from_persistent_ptr(ptr: PalPtr, size: u32) -> Self { + assert_eq!(size as usize % BLOCK_SIZE, 0); + let padded_size = Block::round_up_from_bytes(size); + let aligned_buf = AlignedBuf { + buf: Arc::new(UnsafeCell::new(AlignedStorage { + ptr: ptr.load() as *mut u8, + capacity: padded_size, + is_persistent: true, + })), + }; + Self { + range: aligned_buf.full_range(), + buf: aligned_buf, + } + } } impl MutBuf { diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs index ff6348c4..b7176698 100644 --- a/betree/src/data_management/dmu.rs +++ b/betree/src/data_management/dmu.rs @@ -9,7 +9,7 @@ use super::{ use crate::replication::PersistentCache; use crate::{ allocator::{Action, SegmentAllocator, SegmentId}, - buffer::Buf, + buffer::{Buf, BufWrite}, cache::{Cache, ChangeKeyError, RemoveError}, checksum::{Builder, Checksum, State}, compression::CompressionBuilder, @@ -240,8 +240,9 @@ where let mut buf = None; if let Some(ref pcache_mtx) = self.persistent_cache { let mut cache = pcache_mtx.read(); - if let Ok(buffer) = cache.get(offset) { - buf = Some(Buf::from_zero_padded(buffer.to_vec())) + if let Ok(buffer) = cache.get_buf(offset) { + // buf = Some(Buf::from_zero_padded(buffer.to_vec())) + buf = Some(buffer) } } if let Some(b) = buf { @@ -359,23 +360,34 @@ where offset, generation: _, } => { + // If data is unmodified still move to pcache, as it might be worth saving (if not already contained) #[cfg(feature = "nvm")] if let Some(ref pcache_mtx) = self.persistent_cache { - let mut pcache = pcache_mtx.write(); - // TODO: Specify correct constant instead of magic 🪄 - let mut vec = Vec::with_capacity(4 * 1024 * 1024); - object.value_mut().get_mut().pack(&mut vec)?; - let _ = pcache.remove(offset); - pcache - .prepare_insert(offset, &vec, None) - .insert(|maybe_offset, data| { - // TODO: Write eventually not synced data to disk finally. - if let Some(offset) = maybe_offset { - self.pool - .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?; - } - Ok(()) - })?; + { + // Encapsulate concurrent read access + let pcache = pcache_mtx.read(); + if pcache.get(offset).is_ok() { + return Ok(()); + } + } + + warn!("Entry would need to be written to persistent cache but procedure unimplemented!"); + + // TODO: Compress and write-out entry + // let mut pcache = pcache_mtx.write(); + // let mut buf = BufWrite::with_capacity(Block(1)); + // object.value_mut().get_mut().pack(&mut buf)?; + // let _ = pcache.remove(offset); + // pcache + // .prepare_insert(offset, &vec, None) + // .insert(|maybe_offset, data| { + // // TODO: Write eventually not synced data to disk finally. + // if let Some(offset) = maybe_offset { + // self.pool + // .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?; + // } + // Ok(()) + // })?; } return Ok(()); } @@ -471,6 +483,7 @@ where if !skip_write_back { self.pool.begin_write(compressed_data, offset)?; } else { + // Cheap copy due to rc let bar = compressed_data.clone(); #[cfg(feature = "nvm")] if let Some(ref pcache_mtx) = self.persistent_cache { @@ -479,7 +492,7 @@ where let mut pcache = away.write(); let _ = pcache.remove(offset); pcache - .prepare_insert(offset, &compressed_data, None) + .prepare_insert(offset, bar, None) .insert(|maybe_offset, data| { // TODO: Deduplicate this? // if let Some(offset) = maybe_offset { @@ -492,7 +505,7 @@ where .unwrap(); })?; } - self.pool.begin_write(bar, offset)?; + self.pool.begin_write(compressed_data, offset)?; } let obj_ptr = ObjectPointer { diff --git a/betree/src/replication/lru.rs b/betree/src/replication/lru.rs index 639056ba..dc019dc0 100644 --- a/betree/src/replication/lru.rs +++ b/betree/src/replication/lru.rs @@ -359,9 +359,10 @@ mod tests { } let plru = unsafe { plru.as_mut().unwrap() }; assert_eq!(plru.count, 3); + pal.close(); } { - let pal = Pal::open(file.path()).unwrap(); + let mut pal = Pal::open(file.path()).unwrap(); let root = pal.root(size_of::>()).unwrap(); let plru = root.load() as *mut Plru<()>; let plru = unsafe { plru.as_mut().unwrap() }; @@ -373,6 +374,7 @@ mod tests { plru.remove(ptr); } assert_eq!(plru.count, 0); + pal.close(); } } } diff --git a/betree/src/replication/lru_worker.rs b/betree/src/replication/lru_worker.rs index 26102f23..6152a94e 100644 --- a/betree/src/replication/lru_worker.rs +++ b/betree/src/replication/lru_worker.rs @@ -7,6 +7,7 @@ pub enum Msg { Touch(PalPtr), Remove(PalPtr), Insert(PalPtr, u64, u64, T), + Close, } pub fn main(rx: Receiver>, mut root: Persistent>) { @@ -15,17 +16,18 @@ pub fn main(rx: Receiver>, mut root: Persistent>) { match msg { Msg::Touch(ptr) => { let mut lru = root.lru.write(); - lru.touch(&ptr); + let _ = lru.touch(&ptr); } Msg::Remove(mut ptr) => { let mut lru = root.lru.write(); - lru.remove(&ptr); + let _ = lru.remove(&ptr); ptr.free(); } Msg::Insert(ptr, hash, size, baggage) => { let mut lru = root.lru.write(); - lru.insert(ptr, hash, size, baggage); + let _ = lru.insert(ptr, hash, size, baggage); } + Msg::Close => break, } } } diff --git a/betree/src/replication/mod.rs b/betree/src/replication/mod.rs index 058f02ba..50bbcd4b 100644 --- a/betree/src/replication/mod.rs +++ b/betree/src/replication/mod.rs @@ -35,14 +35,18 @@ use std::{ ops::{Deref, DerefMut}, path::PathBuf, ptr::NonNull, + thread::JoinHandle, }; use twox_hash::XxHash64; +use zstd_safe::WriteBuf; mod lru; mod lru_worker; use lru::Plru; use serde::{Deserialize, Serialize}; +use crate::buffer::Buf; + /// A pointer to a region in persistent memory. pub struct Persistent(NonNull); // Pointer to persistent memory can be assumed to be non-thread-local @@ -75,10 +79,25 @@ pub struct PersistentCache { pal: Pal, root: Persistent>, tx: Sender>, + hndl: Option>, // Fix key types key_type: PhantomData, } +impl Drop for PersistentCache { + fn drop(&mut self) { + // Spin while the queue needs to be processed + self.tx + .send(lru_worker::Msg::Close) + .expect("Thread panicked."); + self.hndl + .take() + .expect("Thread handle has been empty?") + .join(); + self.pal.close(); + } +} + pub struct PCacheRoot { map: BTreeMap, lru: RwLock>, @@ -104,7 +123,7 @@ pub struct PersistentCacheConfig { pub struct PersistentCacheInsertion<'a, K, T> { cache: &'a mut PersistentCache, key: u64, - value: &'a [u8], + value: Buf, baggage: T, } @@ -161,12 +180,13 @@ impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> { // // entry.lru_node.free(); // } let lru_ptr = self.cache.pal.allocate(lru::PLRU_NODE_SIZE).unwrap(); - let data_ptr = self.cache.pal.allocate(self.value.len()).unwrap(); - data_ptr.copy_from(self.value, &self.cache.pal); + let data = self.value.as_slice_with_padding(); + let data_ptr = self.cache.pal.allocate(data.len()).unwrap(); + data_ptr.copy_from(data, &self.cache.pal); self.cache.tx.send(lru_worker::Msg::Insert( lru_ptr.clone(), self.key, - self.value.len() as u64, + data.len() as u64, self.baggage, )); // self.cache.root.lru.insert( @@ -178,7 +198,7 @@ impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> { let map_entry = PCacheMapEntry { lru_node: lru_ptr, data: data_ptr, - size: self.value.len(), + size: data.len(), }; self.cache.root.map.insert(self.key, map_entry); Ok(()) @@ -194,12 +214,13 @@ impl PersistentCache { if let Some(root) = NonNull::new(root.load() as *mut PCacheRoot) { let (tx, rx) = crossbeam_channel::unbounded(); let root_lru = Persistent(root.clone()); - std::thread::spawn(move || lru_worker::main(rx, root_lru)); + let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru)); let root = Persistent(root); Ok(Self { pal, tx, root, + hndl: Some(hndl), key_type: PhantomData::default(), }) } else { @@ -221,12 +242,13 @@ impl PersistentCache { }; let (tx, rx) = crossbeam_channel::unbounded(); let root_lru = Persistent(root.clone()); - std::thread::spawn(move || lru_worker::main(rx, root_lru)); + let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru)); let mut root = Persistent(root); Ok(Self { pal, tx, root, + hndl: Some(hndl), key_type: PhantomData::default(), }) } else { @@ -249,11 +271,29 @@ impl PersistentCache { } } + /// Return a [Buf]. + /// + /// TODO: We have to pin these entries to ensure that they may not be + /// evicted while in read-only mode. + pub fn get_buf(&self, key: K) -> Result { + let mut hasher = XxHash64::default(); + key.hash(&mut hasher); + let hash = hasher.finish(); + let res = self.root.map.get(&hash).cloned(); + if let Some(entry) = res { + self.tx.send(lru_worker::Msg::Touch(entry.lru_node)); + // self.root.lru.touch(&entry.lru_node)?; + Ok(Buf::from_persistent_ptr(entry.data, entry.size as u32)) + } else { + Err(PMapError::DoesNotExist) + } + } + /// Start an insertion. An insertion can only be successfully completed if values are properly evicted from the cache pub fn prepare_insert<'a>( &'a mut self, key: K, - value: &'a [u8], + value: Buf, baggage: T, ) -> PersistentCacheInsertion<'a, K, T> { let mut hasher = XxHash64::default(); diff --git a/fio-haura/src/fio-engine-haura.c b/fio-haura/src/fio-engine-haura.c index 28b26bd3..7053bfe7 100644 --- a/fio-haura/src/fio-engine-haura.c +++ b/fio-haura/src/fio-engine-haura.c @@ -304,14 +304,17 @@ static int fio_haura_setup(struct thread_data *td) { /* Haura needs some additional space to provide extra data like object * pointers and metadata. This is more of a hack, but nonetheless. */ creat(td->files[idx]->file_name, 0644); - if (truncate(td->files[idx]->file_name, td->o.file_size_high)) { + + // FIXME: If only one file is specified the file_size_high argument is 0. + // A fallback in this case is to use the usual size. + if (truncate(td->files[idx]->file_name, td->o.size * 2)) { fprintf( stderr, "Could not retruncate file to provide enough storage for Haura.\n"); } // Set the already allocated size so that fio avoids the internal initial // layouting - td->files[idx]->real_file_size = td->o.file_size_high; + td->files[idx]->real_file_size = td->o.size * 2; } td->io_ops_data = malloc(sizeof(size_t));