Commit

tmp commit
Johannes Wünsche committed Jun 19, 2023
1 parent f58a1d8 commit f270e06
Showing 10 changed files with 305 additions and 97 deletions.
2 changes: 1 addition & 1 deletion betree/Cargo.toml
@@ -70,7 +70,7 @@ criterion = "0.3"
 tempfile = "3.6.0"
 
 [features]
-default = ["init_env_logger", "figment_config"]
+default = ["init_env_logger", "figment_config", "nvm"]
 
 # unlock unstable API for consumption by bectl and other debugging tools
 internal-api = []
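
Note that making "nvm" a default feature pulls the persistent-memory code path into every downstream build; consumers without persistent memory can still opt out by re-selecting the previous defaults, e.g. with `cargo build --no-default-features --features init_env_logger,figment_config`.
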
11 changes: 8 additions & 3 deletions betree/src/checksum.rs
@@ -2,12 +2,17 @@
 use crate::size::{Size, StaticSize};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::{error::Error, fmt, hash::Hasher, iter::once};
+use std::{
+    error::Error,
+    fmt,
+    hash::{Hash, Hasher},
+    iter::once,
+};
 use twox_hash;
 
 /// A checksum to verify data integrity.
 pub trait Checksum:
-    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
+    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static
 {
     /// Builds a new `Checksum`.
     type Builder: Builder<Self>;
@@ -67,7 +72,7 @@ impl Error for ChecksumError {
 /// `XxHash` contains a digest of `xxHash`
 /// which is an "extremely fast non-cryptographic hash algorithm"
 /// (<https://github.com/Cyan4973/xxHash>)
-#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
 pub struct XxHash(u64);
 
 impl StaticSize for XxHash {
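
With the new `Hash` supertrait, every checksum (and anything embedding one, such as the `ObjectPointer` below) can be folded into a fixed-size key for hash-based lookups. A minimal sketch of the idea, using only the standard library; the helper name `cache_key` is illustrative, not part of this crate:

use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
};

// Reduce any `Hash` value, e.g. an `XxHash` checksum, to a u64 key.
fn cache_key<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
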
61 changes: 54 additions & 7 deletions betree/src/data_management/dmu.rs
@@ -5,6 +5,8 @@ use super::{
     object_ptr::ObjectPointer,
     CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference,
 };
+#[cfg(feature = "nvm")]
+use crate::replication::PersistentCache;
 use crate::{
     allocator::{Action, SegmentAllocator, SegmentId},
     buffer::Buf,
@@ -60,6 +62,8 @@ where
     next_modified_node_id: AtomicU64,
     next_disk_id: AtomicU64,
     report_tx: Option<Sender<DmlMsg>>,
+    #[cfg(feature = "nvm")]
+    persistent_cache: Mutex<PersistentCache<DiskOffset, Option<DiskOffset>>>,
 }
 
 impl<E, SPL> Dmu<E, SPL>
@@ -76,6 +80,7 @@ where
         alloc_strategy: [[Option<u8>; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES],
         cache: E,
         handler: Handler<ObjRef<ObjectPointer<SPL::Checksum>>>,
+        #[cfg(feature = "nvm")] persistent_cache: PersistentCache<DiskOffset, Option<DiskOffset>>,
     ) -> Self {
         let allocation_data = (0..pool.storage_class_count())
             .map(|class| {
@@ -103,6 +108,8 @@
             next_modified_node_id: AtomicU64::new(1),
             next_disk_id: AtomicU64::new(0),
             report_tx: None,
+            #[cfg(feature = "nvm")]
+            persistent_cache: Mutex::new(persistent_cache),
         }
     }
 
@@ -226,6 +233,19 @@
         let offset = op.offset();
         let generation = op.generation();
 
+        #[cfg(feature = "nvm")]
+        let compressed_data = {
+            let mut cache = self.persistent_cache.lock();
+            if let Ok(buffer) = cache.get(offset) {
+                Buf::from_zero_padded(buffer.to_vec())
+            } else {
+                drop(cache);
+
+                self.pool
+                    .read(op.size(), op.offset(), op.checksum().clone())?
+            }
+        };
+        #[cfg(not(feature = "nvm"))]
         let compressed_data = self
             .pool
             .read(op.size(), op.offset(), op.checksum().clone())?;
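
The fetch path above acts as a read-through cache: the persistent cache is consulted first and the storage pool is only read on a miss. Note that the hit path constructs the buffer directly and thus appears to skip the checksum verification that a pool read performs. Stripped of the cfg plumbing, the control flow is roughly the following sketch (simplified types, not the crate's actual signatures):

use std::collections::HashMap;

// Serve compressed bytes from the cache if present, else fall back
// to the (hypothetical) pool reader.
fn read_through(
    cache: &HashMap<u64, Vec<u8>>,
    offset: u64,
    read_from_pool: impl FnOnce(u64) -> std::io::Result<Vec<u8>>,
) -> std::io::Result<Vec<u8>> {
    if let Some(buffer) = cache.get(&offset) {
        return Ok(buffer.clone()); // hit: no disk access
    }
    read_from_pool(offset) // miss: the cache is not populated here
}
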
@@ -329,7 +349,29 @@
 
         let mid = match key {
             ObjectKey::InWriteback(_) => unreachable!(),
-            ObjectKey::Unmodified { .. } => return Ok(()),
+            ObjectKey::Unmodified {
+                offset,
+                generation: _,
+            } => {
+                #[cfg(feature = "nvm")]
+                {
+                    let mut pcache = self.persistent_cache.lock();
+                    // TODO: Specify correct constant instead of magic 🪄
+                    let mut vec = Vec::with_capacity(4 * 1024 * 1024);
+                    object.value_mut().get_mut().pack(&mut vec)?;
+                    pcache
+                        .prepare_insert(offset, &vec, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Eventually write not-yet-synced data to disk.
+                            if let Some(offset) = maybe_offset {
+                                self.pool
+                                    .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            }
+                            Ok(())
+                        })?;
+                }
+                return Ok(());
+            }
             ObjectKey::Modified(mid) => mid,
         };
 
@@ -341,6 +383,8 @@
         drop(cache);
         let object = CacheValueRef::write(entry);
 
+        // Eviction at this point only writes a single node, as all children
+        // need to be unmodified beforehand.
         self.handle_write_back(object, mid, true, pk)?;
         Ok(())
     }
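
Eviction of an unmodified node is where the persistent cache gets populated: the node is packed into a buffer, and `prepare_insert(..).insert(..)` hands any entry displaced from the cache to a closure that may finally write it to disk. A toy model of that contract, under the assumption (inferred from the call site) that inserting can evict one older entry:

use std::collections::VecDeque;

// Bounded FIFO cache; displaced entries pass through `on_evict`
// before they are dropped, mirroring the closure in the diff.
struct TinyCache {
    cap: usize,
    entries: VecDeque<(u64, Vec<u8>)>,
}

impl TinyCache {
    fn insert(
        &mut self,
        offset: u64,
        data: Vec<u8>,
        mut on_evict: impl FnMut(u64, &[u8]) -> std::io::Result<()>,
    ) -> std::io::Result<()> {
        if self.entries.len() >= self.cap {
            if let Some((old_offset, old_data)) = self.entries.pop_front() {
                // Displaced, possibly not-yet-synced data must reach
                // disk before it is lost (the TODO in the diff).
                on_evict(old_offset, &old_data)?;
            }
        }
        self.entries.push_back((offset, data));
        Ok(())
    }
}
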
@@ -412,7 +456,14 @@
             state.finish()
         };
 
-        self.pool.begin_write(compressed_data, offset)?;
+        #[cfg(feature = "nvm")]
+        let skip_write_back = true;
+        #[cfg(not(feature = "nvm"))]
+        let skip_write_back = false;
+
+        if !skip_write_back {
+            self.pool.begin_write(compressed_data, offset)?;
+        }
 
         let obj_ptr = ObjectPointer {
             offset,
@@ -499,11 +550,7 @@
         {
             warn!(
                 "Storage tier {class} does not have enough space remaining. {} blocks of {}",
-                self.handler
-                    .free_space_tier(class)
-                    .unwrap()
-                    .free
-                    .as_u64(),
+                self.handler.free_space_tier(class).unwrap().free.as_u64(),
                 size.as_u64()
            );
            continue;
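
With "nvm" enabled, `handle_write_back` no longer writes to the pool at all; persistence is deferred to the cache's eviction callback. Since `skip_write_back` is a compile-time constant, the same branch could also be expressed with the standard `cfg!` macro, which expands to a plain `bool` (an equivalent rewrite of the lines above, not a behavior change):

// cfg!(..) evaluates to false when the feature is off, so the
// compiler removes the dead branch entirely.
if !cfg!(feature = "nvm") {
    self.pool.begin_write(compressed_data, offset)?;
}
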
8 changes: 8 additions & 0 deletions betree/src/data_management/errors.rs
@@ -1,5 +1,7 @@
 #![allow(missing_docs, unused_doc_comments)]
 use crate::{storage_pool::DiskOffset, vdev::Block};
+#[cfg(feature = "nvm")]
+use pmem_hashmap::PMapError;
 use thiserror::Error;
 
 #[derive(Error, Debug)]
@@ -33,6 +35,12 @@ pub enum Error {
     CallbackError,
     #[error("A raw allocation has failed.")]
     RawAllocationError { at: DiskOffset, size: Block<u32> },
+    #[cfg(feature = "nvm")]
+    #[error("An error occurred while accessing the persistent cache.")]
+    PersistentCacheError {
+        #[from]
+        source: PMapError,
+    },
 }
 
 // To avoid recursive error types here, define a simple translation from
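
The `#[from]` attribute makes thiserror derive a `From<PMapError>` impl for `Error`, so `?` on any cache call converts the error automatically. A self-contained illustration of the mechanism, with a stand-in source error instead of the real `PMapError`:

use thiserror::Error;

#[derive(Error, Debug)]
#[error("backing store failed")]
struct SourceError;

#[derive(Error, Debug)]
enum Error {
    #[error("an error occurred while accessing the persistent cache")]
    PersistentCacheError {
        #[from]
        source: SourceError,
    },
}

fn pmap_call() -> Result<(), SourceError> {
    Err(SourceError)
}

fn touch_cache() -> Result<(), Error> {
    pmap_call()?; // SourceError converted via the derived From impl
    Ok(())
}
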
2 changes: 1 addition & 1 deletion betree/src/data_management/object_ptr.rs
@@ -9,7 +9,7 @@ use crate::{
 };
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)]
 /// A pointer to an on-disk serialized object.
 pub struct ObjectPointer<D> {
     pub(super) decompression_tag: DecompressionTag,
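
Deriving `Hash` on a generic struct produces an impl bounded by the type parameter, roughly `impl<D: Hash> Hash for ObjectPointer<D>`; that is presumably why `Checksum` (which `D` is instantiated with) gained its `Hash` supertrait above. A condensed, runnable illustration of the derive's behavior:

use std::collections::HashSet;

// `Pointer<D>` is only Hash where `D: Hash`; the derive adds the bound.
#[derive(Hash, PartialEq, Eq)]
struct Pointer<D> {
    checksum: D,
    offset: u64,
}

fn main() {
    let mut seen = HashSet::new();
    seen.insert(Pointer { checksum: 0u64, offset: 42 });
    assert!(seen.contains(&Pointer { checksum: 0u64, offset: 42 }));
}
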
11 changes: 10 additions & 1 deletion betree/src/database/mod.rs
@@ -21,6 +21,10 @@ use crate::{
     vdev::Block,
     StoragePreference,
 };
+
+#[cfg(feature = "nvm")]
+use crate::replication::PersistentCache;
+
 use bincode::{deserialize, serialize_into};
 use byteorder::{BigEndian, ByteOrder, LittleEndian};
 use crossbeam_channel::Sender;
@@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration {
             compression: CompressionConfiguration::None,
             cache_size: DEFAULT_CACHE_SIZE,
             #[cfg(feature = "nvm")]
-            persistent_cache_path: None,
+            persistent_cache: None,
             access_mode: AccessMode::OpenIfExists,
             sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS),
             metrics: None,
@@ -233,6 +237,9 @@ impl DatabaseConfiguration {
             }
         }
 
+        #[cfg(feature = "nvm")]
+        let pcache = PersistentCache::create("foobar", 69420).expect("FIXME");
+
         Dmu::new(
             self.compression.to_builder(),
             XxHashBuilder,
@@ -241,6 +248,8 @@
             strategy,
             ClockCache::new(self.cache_size),
             handler,
+            #[cfg(feature = "nvm")]
+            pcache,
         )
     }
 
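
As the `expect("FIXME")` and the commit message suggest, the cache location and size are stubbed: every database would map the same "foobar" file with a fixed capacity of 69420 bytes. A plausible follow-up, sketched under the assumption that the renamed `persistent_cache` configuration field carries hypothetical `path` and `bytes` members; this is not code from the crate:

#[cfg(feature = "nvm")]
let pcache = {
    // Hypothetical: read location and capacity from the configuration
    // instead of hard-coded constants.
    let cfg = self
        .persistent_cache
        .as_ref()
        .expect("the nvm feature requires a persistent cache configuration");
    PersistentCache::create(&cfg.path, cfg.bytes).expect("FIXME")
};
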