Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Byteview #15

Merged
merged 24 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "value-log"
description = "Value log implementation for key-value separated LSM storage"
license = "MIT OR Apache-2.0"
version = "1.4.1"
version = "1.5.0"
edition = "2021"
rust-version = "1.74.0"
readme = "README.md"
Expand All @@ -22,10 +22,11 @@ serde = ["dep:serde"]
bytes = ["dep:bytes"]

[dependencies]
bytes = { version = "1", optional = true }
byteorder = "1.5.0"
bytes = { version = "1.8.0", optional = true }
byteview = "0.4.0"
interval-heap = "0.0.5"
log = "0.4.22"
min-max-heap = "1.3.0"
path-absolutize = "3.1.1"
quick_cache = { version = "0.6.5", default-features = false }
rustc-hash = "2.0.0"
Expand Down
31 changes: 15 additions & 16 deletions src/segment/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
// (found in the LICENSE-* files in the repository)

use crate::{id::SegmentId, value::UserKey, Compressor, SegmentReader, UserValue};
use interval_heap::IntervalHeap;
use std::cmp::Reverse;

// TODO: replace with MinHeap...
use min_max_heap::MinMaxHeap;
/// Unwraps a `Result` inside a function that returns `Option<Result<_, _>>`.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` it returns
/// `Some(Err(..))` from the enclosing function, converting the error with `Into`.
macro_rules! fail_iter {
    ($result:expr) => {
        match $result {
            Err(error) => return Some(Err(error.into())),
            Ok(value) => value,
        }
    };
}

type IteratorIndex = usize;

Expand Down Expand Up @@ -42,16 +49,14 @@ impl Ord for IteratorValue {
#[allow(clippy::module_name_repetitions)]
pub struct MergeReader<C: Compressor + Clone> {
readers: Vec<SegmentReader<C>>,
heap: MinMaxHeap<IteratorValue>,
heap: IntervalHeap<IteratorValue>,
}

impl<C: Compressor + Clone> MergeReader<C> {
/// Initializes a new merging reader
pub fn new(readers: Vec<SegmentReader<C>>) -> Self {
Self {
readers,
heap: MinMaxHeap::new(),
}
let heap = IntervalHeap::with_capacity(readers.len());
Self { readers, heap }
}

fn advance_reader(&mut self, idx: usize) -> crate::Result<()> {
Expand Down Expand Up @@ -87,22 +92,16 @@ impl<C: Compressor + Clone> Iterator for MergeReader<C> {

fn next(&mut self) -> Option<Self::Item> {
if self.heap.is_empty() {
if let Err(e) = self.push_next() {
return Some(Err(e));
};
fail_iter!(self.push_next());
}

if let Some(head) = self.heap.pop_min() {
if let Err(e) = self.advance_reader(head.index) {
return Some(Err(e));
}
fail_iter!(self.advance_reader(head.index));

// Discard old items
while let Some(next) = self.heap.pop_min() {
if next.key == head.key {
if let Err(e) = self.advance_reader(next.index) {
return Some(Err(e));
}
fail_iter!(self.advance_reader(next.index));
} else {
// Reached next user key now
// Push back non-conflicting item and exit
Expand Down
76 changes: 27 additions & 49 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
// (found in the LICENSE-* files in the repository)

use super::{meta::METADATA_HEADER_MAGIC, writer::BLOB_HEADER_MAGIC};
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, UserValue};
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, Slice, UserValue};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
fs::File,
io::{BufReader, Read, Seek},
path::Path,
};

/// Early-return helper for iterator-style functions returning `Option<Result<_, _>>`.
///
/// Evaluates to the `Ok` payload; on `Err`, makes the enclosing function return
/// `Some(Err(..))` with the error passed through `Into`.
macro_rules! fail_iter {
    ($fallible:expr) => {
        match $fallible {
            Err(failure) => return Some(Err(failure.into())),
            Ok(payload) => payload,
        }
    };
}

/// Reads through a segment in order.
pub struct Reader<C: Compressor + Clone> {
pub(crate) segment_id: SegmentId,
Expand Down Expand Up @@ -62,10 +71,7 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {

{
let mut buf = [0; BLOB_HEADER_MAGIC.len()];

if let Err(e) = self.inner.read_exact(&mut buf) {
return Some(Err(e.into()));
};
fail_iter!(self.inner.read_exact(&mut buf));

if buf == METADATA_HEADER_MAGIC {
self.is_terminated = true;
Expand All @@ -79,54 +85,26 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {
}
}

let checksum = match self.inner.read_u64::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};
let checksum = fail_iter!(self.inner.read_u64::<BigEndian>());

let key_len = match self.inner.read_u16::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};

let mut key = vec![0; key_len.into()];
if let Err(e) = self.inner.read_exact(&mut key) {
return Some(Err(e.into()));
};

let val_len = match self.inner.read_u32::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};

let mut val = vec![0; val_len as usize];
if let Err(e) = self.inner.read_exact(&mut val) {
return Some(Err(e.into()));
};
let key_len = fail_iter!(self.inner.read_u16::<BigEndian>());
let key = fail_iter!(Slice::from_reader(&mut self.inner, key_len as usize));

let val_len = fail_iter!(self.inner.read_u32::<BigEndian>());
let val = match &self.compression {
Some(compressor) => match compressor.decompress(&val) {
Ok(val) => val,
Err(e) => return Some(Err(e)),
},
None => val,
Some(compressor) => {
// TODO: https://github.com/PSeitz/lz4_flex/issues/166
let mut val = vec![0; val_len as usize];
fail_iter!(self.inner.read_exact(&mut val));
Slice::from(fail_iter!(compressor.decompress(&val)))
}
None => {
// NOTE: When not using compression, we can skip
// the intermediary heap allocation and read directly into a Slice
fail_iter!(Slice::from_reader(&mut self.inner, val_len as usize))
}
};

Some(Ok((key.into(), val.into(), checksum)))
Some(Ok((key, val, checksum)))
}
}
34 changes: 30 additions & 4 deletions src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

#[cfg(not(feature = "bytes"))]
mod slice_arc;
mod slice_default;

#[cfg(feature = "bytes")]
mod slice_bytes;
Expand All @@ -13,10 +13,10 @@ use std::{
sync::Arc,
};

#[cfg(not(feature = "bytes"))]
pub use slice_arc::Slice;
#[cfg(feature = "bytes")]
pub use slice_bytes::Slice;
#[cfg(not(feature = "bytes"))]
pub use slice_default::Slice;

impl AsRef<[u8]> for Slice {
fn as_ref(&self) -> &[u8] {
Expand All @@ -26,7 +26,21 @@ impl AsRef<[u8]> for Slice {

impl From<&[u8]> for Slice {
fn from(value: &[u8]) -> Self {
Self::new(value)
#[cfg(not(feature = "bytes"))]
{
Self(byteview::ByteView::new(value))
}

#[cfg(feature = "bytes")]
{
Self(bytes::Bytes::from(value.to_vec()))
}
}
}

impl From<Arc<[u8]>> for Slice {
    /// Builds a [`Slice`] from the bytes behind an `Arc<[u8]>` by delegating to
    /// the `From<&[u8]>` conversion.
    fn from(value: Arc<[u8]>) -> Self {
        let bytes: &[u8] = &value;
        Self::from(bytes)
    }
}

Expand Down Expand Up @@ -180,6 +194,7 @@ mod serde {
mod tests {
use super::Slice;
use std::{fmt::Debug, sync::Arc};
use test_log::test;

fn assert_slice_handles<T>(v: T)
where
Expand All @@ -192,6 +207,17 @@ mod tests {
assert!(slice >= v, "slice_arc: {slice:?}, v: {v:?}");
}

// `Slice::empty()` must compare equal to an empty array.
#[test]
fn slice_empty() {
assert_eq!(Slice::empty(), []);
}

// `Slice::with_size(n)` must compare equal to an array of `n` zero bytes;
// checked for a small (5) and a larger (50) length.
#[test]
fn slice_with_size() {
assert_eq!(Slice::with_size(5), [0, 0, 0, 0, 0]);
assert_eq!(Slice::with_size(50), [0; 50]);
}

/// This test verifies that we can create a `Slice` from various types and compare a `Slice` with them.
#[test]
fn test_slice_instantiation() {
Expand Down
31 changes: 22 additions & 9 deletions src/slice/slice_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::sync::Arc;

use bytes::{Bytes, BytesMut};

/// An immutable byte slice that can be cloned without additional heap allocation
///
/// There is no guarantee of any sort of alignment for zero-copy (de)serialization.
// Newtype wrapper around `bytes::Bytes`; the inner field is `pub(super)`, so it is
// reachable from the parent module but not from outside it.
#[derive(Debug, Clone, Eq, Hash, Ord)]
pub struct Slice(pub(super) Bytes);

Expand All @@ -17,6 +17,26 @@ impl Slice {
Self(Bytes::copy_from_slice(bytes))
}

/// Returns a [`Slice`] backed by a static, zero-length byte array.
#[doc(hidden)]
#[must_use]
pub fn empty() -> Self {
    const NO_BYTES: &[u8] = &[];
    Self(Bytes::from_static(NO_BYTES))
}

/// Returns a [`Slice`] wrapping the result of the inner `Bytes::slice(range)`.
#[doc(hidden)]
#[must_use]
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
    let window = self.0.slice(range);
    Self(window)
}

/// Creates a [`Slice`] holding `len` zero bytes.
#[must_use]
#[doc(hidden)]
pub fn with_size(len: usize) -> Self {
    Self(Bytes::from(vec![0u8; len]))
}

/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
#[doc(hidden)]
pub fn from_reader<R: std::io::Read>(reader: &mut R, len: usize) -> std::io::Result<Self> {
let mut builder = BytesMut::zeroed(len);
Expand Down Expand Up @@ -50,10 +70,3 @@ impl From<String> for Slice {
Self(Bytes::from(value))
}
}

// Needed because slice_arc specializes this impl
impl From<Arc<[u8]>> for Slice {
fn from(value: Arc<[u8]>) -> Self {
Self::new(value.as_ref())
}
}
44 changes: 24 additions & 20 deletions src/slice/slice_arc.rs → src/slice/slice_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::sync::Arc;
use byteview::ByteView;

/// An immutable byte slice that can be cloned without additional heap allocation
///
/// There is no guarantee of any sort of alignment for zero-copy (de)serialization.
#[derive(Debug, Clone, Eq, Hash, Ord)]
pub struct Slice(pub(super) Arc<[u8]>);
pub struct Slice(pub(super) ByteView);

impl Slice {
/// Construct a [`Slice`] from a byte slice.
Expand All @@ -15,40 +17,42 @@ impl Slice {
Self(bytes.into())
}

/// Returns a [`Slice`] built from an empty byte array.
#[doc(hidden)]
#[must_use]
pub fn empty() -> Self {
    let view = ByteView::new(&[]);
    Self(view)
}

/// Returns a [`Slice`] wrapping the result of the inner `ByteView::slice(range)`.
#[doc(hidden)]
#[must_use]
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
    let window = self.0.slice(range);
    Self(window)
}

#[must_use]
#[doc(hidden)]
pub fn with_size(len: usize) -> Self {
// TODO: optimize this with byteview to remove the reallocation
let v = vec![0; len];
Self(v.into())
Self(ByteView::with_size(len))
}

/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
#[doc(hidden)]
pub fn from_reader<R: std::io::Read>(reader: &mut R, len: usize) -> std::io::Result<Self> {
let mut view = Self::with_size(len);
let builder = Arc::get_mut(&mut view.0).expect("we are the owner");
reader.read_exact(builder)?;
Ok(view)
let view = ByteView::from_reader(reader, len)?;
Ok(Self(view))
}
}

// Arc::from<Vec<T>> is specialized
// Hands the owned Vec<u8> to `ByteView::from`
impl From<Vec<u8>> for Slice {
fn from(value: Vec<u8>) -> Self {
Self(Arc::from(value))
Self(ByteView::from(value))
}
}

// Arc::from<Vec<T>> is specialized
// Goes through `String::into_bytes`, then `ByteView::from`
impl From<String> for Slice {
fn from(value: String) -> Self {
Self(Arc::from(value.into_bytes()))
}
}

// direct conversion
impl From<Arc<[u8]>> for Slice {
fn from(value: Arc<[u8]>) -> Self {
Self(value)
Self(ByteView::from(value.into_bytes()))
}
}