Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Johannes Wünsche committed Jun 16, 2023
1 parent 8b1d9f8 commit c9bbe43
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 36 deletions.
11 changes: 8 additions & 3 deletions betree/src/checksum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
use crate::size::{Size, StaticSize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{error::Error, fmt, hash::Hasher, iter::once};
use std::{
error::Error,
fmt,
hash::{Hash, Hasher},
iter::once,
};
use twox_hash;

/// A checksum to verify data integrity.
pub trait Checksum:
Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static
{
/// Builds a new `Checksum`.
type Builder: Builder<Self>;
Expand Down Expand Up @@ -67,7 +72,7 @@ impl Error for ChecksumError {
/// `XxHash` contains a digest of `xxHash`
/// which is an "extremely fast non-cryptographic hash algorithm"
/// (<https://github.com/Cyan4973/xxHash>)
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct XxHash(u64);

impl StaticSize for XxHash {
Expand Down
26 changes: 21 additions & 5 deletions betree/src/data_management/dmu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use super::{
object_ptr::ObjectPointer,
CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference,
};
#[cfg(feature = "nvm")]
use crate::replication::PersistentCache;
use crate::{
allocator::{Action, SegmentAllocator, SegmentId},
buffer::Buf,
Expand Down Expand Up @@ -60,6 +62,8 @@ where
next_modified_node_id: AtomicU64,
next_disk_id: AtomicU64,
report_tx: Option<Sender<DmlMsg>>,
#[cfg(feature = "nvm")]
persistent_cache: Mutex<PersistentCache<DiskOffset>>,
}

impl<E, SPL> Dmu<E, SPL>
Expand All @@ -76,6 +80,7 @@ where
alloc_strategy: [[Option<u8>; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES],
cache: E,
handler: Handler<ObjRef<ObjectPointer<SPL::Checksum>>>,
#[cfg(feature = "nvm")] persistent_cache: PersistentCache<DiskOffset>,
) -> Self {
let allocation_data = (0..pool.storage_class_count())
.map(|class| {
Expand Down Expand Up @@ -103,6 +108,8 @@ where
next_modified_node_id: AtomicU64::new(1),
next_disk_id: AtomicU64::new(0),
report_tx: None,
#[cfg(feature = "nvm")]
persistent_cache: Mutex::new(persistent_cache),
}
}

Expand Down Expand Up @@ -226,6 +233,19 @@ where
let offset = op.offset();
let generation = op.generation();

#[cfg(feature = "nvm")]
let compressed_data = {
let mut cache = self.persistent_cache.lock();
if let Ok(buffer) = cache.get(&op) {
Buf::from_zero_padded(buffer.to_vec())
} else {
drop(cache);

self.pool
.read(op.size(), op.offset(), op.checksum().clone())?
}
};
#[cfg(not(feature = "nvm"))]
let compressed_data = self
.pool
.read(op.size(), op.offset(), op.checksum().clone())?;
Expand Down Expand Up @@ -499,11 +519,7 @@ where
{
warn!(
"Storage tier {class} does not have enough space remaining. {} blocks of {}",
self.handler
.free_space_tier(class)
.unwrap()
.free
.as_u64(),
self.handler.free_space_tier(class).unwrap().free.as_u64(),
size.as_u64()
);
continue;
Expand Down
2 changes: 1 addition & 1 deletion betree/src/data_management/object_ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)]
/// A pointer to an on-disk serialized object.
pub struct ObjectPointer<D> {
pub(super) decompression_tag: DecompressionTag,
Expand Down
11 changes: 10 additions & 1 deletion betree/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use crate::{
vdev::Block,
StoragePreference,
};

#[cfg(feature = "nvm")]
use crate::replication::PersistentCache;

use bincode::{deserialize, serialize_into};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use crossbeam_channel::Sender;
Expand Down Expand Up @@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration {
compression: CompressionConfiguration::None,
cache_size: DEFAULT_CACHE_SIZE,
#[cfg(feature = "nvm")]
persistent_cache_path: None,
persistent_cache: None,
access_mode: AccessMode::OpenIfExists,
sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS),
metrics: None,
Expand Down Expand Up @@ -233,6 +237,9 @@ impl DatabaseConfiguration {
}
}

#[cfg(feature = "nvm")]
let pcache = PersistentCache::create("foobar", 69420).expect("FIXME");

Dmu::new(
self.compression.to_builder(),
XxHashBuilder,
Expand All @@ -241,6 +248,8 @@ impl DatabaseConfiguration {
strategy,
ClockCache::new(self.cache_size),
handler,
#[cfg(feature = "nvm")]
pcache,
)
}

Expand Down
84 changes: 64 additions & 20 deletions betree/src/replication/lru.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::{DataKey, Persistent, PREFIX_LRU};
use pmem_hashmap::{PMap, PMapError};
use std::{marker::PhantomData, mem::size_of, ptr::NonNull};

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Key pointing to an LRU node in a persistent cache.
Expand All @@ -11,7 +13,7 @@ impl LruKey {
/// ======
/// The internals of this method are highly unsafe. Concurrent usage and
/// removal are strongly discouraged.
fn fetch(&self, map: &mut PMap) -> Result<&mut PlruNode, PMapError> {
fn fetch<T>(&self, map: &mut PMap) -> Result<&mut PlruNode<T>, PMapError> {
map.get(self.key()).and_then(|node_raw| unsafe {
Ok(std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>(
node_raw.try_into().unwrap(),
Expand All @@ -34,26 +36,43 @@ impl LruKey {

/// Persistent LRU
#[repr(C)]
pub struct Plru {
pub struct Plru<T> {
head: Option<LruKey>,
tail: Option<LruKey>,
// in Blocks? Return evicted element when full
capacity: u64,
count: u64,
size: u64,
// Satisfy the rust compiler by adding a zero sized phantom here
// Also, avoid the drop checker by using a raw ptr
key_type: PhantomData<*const T>,
}

impl Plru {
// hack ⛏
// Byte size of `Plru`, measured with `T = ()`. `Plru<T>` mentions `T` only
// inside a `PhantomData` field, so this one value is used as the array length
// when a `Plru<T>` is transmuted to bytes for any `T`. Writing
// `size_of::<Plru<T>>()` in that position does not work inside generic code
// (https://github.com/rust-lang/rust/issues/43408).
const PLRU_DUMMY_LENGTH_READ_COMMENT: usize = size_of::<Plru<()>>();

impl<T> Plru<T> {
pub fn create(map: &mut PMap, capacity: u64) -> Result<Persistent<Self>, PMapError> {
let this = Self {
head: None,
tail: None,
capacity,
size: 0,
count: 0,
key_type: PhantomData::default(),
};
map.insert([super::PREFIX_LRU_ROOT], unsafe {
&std::mem::transmute::<_, [u8; size_of::<Plru>()]>(this)
// This does not work as of
// https://github.com/rust-lang/rust/issues/43408
//
// &std::mem::transmute::<_, [u8; size_of::<Plru<T>>()]>(this)
//
// While annoying, we can avoid this problem by specifying a certain
// type to the struct and use the result from this constant for
// every other type as the type parameter is only a phantom. 👻
//
// instead let's do this:
&std::mem::transmute::<_, [u8; PLRU_DUMMY_LENGTH_READ_COMMENT]>(this)
})?;
Self::open(map)
}
Expand All @@ -78,7 +97,7 @@ impl Plru {

// Fixate new head
let old_head_ptr = self.head.expect("Invalid State");
let old_head = old_head_ptr.fetch(map).unwrap();
let old_head: &mut PlruNode<T> = old_head_ptr.fetch(map).unwrap();
old_head.fwd = Some(node_ptr);
node.back = self.head;
self.head = Some(node_ptr);
Expand All @@ -87,19 +106,26 @@ impl Plru {
}

/// Add a new entry into the LRU. Will fail if already present.
pub fn insert(&mut self, map: &mut PMap, node_ptr: LruKey, size: u64) -> Result<(), PMapError> {
pub fn insert(
&mut self,
map: &mut PMap,
node_ptr: LruKey,
size: u64,
baggage: T,
) -> Result<(), PMapError> {
let node = PlruNode {
fwd: None,
back: self.head,
size,
data: node_ptr.0,
key: baggage,
};

map.insert(node_ptr.key(), &unsafe {
std::mem::transmute::<_, [u8; PLRU_NODE_SIZE]>(node)
map.insert(node_ptr.key(), unsafe {
std::mem::transmute::<_, &[u8; PLRU_NODE_SIZE]>(&node)
})?;
if let Some(head_ptr) = self.head {
let head = head_ptr.fetch(map)?;
let head: &mut PlruNode<T> = head_ptr.fetch(map)?;
head.fwd = Some(node_ptr);
self.head = Some(node_ptr);
} else {
Expand All @@ -124,18 +150,18 @@ impl Plru {
fn cut_node_and_stitch(
&mut self,
map: &mut PMap,
node: &mut PlruNode,
node: &mut PlruNode<T>,
) -> Result<(), PMapError> {
let node_ptr = LruKey(node.data);
if let Some(forward_ptr) = node.fwd {
let forward = forward_ptr.fetch(map)?;
let forward: &mut PlruNode<T> = forward_ptr.fetch(map)?;
forward.back = node.back;
}
if self.tail == Some(node_ptr) {
self.tail = node.fwd;
}
if let Some(back_ptr) = node.back {
let back = back_ptr.fetch(map)?;
let back: &mut PlruNode<T> = back_ptr.fetch(map)?;
back.fwd = node.fwd;
}
if self.head == Some(node_ptr) {
Expand All @@ -159,12 +185,6 @@ impl Plru {
}
}

use std::{mem::size_of, ptr::NonNull};

use super::{DataKey, Persistent, PREFIX_LRU};

const PLRU_NODE_SIZE: usize = size_of::<PlruNode>();

/// Ephemeral Wrapper around a byte array for sane access code.
///
/// Structure
Expand All @@ -175,17 +195,41 @@ const PLRU_NODE_SIZE: usize = size_of::<PlruNode>();
/// .. │ ├────>
/// └───┘ back
///
/// Size Constraint
/// ===============
/// This structure allows for a generic member which is to be returned when
/// evictions are happening. The size available to the entire object is 256
/// bytes of which the custom type can occupy at most 208 bytes.
///
/// Safety
/// ======
/// Using this wrapper requires transmuting the underlying byte array, which
/// invalidates, when used on references, all borrow checker guarantees. Use
/// with extreme caution, and be sure you know what you are doing.
#[repr(C)]
pub struct PlruNode {
pub struct PlruNode<T> {
fwd: Option<LruKey>,
back: Option<LruKey>,
size: u64,
data: u64,
key: T,
}
/// Length in bytes of the byte array a [PlruNode] is transmuted to and from
/// when it is written to or fetched from the persistent map.
const PLRU_NODE_SIZE: usize = 256;

impl<T> PlruNode<T> {
    /// Compile-time guard that `PlruNode<T>` fits into the `PLRU_NODE_SIZE`
    /// byte slot it is transmuted to and from.
    ///
    /// The bound is inclusive: a node that fills the slot exactly is valid, so
    /// only nodes strictly larger than `PLRU_NODE_SIZE` are rejected.
    const SIZE_CONSTRAINT: () = assert!(
        std::mem::size_of::<PlruNode<T>>() <= PLRU_NODE_SIZE,
        "PlruNode<T> does not fit into PLRU_NODE_SIZE bytes"
    );

    /// Builds a node and enforces the size constraint for this `T`.
    ///
    /// * `fwd` / `back` - keys of the neighbouring nodes in the list, `None`
    ///   where there is no neighbour.
    /// * `size` - size accounted to this entry.
    /// * `data` - raw `u64` of the `LruKey` identifying this node.
    /// * `key` - caller-defined payload stored alongside the node.
    pub fn new(fwd: Option<LruKey>, back: Option<LruKey>, size: u64, data: u64, key: T) -> Self {
        // Associated consts of generic impls are only evaluated when used, so
        // this reference must stay for rustc to check the assertion for `T`.
        let _ = Self::SIZE_CONSTRAINT;
        Self {
            fwd,
            back,
            size,
            data,
            key,
        }
    }
}

#[cfg(test)]
Expand Down Expand Up @@ -283,7 +327,7 @@ mod tests {
fn remove() {
let file = TestFile::new();
let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
let mut lru = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
let mut lru: Plru<()> = Plru::create(&mut pmap, 32 * 1024 * 1024).unwrap();
lru.insert(&mut pmap, LruKey(100), 16 * 1024 * 1024)
.unwrap();
lru.remove(&mut pmap, LruKey(100)).unwrap();
Expand Down
32 changes: 26 additions & 6 deletions betree/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ impl DataKey {

/// Persistent byte array cache. Optimized for read performance, avoid frequent
/// updates.
pub struct PersistentCache {
pub struct PersistentCache<T> {
pmap: PMap,
// Persistent Pointer
lru: Persistent<Plru>,
lru: Persistent<Plru<T>>,
}

/// Configuration for a persistent cache.
Expand All @@ -104,7 +104,14 @@ pub struct PersistentCacheConfig {
pub bytes: usize,
}

impl PersistentCache {
/// Ephemeral struct created in the preparation for an insertion.
///
/// Holds the cache mutably borrowed for `'a` together with the key and value
/// of the pending insertion.
pub struct PersistentCacheInsertion<'a, T> {
    // Cache the pending insertion belongs to.
    cache: &'a mut PersistentCache<T>,
    // Key derived from the caller-supplied key in `prepare_insert`.
    key: DataKey,
    // Bytes handed to `prepare_insert` as the value of the pending insertion.
    value: &'a [u8],
}

impl<T> PersistentCache<T> {
/// Open an existing [PersistentCache]. Fails if no cache exists or if it is invalid.
pub fn open<P: Into<std::path::PathBuf>>(path: P) -> Result<Self, PMapError> {
let mut pmap = PMap::open(path.into())?;
Expand All @@ -130,13 +137,26 @@ impl PersistentCache {
Ok(self.pmap.get(k.key())?)
}

/// Stage an insertion of `value` under `key`.
///
/// This method itself only derives the [DataKey] for `key` and bundles it
/// with `value` and the mutable borrow of this cache into a
/// [PersistentCacheInsertion], which keeps the cache borrowed for `'a`.
pub fn prepare_insert<'a, K: Hash>(
    &'a mut self,
    key: K,
    value: &'a [u8],
) -> PersistentCacheInsertion<'a, T> {
    // Derive the key first: `self` is handed over to the result right after.
    let key = DataKey::from(key, &mut self.pmap);
    PersistentCacheInsertion {
        cache: self,
        key,
        value,
    }
}

/// Insert an entry and remove infrequent values.
pub fn insert<K: Hash>(&mut self, key: K, value: &[u8]) -> Result<(), PMapError> {
pub fn insert<K: Hash>(&mut self, key: K, value: &[u8], bag: T) -> Result<(), PMapError> {
// TODO: Update old values? Convenience
let k = DataKey::from(key, &mut self.pmap);
self.pmap.insert(k.key(), value)?;
self.lru
.insert(&mut self.pmap, k.to_lru_key(), value.len() as u64)?;
.insert(&mut self.pmap, k.to_lru_key(), value.len() as u64, bag)?;
self.pmap.insert(k.key(), value)?;
while let Some(id) = self.lru.evict() {
self.pmap.remove(id.key())?;
self.lru.remove(&mut self.pmap, id.to_lru_key())?;
Expand Down
Loading

0 comments on commit c9bbe43

Please sign in to comment.