NVM-opt tree with compression #63

Open — wants to merge 44 commits into base: main

Commits (44)
32d1cd6
temp checkin
SajadKarim Dec 11, 2023
7019d26
temp checkin
SajadKarim Dec 11, 2023
1dd4d28
temp checkin
SajadKarim Dec 12, 2023
73df720
temp checkin
SajadKarim Dec 12, 2023
6dc44d0
temp checkin
SajadKarim Dec 13, 2023
26b031b
temp checkin
SajadKarim Dec 13, 2023
280b889
temp checkin
SajadKarim Dec 13, 2023
5da2225
push unfinished changes to compare code with the main branch.
SajadKarim Jan 2, 2024
70494f2
Fix some issues.
SajadKarim Jan 3, 2024
4677467
Resolve some compilation issues.
SajadKarim Jan 3, 2024
1344a3f
Bug fix in-progress.
SajadKarim Jan 3, 2024
79a182f
Bug fix is still in progress.
SajadKarim Jan 4, 2024
9a09c56
Save the changes made thus far.
SajadKarim Jan 5, 2024
cfcad73
Save the changes made thus far.
SajadKarim Jan 5, 2024
4bf0d58
temp checkin
SajadKarim Jan 5, 2024
217cbf0
Add changes to the unit tests that are related to NVM. For the time b…
SajadKarim Jan 5, 2024
8999564
Move changes related to reading individual entries from NVM.
SajadKarim Jan 8, 2024
b72df83
Remove some unnecessary changes.
SajadKarim Jan 8, 2024
4ef3369
NVM-optimized Bepsilon tree for Haura.
SajadKarim Jan 12, 2024
9fdf389
Add a few changes to isolate and fix bug(s).
SajadKarim Jun 25, 2024
2df23f0
Add a few changes to isolate and fix bug(s).
SajadKarim Jun 25, 2024
6f6b8bd
Add changes to compression logic after addressing bug in the nvm-tree…
SajadKarim Jul 1, 2024
e933e26
Add changes to try adding compression logic directly in rkyv.
SajadKarim Jul 6, 2024
e7baa3b
Add changes to try adding compression logic directly in rkyv.
SajadKarim Jul 6, 2024
72d65a3
Add a few changes I made while benchmarking the logic.
SajadKarim Jul 8, 2024
35ecfea
Add a few fixes to improve execution time related to compression.
SajadKarim Jul 15, 2024
9672a25
add a few changes to compare nvm-tree with the normal tree.
SajadKarim Jul 16, 2024
3752f12
Add a few fixes.
SajadKarim Jul 19, 2024
d0a93fe
Add a change to see its impact on the read performance of nvm-tree.
SajadKarim Jul 19, 2024
6f7f4cc
Add a few changes to evaluate trees.
SajadKarim Jul 24, 2024
42c6913
temp checkin
SajadKarim Sep 5, 2024
66ded02
temp intermediary changes
SajadKarim Sep 6, 2024
da08bfd
customized compression/decompression logic for nvm-opt nodes.
SajadKarim Sep 6, 2024
25ec055
customized compression/decompression logic for nvm-opt nodes.
SajadKarim Sep 6, 2024
c1367c3
Add some changes related to lz4.
SajadKarim Sep 7, 2024
f6d3b51
add lz4 related changes/fixes
SajadKarim Sep 8, 2024
5331be0
Add minor changes made during testing the functionality.
SajadKarim Sep 10, 2024
1446fc3
Add temp changes to generate numbers for poster.
SajadKarim Sep 10, 2024
ca604af
Add some changes related to compression.
SajadKarim Oct 1, 2024
0cd9d90
Add changes related to compression.
SajadKarim Oct 1, 2024
9dd0d0f
Add changes related to compression.
SajadKarim Oct 1, 2024
6fb2449
1. Add changes to DatabaseConfiguration regarding NVM tree. 2. Remove…
SajadKarim Oct 5, 2024
b3fa2b4
Remove a few unnecessary changes.
SajadKarim Nov 5, 2024
387cc06
Remove a few unnecessary changes.
SajadKarim Nov 21, 2024
20 changes: 20 additions & 0 deletions betree/Cargo.toml
@@ -21,7 +21,12 @@ harness = false
name = "tree"
harness = false

#[workspace.dependencies]
#rkyv = { version = "0.7.42", features = ["validation"] }
#betree-tests = { version = "0.1.0"}

[dependencies]
once_cell = "1.10.0"
futures = { version = "0.3", features = ["thread-pool"] }
serde = { version = "1.0", features = [ "derive" ] }
bincode = "1.0"
@@ -42,6 +47,7 @@ core_affinity = "0.5"
async-trait = "0.1"

lz4-sys = "1.9"
lz4 = "1.23.1"
zstd = { version = "0.9", default-features = false }
zstd-safe = { version = "4.0", default-features = false, features = ["experimental"] }

@@ -60,6 +66,20 @@ rand = { version = "0.8", features = ["std_rng"] }

pmdk = { path = "./pmdk", optional = true }

rkyv = { version = "0.7.42", features = ["validation"] }
#rkyv = { path = "../rkyv/rkyv"}
#rkyv_dyn = { path = "../rkyv/rkyv_dyn"}
#rkyv_dyn_test = { path = "../rkyv/rkyv_dyn_test"}
#rkyv_test = { path = "../rkyv/rkyv_test"}
#rkyv_typename = { path = "../rkyv/rkyv_typename"}
#betree-tests = { path = "./tests"}

bytecheck = { version = "0.7.0" }
extend = { version = "1.2.0" }

chrono = "0.4"

lazy_static = "1.4"
[dev-dependencies]
rand_xorshift = "0.3"
quickcheck = "1"
15 changes: 15 additions & 0 deletions betree/pmdk/src/lib.rs
@@ -4,6 +4,7 @@

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

use std::slice;
use std::os::raw::c_void;

#[derive(Debug)]
@@ -14,6 +15,11 @@ pub struct PMem {
unsafe impl Send for PMem {}
unsafe impl Sync for PMem {}

unsafe fn voidp_to_ref<'a, T>(p: *const c_void) -> &'a T
{
unsafe { &*(p as *const T) }
}

impl PMem {
pub fn create(filepath : &str, len: u64, mapped_len : &mut u64, is_pmem : &mut i32) -> Result<Self, std::io::Error> {
let mut ptr = unsafe {
@@ -73,6 +79,15 @@ impl PMem {
Ok(())
}

pub unsafe fn get_slice(&self, offset: usize, len: usize) -> Result<&'static [u8], std::io::Error>{
if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("File handle is missing for the PMEM file.")));
}

Ok(slice::from_raw_parts(voidp_to_ref::<u8>(self.ptr.add(offset)), len))
}

pub unsafe fn write(&self, offset: usize, data: &[u8], len: usize) -> Result<(), std::io::Error>{
if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
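The `get_slice` addition above relies on reinterpreting an offset into the memory-mapped PMEM region as a byte slice. A minimal std-only sketch of that pattern (hypothetical helper name; a stack array stands in for the mapped file, and the caller carries the same validity obligations `PMem::get_slice` does):

```rust
use std::os::raw::c_void;
use std::slice;

// Reinterpret `len` bytes starting at `ptr + offset` as a byte slice.
// Safety: the caller must guarantee the region is valid, initialized,
// and outlives the returned borrow — the same contract the PR's
// `PMem::get_slice` assumes for the mapped PMEM file.
unsafe fn region_as_slice<'a>(ptr: *const c_void, offset: usize, len: usize) -> &'a [u8] {
    slice::from_raw_parts((ptr as *const u8).add(offset), len)
}

fn main() {
    let backing: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
    // View 4 bytes starting at offset 2.
    let view = unsafe { region_as_slice(backing.as_ptr() as *const c_void, 2, 4) };
    assert_eq!(view, &[3u8, 4, 5, 6][..]);
    println!("{:?}", view);
}
```

Note the diff returns `&'static [u8]`, which is only sound while the mapping stays open; the unconstrained lifetime here makes the same trade-off explicit.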
21 changes: 21 additions & 0 deletions betree/src/buffer.rs
@@ -249,6 +249,10 @@ pub struct BufWrite {
}

impl BufWrite {
pub fn get_size(&self) -> u32 {
self.size
}

/// Create an empty [BufWrite] with the specified capacity.
/// The backing storage is zeroed.
pub fn with_capacity(capacity: Block<u32>) -> Self {
@@ -290,6 +294,23 @@ impl io::Write for BufWrite {
}
}


// Implementation of WriteBuf trait for BufWrite with methods for buffer manipulation
unsafe impl zstd::stream::raw::WriteBuf for BufWrite {
fn as_slice(&self) -> &[u8] {
self.as_ref()
}
fn capacity(&self) -> usize {
self.buf.capacity.to_bytes() as usize
}
fn as_mut_ptr(&mut self) -> *mut u8 {
unsafe { self.buf.ptr.as_mut().unwrap() }
}
unsafe fn filled_until(&mut self, n: usize) {
self.size = n as u32
}
}

impl io::Seek for BufWrite {
fn seek(&mut self, seek: io::SeekFrom) -> io::Result<u64> {
use io::SeekFrom::*;
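The `WriteBuf` impl added to `BufWrite` lets zstd write into the buffer's spare capacity directly and then report how much it filled via `filled_until`. A std-only sketch of that fill-tracking idea (hypothetical `FixedBuf` type, not the crate's `BufWrite`):

```rust
use std::io::{self, Write};

// A fixed-capacity buffer that separates capacity (allocated bytes)
// from size (bytes considered valid), mirroring the capacity /
// filled_until pair that zstd's raw WriteBuf trait expects.
struct FixedBuf {
    buf: Vec<u8>,
    size: usize, // valid bytes, like BufWrite's `size` field
}

impl FixedBuf {
    fn with_capacity(cap: usize) -> Self {
        FixedBuf { buf: vec![0; cap], size: 0 }
    }
    fn capacity(&self) -> usize { self.buf.len() }
    fn as_slice(&self) -> &[u8] { &self.buf[..self.size] }
    // Equivalent of `filled_until`: a caller wrote `n` bytes directly
    // into the backing storage, so mark them as valid.
    unsafe fn filled_until(&mut self, n: usize) { self.size = n; }
}

impl Write for FixedBuf {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let free = self.capacity() - self.size;
        let n = free.min(data.len());
        self.buf[self.size..self.size + n].copy_from_slice(&data[..n]);
        self.size += n;
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> { Ok(()) }
}

fn main() {
    let mut b = FixedBuf::with_capacity(8);
    b.write_all(b"abc").unwrap();
    assert_eq!(b.as_slice(), &b"abc"[..]);
    // Simulate an external writer having filled only 2 bytes.
    unsafe { b.filled_until(2) };
    assert_eq!(b.as_slice(), &b"ab"[..]);
}
```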
21 changes: 16 additions & 5 deletions betree/src/checksum.rs
@@ -1,13 +1,23 @@
//! This module provides a `Checksum` trait for verifying data integrity.

use crate::size::{Size, StaticSize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
// Serde traits are now referenced via fully qualified paths (serde::Serialize, ...)
//use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{error::Error, fmt, hash::Hasher, iter::once};
use twox_hash;

use rkyv::{
archived_root,
ser::{serializers::AllocSerializer, ScratchSpace, Serializer},
vec::{ArchivedVec, VecResolver},
with::{ArchiveWith, DeserializeWith, SerializeWith},
Archive, Archived, Deserialize, Fallible, Infallible, Serialize, AlignedVec,
};


/// A checksum to verify data integrity.
pub trait Checksum:
Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
serde::Serialize + serde::de::DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
{
/// Builds a new `Checksum`.
type Builder: Builder<Self>;
@@ -27,7 +37,7 @@ pub trait Checksum:

/// A checksum builder
pub trait Builder<C: Checksum>:
Serialize + DeserializeOwned + Clone + Send + Sync + fmt::Debug + 'static
serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + fmt::Debug + 'static
{
/// The internal state of the checksum.
type State: State<Checksum = C>;
@@ -67,7 +77,8 @@ impl Error for ChecksumError {
/// `XxHash` contains a digest of `xxHash`
/// which is an "extremely fast non-cryptographic hash algorithm"
/// (<https://github.com/Cyan4973/xxHash>)
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub struct XxHash(u64);

impl StaticSize for XxHash {
@@ -97,7 +108,7 @@ impl Checksum for XxHash {
}

/// The corresponding `Builder` for `XxHash`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct XxHashBuilder;

impl Builder<XxHash> for XxHashBuilder {
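The `checksum.rs` changes keep the same Builder → State → Checksum flow while renaming the serde trait bounds. A std-only sketch of that flow (hypothetical types; std's `DefaultHasher` stands in for `twox_hash`'s XxHash):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// Builder produces a fresh hashing state; the state ingests bytes and
// is consumed to yield the final digest — the same three-role split as
// Builder / State / Checksum in checksum.rs.
struct HashBuilder;
struct HashState(DefaultHasher);

impl HashBuilder {
    fn build(&self) -> HashState {
        HashState(DefaultHasher::new())
    }
}

impl HashState {
    fn ingest(&mut self, data: &[u8]) {
        self.0.write(data);
    }
    fn finish(self) -> u64 {
        self.0.finish()
    }
}

fn main() {
    let mut s = HashBuilder.build();
    s.ingest(b"hello");
    let sum = s.finish();
    // The same bytes must always produce the same digest.
    let mut s2 = HashBuilder.build();
    s2.ingest(b"hello");
    assert_eq!(sum, s2.finish());
    println!("digest: {:x}", sum);
}
```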
119 changes: 99 additions & 20 deletions betree/src/compression/lz4.rs
@@ -1,11 +1,22 @@
use super::{ CompressionConfiguration, CompressionState, DecompressionState, DecompressionTag, DEFAULT_BUFFER_SIZE, Result };
use super::{ CompressionBuilder, CompressionState, DecompressionState, DecompressionTag, DEFAULT_BUFFER_SIZE, Result };
use crate::size::StaticSize;
use crate::buffer::{Buf, BufWrite};

use crate::{
vdev::Block,
};
use std::io::Write;

use serde::{Deserialize, Serialize};
use std::io::{self, Read};
use zstd_safe::WriteBuf;
use std::io::{self, BufReader, Read};

use std::{
mem,
};
use std::sync::{Arc, Mutex};

// use lz4_sys::{ Lz4
use lz4::{Encoder, Decoder, EncoderBuilder, ContentChecksum, BlockSize, BlockMode};

/// LZ4 compression. (<https://github.com/lz4/lz4>)
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
@@ -25,24 +36,27 @@ pub struct Lz4Compression {
pub struct Lz4Decompression;

impl StaticSize for Lz4 {
fn size() -> usize {
fn static_size() -> usize {
1
}
}

impl CompressionConfiguration for Lz4 {
fn new_compression(&self) -> Result<Box<dyn CompressionState>> {
let encoder = EncoderBuilder::new()
impl CompressionBuilder for Lz4 {
fn new_compression(&self) -> Result<Arc<std::sync::RwLock<dyn CompressionState>>> {
let mut encoder = EncoderBuilder::new()
.level(u32::from(self.level))
.checksum(ContentChecksum::NoChecksum)
.block_size(BlockSize::Max4MB)
.block_mode(BlockMode::Linked)
.build(BufWrite::with_capacity(DEFAULT_BUFFER_SIZE))?;

Ok(Box::new(Lz4Compression { config: self.clone(), encoder }))
Ok(Arc::new(std::sync::RwLock::new(Lz4Compression { config: self.clone(), encoder })))

}

fn decompression_tag(&self) -> DecompressionTag { DecompressionTag::Lz4 }
fn decompression_tag(&self) -> DecompressionTag {
DecompressionTag::Lz4
}
}

impl Lz4 {
@@ -53,30 +67,95 @@ impl Lz4 {

impl io::Write for Lz4Compression {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.encoder.write(buf)
unimplemented!()
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.encoder.write_all(buf)
unimplemented!()
}

fn flush(&mut self) -> io::Result<()> {
self.encoder.flush()
unimplemented!()
}
}

use std::time::Instant;
use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

use lz4_sys::{LZ4F_compressBound, LZ4FPreferences, LZ4FCompressionContext, LZ4F_createCompressionContext};
use std::ptr;
use lz4_sys::LZ4FFrameInfo;

impl CompressionState for Lz4Compression {
fn finish(&mut self) -> Buf {
let (v, result) = self.encoder.finish();
result.unwrap();
v.into_buf()
fn finish_ext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
let size = data.len();
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));

let mut encoder = EncoderBuilder::new()
.level(u32::from(self.config.level))
.checksum(ContentChecksum::NoChecksum)
.block_size(BlockSize::Max4MB)
.block_mode(BlockMode::Linked)
.build(buf)?;

encoder.write_all(data.as_ref())?;
let (compressed_data, result) = encoder.finish();

if let Err(e) = result {
panic!("Compression failed: {:?}", e);
}

let mut buf_opt = BufWrite::with_capacity(Block::round_up_from_bytes(compressed_data.as_slice().len() as u32));
buf_opt.write_all(compressed_data.as_slice());

Ok(buf_opt.as_slice().to_vec())
}

fn finish(&mut self, data: Buf) -> Result<Buf> {
let size = data.as_ref().len();

let mut buf: BufWrite = BufWrite::with_capacity(Block::round_up_from_bytes( (size as u32)));

let mut encoder = EncoderBuilder::new()
.level(u32::from(self.config.level))
.checksum(ContentChecksum::NoChecksum)
.block_size(BlockSize::Max4MB)
.block_mode(BlockMode::Linked)
.build(buf)?;

encoder.write_all(data.as_ref())?;
let (compressed_data, result) = encoder.finish();

if let Err(e) = result {
panic!("Compression failed: {:?}", e);
}

let mut buf_opt = BufWrite::with_capacity(Block::round_up_from_bytes(compressed_data.as_slice().len() as u32));
buf_opt.write_all(compressed_data.as_slice());

Ok(buf_opt.into_buf())
}
}

impl DecompressionState for Lz4Decompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
let mut output = Vec::with_capacity(DEFAULT_BUFFER_SIZE.to_bytes() as usize);
Decoder::new(&data[..])?.read_to_end(&mut output)?;
Ok(output.into_boxed_slice())
fn decompress_ext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
let size = data.as_ref().len() as u32;
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));
let mut decoder = Decoder::new(data.as_ref())?;

io::copy(&mut decoder, &mut buf)?;
Ok(buf.as_slice().to_vec())
}

fn decompress(&mut self, data: Buf) -> Result<Buf> {
let size = data.as_ref().len() as u32;
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));
let mut decoder = Decoder::new(data.as_ref())?;

io::copy(&mut decoder, &mut buf)?;
Ok(buf.into_buf())
}
}
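The reworked `finish`/`decompress` methods above move from incremental `io::Write` streaming to a whole-buffer-in, whole-buffer-out shape. A std-only sketch of that shape (a trivial run-length codec stands in for LZ4, and `Vec<u8>` for the crate's `Buf`/`BufWrite` types — both assumptions for illustration):

```rust
// Compress by emitting (run_length, byte) pairs — a toy stand-in for
// the LZ4 encoder, but with the same buffer-in/buffer-out signature
// the PR gives finish().
fn compress(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut i = 0;
    while i < data.len() {
        let b = data[i];
        let mut run: u8 = 1;
        while i + (run as usize) < data.len() && data[i + run as usize] == b && run < u8::MAX {
            run += 1;
        }
        out.push(run);
        out.push(b);
        i += run as usize;
    }
    out
}

// Inverse: expand each (run_length, byte) pair, mirroring decompress().
fn decompress(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    for pair in data.chunks_exact(2) {
        out.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
    }
    out
}

fn main() {
    let payload = b"aaaabbbcca";
    let packed = compress(payload);
    // Round-trip must be lossless, and repetitive input should shrink.
    assert_eq!(decompress(&packed), payload.to_vec());
    assert!(packed.len() < payload.len());
}
```

One consequence of this shape, visible in the diff, is that each call builds a fresh encoder over the full input rather than keeping long-lived streaming state.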