diff --git a/Cargo.toml b/Cargo.toml index 4de0588..eb957c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "value-log" description = "Value log implementation for key-value separated LSM storage" license = "MIT OR Apache-2.0" -version = "1.4.1" +version = "1.5.0" edition = "2021" rust-version = "1.74.0" readme = "README.md" @@ -22,10 +22,11 @@ serde = ["dep:serde"] bytes = ["dep:bytes"] [dependencies] +bytes = { version = "1", optional = true } byteorder = "1.5.0" -bytes = { version = "1.8.0", optional = true } +byteview = "0.4.0" +interval-heap = "0.0.5" log = "0.4.22" -min-max-heap = "1.3.0" path-absolutize = "3.1.1" quick_cache = { version = "0.6.5", default-features = false } rustc-hash = "2.0.0" diff --git a/src/segment/merge.rs b/src/segment/merge.rs index 587042c..5582a72 100644 --- a/src/segment/merge.rs +++ b/src/segment/merge.rs @@ -3,10 +3,17 @@ // (found in the LICENSE-* files in the repository) use crate::{id::SegmentId, value::UserKey, Compressor, SegmentReader, UserValue}; +use interval_heap::IntervalHeap; use std::cmp::Reverse; -// TODO: replace with MinHeap... -use min_max_heap::MinMaxHeap; +macro_rules! fail_iter { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return Some(Err(e.into())), + } + }; +} type IteratorIndex = usize; @@ -42,16 +49,14 @@ impl Ord for IteratorValue { #[allow(clippy::module_name_repetitions)] pub struct MergeReader { readers: Vec>, - heap: MinMaxHeap, + heap: IntervalHeap, } impl MergeReader { /// Initializes a new merging reader pub fn new(readers: Vec>) -> Self { - Self { - readers, - heap: MinMaxHeap::new(), - } + let heap = IntervalHeap::with_capacity(readers.len()); + Self { readers, heap } } fn advance_reader(&mut self, idx: usize) -> crate::Result<()> { @@ -87,22 +92,16 @@ impl Iterator for MergeReader { fn next(&mut self) -> Option { if self.heap.is_empty() { - if let Err(e) = self.push_next() { - return Some(Err(e)); - }; + fail_iter!(self.push_next()); } if let Some(head) = self.heap.pop_min() { - if let Err(e) = self.advance_reader(head.index) { - return Some(Err(e)); - } + fail_iter!(self.advance_reader(head.index)); // Discard old items while let Some(next) = self.heap.pop_min() { if next.key == head.key { - if let Err(e) = self.advance_reader(next.index) { - return Some(Err(e)); - } + fail_iter!(self.advance_reader(next.index)); } else { // Reached next user key now // Push back non-conflicting item and exit diff --git a/src/segment/reader.rs b/src/segment/reader.rs index 7bfeb90..42d11df 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use super::{meta::METADATA_HEADER_MAGIC, writer::BLOB_HEADER_MAGIC}; -use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, UserValue}; +use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, Slice, UserValue}; use byteorder::{BigEndian, ReadBytesExt}; use std::{ fs::File, @@ -11,6 +11,15 @@ use std::{ path::Path, }; +macro_rules! fail_iter { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return Some(Err(e.into())), + } + }; +} + /// Reads through a segment in order. pub struct Reader { pub(crate) segment_id: SegmentId, @@ -62,10 +71,7 @@ impl Iterator for Reader { { let mut buf = [0; BLOB_HEADER_MAGIC.len()]; - - if let Err(e) = self.inner.read_exact(&mut buf) { - return Some(Err(e.into())); - }; + fail_iter!(self.inner.read_exact(&mut buf)); if buf == METADATA_HEADER_MAGIC { self.is_terminated = true; @@ -79,54 +85,26 @@ impl Iterator for Reader { } } - let checksum = match self.inner.read_u64::() { - Ok(v) => v, - Err(e) => { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - return None; - } - return Some(Err(e.into())); - } - }; + let checksum = fail_iter!(self.inner.read_u64::()); - let key_len = match self.inner.read_u16::() { - Ok(v) => v, - Err(e) => { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - return None; - } - return Some(Err(e.into())); - } - }; - - let mut key = vec![0; key_len.into()]; - if let Err(e) = self.inner.read_exact(&mut key) { - return Some(Err(e.into())); - }; - - let val_len = match self.inner.read_u32::() { - Ok(v) => v, - Err(e) => { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - return None; - } - return Some(Err(e.into())); - } - }; - - let mut val = vec![0; val_len as usize]; - if let Err(e) = self.inner.read_exact(&mut val) { - return Some(Err(e.into())); - }; + let key_len = fail_iter!(self.inner.read_u16::()); + let key = fail_iter!(Slice::from_reader(&mut self.inner, key_len as usize)); + let val_len = fail_iter!(self.inner.read_u32::()); let val = match &self.compression { - Some(compressor) => match compressor.decompress(&val) { - Ok(val) => val, - Err(e) => return Some(Err(e)), - }, - None => val, + Some(compressor) => { + // TODO: https://github.com/PSeitz/lz4_flex/issues/166 + let mut val = vec![0; val_len as usize]; + fail_iter!(self.inner.read_exact(&mut val)); + Slice::from(fail_iter!(compressor.decompress(&val))) + } + None => { + // NOTE: When not using compression, we can skip + // the intermediary heap allocation and read directly into a Slice + fail_iter!(Slice::from_reader(&mut self.inner, val_len as usize)) + } }; - Some(Ok((key.into(), val.into(), checksum))) + Some(Ok((key, val, checksum))) } } diff --git a/src/slice.rs b/src/slice.rs index e9f4a35..9d150f4 100644 --- a/src/slice.rs +++ b/src/slice.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) #[cfg(not(feature = "bytes"))] -mod slice_arc; +mod slice_default; #[cfg(feature = "bytes")] mod slice_bytes; @@ -13,10 +13,10 @@ use std::{ sync::Arc, }; -#[cfg(not(feature = "bytes"))] -pub use slice_arc::Slice; #[cfg(feature = "bytes")] pub use slice_bytes::Slice; +#[cfg(not(feature = "bytes"))] +pub use slice_default::Slice; impl AsRef<[u8]> for Slice { fn as_ref(&self) -> &[u8] { @@ -26,7 +26,21 @@ impl AsRef<[u8]> for Slice { impl From<&[u8]> for Slice { fn from(value: &[u8]) -> Self { - Self::new(value) + #[cfg(not(feature = "bytes"))] + { + Self(byteview::ByteView::new(value)) + } + + #[cfg(feature = "bytes")] + { + Self(bytes::Bytes::from(value.to_vec())) + } + } +} + +impl From> for Slice { + fn from(value: Arc<[u8]>) -> Self { + Self::from(&*value) } } @@ -180,6 +194,7 @@ mod serde { mod tests { use super::Slice; use std::{fmt::Debug, sync::Arc}; + use test_log::test; fn assert_slice_handles(v: T) where @@ -192,6 +207,17 @@ mod tests { assert!(slice >= v, "slice_arc: {slice:?}, v: {v:?}"); } + #[test] + fn slice_empty() { + assert_eq!(Slice::empty(), []); + } + + #[test] + fn slice_with_size() { + assert_eq!(Slice::with_size(5), [0, 0, 0, 0, 0]); + assert_eq!(Slice::with_size(50), [0; 50]); + } + /// This test verifies that we can create a `Slice` from various types and compare a `Slice` with them. #[test] fn test_slice_instantiation() { diff --git a/src/slice/slice_bytes.rs b/src/slice/slice_bytes.rs index 01d8c09..80db6d6 100644 --- a/src/slice/slice_bytes.rs +++ b/src/slice/slice_bytes.rs @@ -2,11 +2,11 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use std::sync::Arc; - use bytes::{Bytes, BytesMut}; /// An immutable byte slice that can be cloned without additional heap allocation +/// +/// There is no guarantee of any sort of alignment for zero-copy (de)serialization. #[derive(Debug, Clone, Eq, Hash, Ord)] pub struct Slice(pub(super) Bytes); @@ -17,6 +17,26 @@ impl Slice { Self(Bytes::copy_from_slice(bytes)) } + #[doc(hidden)] + #[must_use] + pub fn empty() -> Self { + Self(Bytes::from_static(&[])) + } + + #[doc(hidden)] + #[must_use] + pub fn slice(&self, range: impl std::ops::RangeBounds) -> Self { + Self(self.0.slice(range)) + } + + #[must_use] + #[doc(hidden)] + pub fn with_size(len: usize) -> Self { + let bytes = vec![0; len]; + Self(Bytes::from(bytes)) + } + + /// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes. #[doc(hidden)] pub fn from_reader(reader: &mut R, len: usize) -> std::io::Result { let mut builder = BytesMut::zeroed(len); @@ -50,10 +70,3 @@ impl From for Slice { Self(Bytes::from(value)) } } - -// Needed because slice_arc specializes this impl -impl From> for Slice { - fn from(value: Arc<[u8]>) -> Self { - Self::new(value.as_ref()) - } -} diff --git a/src/slice/slice_arc.rs b/src/slice/slice_default.rs similarity index 51% rename from src/slice/slice_arc.rs rename to src/slice/slice_default.rs index 5037e91..1b3911d 100644 --- a/src/slice/slice_arc.rs +++ b/src/slice/slice_default.rs @@ -2,11 +2,13 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use std::sync::Arc; +use byteview::ByteView; /// An immutable byte slice that can be cloned without additional heap allocation +/// +/// There is no guarantee of any sort of alignment for zero-copy (de)serialization. #[derive(Debug, Clone, Eq, Hash, Ord)] -pub struct Slice(pub(super) Arc<[u8]>); +pub struct Slice(pub(super) ByteView); impl Slice { /// Construct a [`Slice`] from a byte slice. @@ -15,40 +17,42 @@ impl Slice { Self(bytes.into()) } + #[doc(hidden)] + #[must_use] + pub fn empty() -> Self { + Self(ByteView::new(&[])) + } + + #[doc(hidden)] + #[must_use] + pub fn slice(&self, range: impl std::ops::RangeBounds) -> Self { + Self(self.0.slice(range)) + } + #[must_use] #[doc(hidden)] pub fn with_size(len: usize) -> Self { - // TODO: optimize this with byteview to remove the reallocation - let v = vec![0; len]; - Self(v.into()) + Self(ByteView::with_size(len)) } + /// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes. #[doc(hidden)] pub fn from_reader(reader: &mut R, len: usize) -> std::io::Result { - let mut view = Self::with_size(len); - let builder = Arc::get_mut(&mut view.0).expect("we are the owner"); - reader.read_exact(builder)?; - Ok(view) + let view = ByteView::from_reader(reader, len)?; + Ok(Self(view)) } } -// Arc::from> is specialized +// Arc::from> is specialized impl From> for Slice { fn from(value: Vec) -> Self { - Self(Arc::from(value)) + Self(ByteView::from(value)) } } -// Arc::from> is specialized +// Arc::from> is specialized impl From for Slice { fn from(value: String) -> Self { - Self(Arc::from(value.into_bytes())) - } -} - -// direct conversion -impl From> for Slice { - fn from(value: Arc<[u8]>) -> Self { - Self(value) + Self(ByteView::from(value.into_bytes())) } }