From a6d31289e8f2a642d15d52a0e3c535df41867ee1 Mon Sep 17 00:00:00 2001
From: Stephen Akinyemi
Date: Fri, 17 Jan 2025 16:35:18 +0100
Subject: [PATCH] refactor(store): replace Rabin chunker with GearCDC
 implementation (#109)

- Removed unfinished RabinChunker implementation
- Added GearCDC chunker with rolling hash implementation
- Added comprehensive test suite for GearCDC
- Added constants for min/max/desired chunk sizes
- Added gear table for rolling hash computation
- Updated all references to use new constant name
---
 Cargo.lock                                    |   1 +
 monofs/lib/store/flatfsstore.rs               |   6 +-
 monoutils-store/Cargo.toml                    |   3 +
 .../lib/implementations/chunkers/constants.rs |  81 +++-
 .../chunkers/{rabin.rs => fastcdc.rs}         |  58 ++-
 .../lib/implementations/chunkers/fixed.rs     |   4 +-
 .../lib/implementations/chunkers/gearcdc.rs   | 435 ++++++++++++++++++
 .../lib/implementations/chunkers/mod.rs       |   6 +-
 .../lib/implementations/stores/memstore.rs    |   6 +-
 monoutils/lib/seekable.rs                     |  10 +-
 10 files changed, 579 insertions(+), 31 deletions(-)
 rename monoutils-store/lib/implementations/chunkers/{rabin.rs => fastcdc.rs} (52%)
 create mode 100644 monoutils-store/lib/implementations/chunkers/gearcdc.rs

diff --git a/Cargo.lock b/Cargo.lock
index e19fc45..46a2116 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1932,6 +1932,7 @@ dependencies = [
  "libipld",
  "lru",
  "monoutils",
+ "rand",
  "serde",
  "serde_ipld_dagcbor",
  "thiserror 2.0.6",
diff --git a/monofs/lib/store/flatfsstore.rs b/monofs/lib/store/flatfsstore.rs
index 5276687..7d9a63b 100644
--- a/monofs/lib/store/flatfsstore.rs
+++ b/monofs/lib/store/flatfsstore.rs
@@ -393,7 +393,7 @@ where
 
 #[cfg(test)]
 mod tests {
-    use monoutils_store::DEFAULT_CHUNK_MAX_SIZE;
+    use monoutils_store::DEFAULT_MAX_CHUNK_SIZE;
     use std::fs;
     use tokio::io::AsyncReadExt;
 
@@ -556,9 +556,9 @@
         // Verify size limits from chunker
         assert_eq!(
             store.get_node_block_max_size(),
-            Some(DEFAULT_CHUNK_MAX_SIZE)
+            Some(DEFAULT_MAX_CHUNK_SIZE)
         );
-        assert_eq!(store.get_raw_block_max_size(), Some(DEFAULT_CHUNK_MAX_SIZE));
+        assert_eq!(store.get_raw_block_max_size(), Some(DEFAULT_MAX_CHUNK_SIZE));
 
         Ok(())
     }
diff --git a/monoutils-store/Cargo.toml b/monoutils-store/Cargo.toml
index 0389066..2f7c08d 100644
--- a/monoutils-store/Cargo.toml
+++ b/monoutils-store/Cargo.toml
@@ -27,3 +27,6 @@ thiserror.workspace = true
 tokio = { workspace = true, features = ["sync"] }
 tokio-util = { workspace = true, features = ["io"] }
 monoutils.workspace = true
+
+[dev-dependencies]
+rand.workspace = true
diff --git a/monoutils-store/lib/implementations/chunkers/constants.rs b/monoutils-store/lib/implementations/chunkers/constants.rs
index 3094db2..261f473 100644
--- a/monoutils-store/lib/implementations/chunkers/constants.rs
+++ b/monoutils-store/lib/implementations/chunkers/constants.rs
@@ -2,5 +2,82 @@
 // Constants
 //--------------------------------------------------------------------------------------------------
 
-/// The default chunk size is 512 KiB.
-pub const DEFAULT_CHUNK_MAX_SIZE: u64 = 512 * 1024;
+/// The default maximum chunk size is 512 KiB.
+pub const DEFAULT_MAX_CHUNK_SIZE: u64 = 512 * 1024; // TODO: 2KiB ???
+
+/// The default minimum chunk size is 128 KiB.
+pub const DEFAULT_MIN_CHUNK_SIZE: u64 = 128 * 1024; // TODO: 4KiB ???
+
+/// The default desired chunk size is 256 KiB.
+pub const DEFAULT_DESIRED_CHUNK_SIZE: u64 = 256 * 1024; // TODO: 6MiB ???
+
+// const DEFAULT_ROLLING_HASH_MASK: u64 = 0x0000000000000000;
+
+/// The gear table used for the rolling hash; it maps each byte value to a pseudo-random 64-bit value.
+#[rustfmt::skip] +pub static DEFAULT_GEAR_TABLE: [u64; 256] = [ + 0x934dc63f1a2fbc75, 0xe42f5a7b7364d1a7, 0xfcc3a557352ead66, 0x1eb038d4e6c75ae9, + 0x9826d86994d27fd1, 0xdac0fcffe0399894, 0x2288ffe125735827, 0x0bd9fcd34c572187, + 0xbce855206433c277, 0x2d59433a15e2089d, 0x5111df55ce5ba7b8, 0xf4e20ce31df535b6, + 0xfee62fa9ae6a36dd, 0x5cc9a1fdb8e9d39f, 0xa1e2d19ff88c4738, 0x7b88cf0bec4ce823, + 0x37b3f9cb3b3fac27, 0xb8fb20c023c6c8b2, 0x3b42660df117bfd0, 0xafea8a871b318682, + 0x9ba9da395a5730eb, 0x5ba17f0d89de3973, 0x562168e510d6564a, 0x39adf42c51a542a5, + 0xcb4d7e410618e4c3, 0x162519d991abcc83, 0xb37e627f7623a61e, 0x4f3ddbd91e5c87a9, + 0x87e77c296adf3669, 0x587dd0305612722c, 0x063d37b3e59b6989, 0x368e661cf3434448, + 0xa48702aea94a0f46, 0xf8a809dc15c1be18, 0xac677f6498eab68e, 0x77dc02e66a8876bf, + 0xd102089ddf25dce2, 0x47365e75b22f7d4d, 0x8e87402901cd05ce, 0x7cdda092cc7f8fcb, + 0xa1d26bade1302aa5, 0xcfcd30b90314e8c6, 0x1949238103ef41b6, 0x63fc3684a9cc872e, + 0x16a3ac3cd5558592, 0x4ee1228377ea6d00, 0x17b7876d45f54350, 0xcd03b5232c70e911, + 0xc7ef23d2a017c930, 0xb89dc625d3907010, 0x6e438f90f74f615a, 0x2b832703c82b2cea, + 0x0f43485841d49906, 0xcd1bfdd09569132b, 0x95b9270e3e2e9c3a, 0x53b4ca21c14e940b, + 0xa44869f296d575ac, 0x641f05ca74e0355f, 0x939eada2f8e26790, 0x8739f9c4f926c947, + 0x09e7e0dd7a0195f2, 0x7cbde45676dec445, 0xe15b0ea0b16f9556, 0x5591406a6617099b, + 0x9809f6ddf7c32b72, 0xa6b806dbb39fb230, 0xebf0d07769c874a9, 0xf07e343f653edfc9, + 0x63ee20bc3fe1285a, 0xcb04eb23bd48b7e0, 0x4f61c9d4fd7eca50, 0x64c6fb630f22ac50, + 0x142a1f4ea6e7072d, 0x7330ae35c334f2dc, 0xa30bcf22e0d963cf, 0x63eff1b02d9b9dd3, + 0x6b37381dcb7a5d59, 0x545fcc5e4e6d7dab, 0x094f1ad34c3d1f29, 0x76ab9b8f4a0507b1, + 0xcf9181092beb4cc8, 0x5be896ff8506448d, 0xda2a079def5e56a9, 0xcecea52cf7975aa8, + 0xd903d0613a812353, 0x3ae7b86896c8e107, 0x5d893869ec46862a, 0x06dc65ec0f77cf08, + 0x977bb100c58ea221, 0xe21729b833df1401, 0x63639e075487c549, 0x885a2ba929a53b0b, + 0xd4f7253d68e1e693, 0x7b150a9734efe5c6, 0x2d1faf7174838274, 0x12f139bd8d761543, + 0x1cac6aa3bdf8405c, 0x2b1f5cb61b014c0c, 0x6cbe1a0531ea6d45, 0xb038e76dacc2d37e, + 0x87476df20ba6627b, 0x63bcd73d9bcf3854, 0xf8abfb21f28c88a3, 0xb8d1c0eaa2dc3a28, + 0xf416fca050a2ed70, 0x396d53f5e495969a, 0x6093c46ed8df4b09, 0x2ce6ccb27b6da1ba, + 0xd020db26beadd4fd, 0x1c60ede6b88fa4d8, 0xf9cbd1e60f5accea, 0xc9cbe32183cdf780, + 0xbe4248f8e41e896e, 0xcfa0ae01e14b1c52, 0xa7c8c8d2400eea30, 0x229376ca34eaabf5, + 0x3785ec931a7562b0, 0x19ad6f0aa76dda4a, 0x62461f0b1912bd8f, 0x7af96db1a84b1373, + 0x034e7d3e5547fd35, 0x3f0bfd17853f403c, 0x27dcf6026e19d44f, 0x3d658c7ebf2c03fe, + 0x451a37679de76be3, 0xa3f32da1a1559eac, 0xa6d466ee50c1f808, 0xd2166409b67ade33, + 0x57517e23a251c969, 0xda9f98fbb946750c, 0xef05aa950c0165b7, 0xa86aab56733d1d5a, + 0x314546298548db64, 0x4d13562cbf14abda, 0x65aa6e478ef65b80, 0x2b2886ecf0d83d62, + 0x1f957dc7d7e776f5, 0xe9444f727c6c3928, 0xdcceba0a933bce23, 0x4e0b955478834d48, + 0xdbb91b8b0585c263, 0xadc16e742df8698d, 0x1de068add4dddaf2, 0xc00de3921cb3097f, + 0x12f134c19342ef64, 0x1fdf38bbc41c16b8, 0xfebd3b2f77ca7a26, 0x99da3b50c0dd30cf, + 0xc79a3a8b284e2e77, 0x629cb257ac037762, 0x8abbf035d0bb69b0, 0x4adbfae80a85b75f, + 0xc8869cd11e3167a0, 0x550efebb4b07f3ad, 0x3b7f9e1f7cc60e23, 0xca56904382892e6f, + 0x0fadda765c741fdd, 0x81e9daf42ef32abb, 0x1fd30c90e4286783, 0xe7ca9bcaee7d4ea7, + 0x1b79ead70ba01456, 0x7d7eab8593b24d9f, 0x88690a1b301315df, 0x454eca065a5051db, + 0xca4fd9d61385e539, 0x0039e2c776ecef6c, 0xae68842ca70aea0b, 0x863a14316d601bf9, + 0xb8324636dc05022b, 0xaec62a278bded5da, 
0xc77e48036be43f9c, 0x08150dec6308334b, + 0xafec69cc55acc2af, 0xbdbec85be0190e88, 0x584cba9a7899b3a7, 0x66691ba4c9628ca6, + 0x3e0edf7ed52faed3, 0x19e0e2487da291f7, 0x86b8f4d78569352d, 0x5d4d7dc9c206d788, + 0x608769243d4b3f55, 0x6a0566f48dcb7e90, 0x176fdeaca7205a42, 0x9435254b909d1679, + 0xe9a7d0c6377b9c39, 0x34b9a1eb78c34c7b, 0x871855f22bc99139, 0xe268ffef1f7f508f, + 0x5f47591fbbb42814, 0x9d795f85f4169dcc, 0xdfba9b3e0fd46412, 0xf85b0e4d4096b048, + 0x444a80bc63e04952, 0xc78dc90c7c5eceea, 0x7e99c7d3ac75ae93, 0x7fdf884d3f190ca3, + 0xf687d347814186ce, 0x63ef30025d6ca1a7, 0xb0d8bb4b66bef836, 0x5b52ef53be745d40, + 0xef31e39969d73dbc, 0xcc93d3ba46730343, 0x9f6628e2bf0d3176, 0xf403b20cf8f06505, + 0x335c5996e49be86f, 0xb2f5feae2de888a3, 0x1250dcef56cb0b11, 0xbb2cf97d8074a80f, + 0x6edd77ae233db9fc, 0x8c0b5dd112fd375b, 0x95f11213c18559e0, 0xc5d0da638b1d16c1, + 0xbe3919a9dc1c940c, 0x2c42bd5a5fb58291, 0x29349a51ffd60426, 0x0268f6100054560b, + 0x222767d096123693, 0x6eebef90a17e8771, 0x88ab85f7c1e03711, 0xf24725946153b4d4, + 0x4e2e482cd2b128c7, 0x9b33480e5905029e, 0x13a82c68d4ed7d6d, 0x896f727adcdd1607, + 0x084182eafda7e6d7, 0xf8d3761f72a21cd3, 0xafb1d5dd9406a46c, 0xac614b3a434c44bb, + 0xf0c2e04823152cd7, 0xfc0bc19a0ba17bcb, 0xe37cb42de36f49ea, 0x5edd06863e9f2488, + 0xd1242ef181256605, 0xc3c8b875cc5d7483, 0xac75a2c774350d19, 0xa918f4209d2d1219, + 0x70cf0f8d1e95c41f, 0xe2c063b89ceb35f2, 0x6c3b8e5b1dba4cd1, 0x848399c9cc10d552, + 0x63f6cb62d5f088a8, 0x3fa09e3bad673bd8, 0xbafc8915513554c4, 0xe062541c7d11ba74, + 0xd2a45c35598a5c8d, 0x8df580afac8f80d6, 0x589735cd31830eaf, 0x005533d654d3b4b2, + 0xb8d133c02207fa33, 0xecbfffd864b2e87c, 0xb240770c24e5cf38, 0xaf8c8783788bb279, + 0x47d0d26806edfd85, 0x8f218e6cc4a793c6, 0xd33257b8e2091db6, 0xf3cc67629aff40d3, +]; diff --git a/monoutils-store/lib/implementations/chunkers/rabin.rs b/monoutils-store/lib/implementations/chunkers/fastcdc.rs similarity index 52% rename from monoutils-store/lib/implementations/chunkers/rabin.rs rename to monoutils-store/lib/implementations/chunkers/fastcdc.rs index 44c70de..72fbc77 100644 --- a/monoutils-store/lib/implementations/chunkers/rabin.rs +++ b/monoutils-store/lib/implementations/chunkers/fastcdc.rs @@ -4,31 +4,48 @@ use tokio::io::AsyncRead; use crate::{Chunker, StoreResult}; -use super::DEFAULT_CHUNK_MAX_SIZE; +use super::{ + constants::DEFAULT_MAX_CHUNK_SIZE, DEFAULT_DESIRED_CHUNK_SIZE, DEFAULT_GEAR_TABLE, + DEFAULT_MIN_CHUNK_SIZE, +}; //-------------------------------------------------------------------------------------------------- // Types //-------------------------------------------------------------------------------------------------- -/// A chunker that splits data into variable-size chunks using the Rabin fingerprinting algorithm. -/// -/// The `RabinChunker` leverages the Rabin fingerprinting technique to produce chunks of data with -/// variable sizes. This algorithm is particularly effective for identifying duplicate content within -/// files, as well as across different files, by creating consistent chunk boundaries. The resulting -/// chunks are then processed and stored in an IPLD form. -pub struct RabinChunker { - /// The size of each chunk. - chunk_size: u64, +/// A chunker that splits data into variable-size chunks using the FastCDC algorithm. +pub struct FastCDC { + /// The gear table. + gear_table: [u64; 256], + + /// The desired chunk size. + desired_chunk_size: u64, + + /// The minimum size of each chunk. + min_chunk_size: u64, + + /// The maximum size of each chunk. 
+    max_chunk_size: u64,
 }
 
 //--------------------------------------------------------------------------------------------------
 // Methods
 //--------------------------------------------------------------------------------------------------
 
-impl RabinChunker {
-    /// Creates a new `RabinChunker` with the given `chunk_size`.
-    pub fn new(chunk_size: u64) -> Self {
-        Self { chunk_size }
+impl FastCDC {
+    /// Creates a new `FastCDC` with the given desired, minimum, and maximum chunk sizes and gear table.
+    pub fn new(
+        desired_chunk_size: u64,
+        min_chunk_size: u64,
+        max_chunk_size: u64,
+        gear_table: [u64; 256],
+    ) -> Self {
+        Self {
+            gear_table,
+            desired_chunk_size,
+            min_chunk_size,
+            max_chunk_size,
+        }
     }
 }
 
@@ -36,7 +53,7 @@ impl RabinChunker {
 // Trait Implementations
 //--------------------------------------------------------------------------------------------------
-impl Chunker for RabinChunker {
+impl Chunker for FastCDC {
     async fn chunk<'a>(
         &self,
         _reader: impl AsyncRead + Send + 'a,
     ) -> StoreResult<BoxStream<'a, StoreResult<Bytes>>> {
@@ -46,12 +63,17 @@ impl Chunker for RabinChunker {
     }
 
     fn chunk_max_size(&self) -> Option<u64> {
-        Some(self.chunk_size)
+        Some(self.max_chunk_size)
     }
 }
 
-impl Default for RabinChunker {
+impl Default for FastCDC {
     fn default() -> Self {
-        Self::new(DEFAULT_CHUNK_MAX_SIZE)
+        Self::new(
+            DEFAULT_DESIRED_CHUNK_SIZE,
+            DEFAULT_MIN_CHUNK_SIZE,
+            DEFAULT_MAX_CHUNK_SIZE,
+            DEFAULT_GEAR_TABLE,
+        )
     }
 }
diff --git a/monoutils-store/lib/implementations/chunkers/fixed.rs b/monoutils-store/lib/implementations/chunkers/fixed.rs
index 3a67b98..db64235 100644
--- a/monoutils-store/lib/implementations/chunkers/fixed.rs
+++ b/monoutils-store/lib/implementations/chunkers/fixed.rs
@@ -7,7 +7,7 @@ use tokio::io::{AsyncRead, AsyncReadExt};
 
 use crate::{Chunker, StoreError, StoreResult};
 
-use super::DEFAULT_CHUNK_MAX_SIZE;
+use super::constants::DEFAULT_MAX_CHUNK_SIZE;
 
 //--------------------------------------------------------------------------------------------------
 // Types
@@ -71,7 +71,7 @@ impl Chunker for FixedSizeChunker {
 
 impl Default for FixedSizeChunker {
     fn default() -> Self {
-        Self::new(DEFAULT_CHUNK_MAX_SIZE)
+        Self::new(DEFAULT_MAX_CHUNK_SIZE)
     }
 }
 
diff --git a/monoutils-store/lib/implementations/chunkers/gearcdc.rs b/monoutils-store/lib/implementations/chunkers/gearcdc.rs
new file mode 100644
index 0000000..a9c8042
--- /dev/null
+++ b/monoutils-store/lib/implementations/chunkers/gearcdc.rs
@@ -0,0 +1,435 @@
+use std::pin::pin;
+
+use async_stream::try_stream;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+use crate::{Chunker, StoreError, StoreResult};
+
+use super::{DEFAULT_DESIRED_CHUNK_SIZE, DEFAULT_GEAR_TABLE};
+
+//--------------------------------------------------------------------------------------------------
+// Types
+//--------------------------------------------------------------------------------------------------
+
+/// A content-defined chunking (CDC) implementation that uses a gear-based rolling hash to identify
+/// chunk boundaries.
+///
+/// CDC is a method of splitting data into chunks based on its content rather than fixed positions.
+/// This results in better deduplication as insertions or deletions only affect nearby chunks rather
+/// than shifting all subsequent chunk boundaries.
+///
+/// The gear-based CDC algorithm works by:
+/// 1. Computing a rolling hash over a sliding window of bytes
+/// 2. Using the gear table to generate pseudo-random values for each byte
+/// 3. Declaring a chunk boundary when the hash meets certain criteria (specific bits are zero)
+///
+/// The average chunk size is controlled by the `desired_chunk_size` parameter, though actual
+/// chunk sizes will vary based on content.
+pub struct GearCDC {
+    /// The gear table used to generate pseudo-random values for each byte.
+    /// Each byte maps to a 64-bit value that contributes to the rolling hash.
+    gear_table: [u64; 256],
+
+    /// The target average chunk size.
+    /// The actual chunk size will vary based on content, but will average around this value.
+    desired_chunk_size: u64,
+}
+
+/// A rolling hash implementation used by [`GearCDC`] to identify chunk boundaries.
+///
+/// The gear hash maintains a running hash value that is efficiently updated as new bytes
+/// are processed. It uses a pre-computed gear table to map each input byte to a pseudo-random
+/// value, which helps ensure an even distribution of chunk boundaries.
+///
+/// The hash is updated using three components:
+/// 1. Left-shifting the current hash by 1 bit (`hash << 1`)
+/// 2. XORing with the gear table value for the new byte
+/// 3. XORing with the top 11 bits of the hash (`hash >> 53`)
+///
+/// The third component is crucial for handling real-world data:
+/// - For random data, even a simple rolling hash (just components 1 and 2) would work well
+/// - However, real data often contains repetitive patterns (e.g., repeated HTML tags, log lines)
+/// - The feedback from high bits (component 3) provides additional mixing that helps break up
+///   these patterns, ensuring we still get reasonable chunk boundaries even with repetitive data
+///
+/// This implementation ensures robust chunking behavior for both random and non-random data,
+/// which is essential for effective content-defined chunking in real-world applications.
+pub struct GearHasher {
+    /// The gear table maps each possible byte value to a pseudo-random 64-bit number.
+    /// This helps ensure an even distribution of hash values.
+    gear_table: [u64; 256],
+
+    /// The current hash value, updated as new bytes are processed.
+    /// The hash update includes feedback from high bits to ensure good mixing
+    /// even with repetitive data patterns.
+    hash: u64,
+}
+
+//--------------------------------------------------------------------------------------------------
+// Methods
+//--------------------------------------------------------------------------------------------------
+
+impl GearCDC {
+    /// Creates a new `GearCDC` with the given `desired_chunk_size` and `gear_table`.
+    pub fn new(desired_chunk_size: u64, gear_table: [u64; 256]) -> Self {
+        Self {
+            gear_table,
+            desired_chunk_size,
+        }
+    }
+}
+
+impl GearHasher {
+    /// Creates a new `GearHasher` with the given `gear_table`.
+    pub fn new(gear_table: [u64; 256]) -> Self {
+        Self {
+            gear_table,
+            hash: 0,
+        }
+    }
+
+    /// Updates the rolling hash with a new byte.
+    ///
+    /// The update process combines three operations:
+    /// 1. `hash << 1`: Shifts existing hash left, making room for new information
+    /// 2. `^ gear_table[byte]`: Incorporates the new byte's pseudo-random value
+    /// 3. `^ (hash >> 53)`: Feeds back high bits for better mixing
+    ///
+    /// Visually, the process looks like this:
+    /// ```text
+    /// Original hash:
+    /// ┌──────────────────────────────┐
+    /// │ bits [63 .. 0]               │
+    /// └──────────────────────────────┘
+    ///
+    /// After left shift (hash << 1):
+    /// ┌──────────────────────────────┐
+    /// │ bits [62 .. 0] 0             │ (the left bit is gone, the right is 0)
+    /// └──────────────────────────────┘
+    ///
+    /// High bits feedback (hash >> 53):
+    /// ┌──────────────────────────────┐
+    /// │ bits [63 .. 53] (rest are 0) │ (~the top 11 bits)
+    /// └──────────────────────────────┘
+    /// ```
+    ///
+    /// The feedback from high bits (component 3) is particularly important for handling
+    /// repetitive data patterns. Without it, repeated sequences might not generate
+    /// enough variation in the lower bits to create chunk boundaries at the desired
+    /// frequency. This extra mixing ensures robust chunking even with non-random data.
+    pub fn roll(&mut self, byte: u8) {
+        self.hash = (self.hash << 1) ^ self.gear_table[byte as usize] ^ (self.hash >> 53);
+    }
+
+    /// Returns the current hash value.
+    pub fn fingerprint(&self) -> u64 {
+        self.hash
+    }
+
+    /// Checks if the current hash indicates a chunk boundary.
+    /// A chunk boundary is determined by checking if the lowest bits of the hash are all zeros.
+    pub fn boundary_check(&self, mask: u64) -> bool {
+        (self.hash & mask) == 0
+    }
+}
+
+//--------------------------------------------------------------------------------------------------
+// Functions
+//--------------------------------------------------------------------------------------------------
+
+/// Converts a desired chunk size to a bit mask where the lowest log2(size) bits are set to 1.
+/// This mask is used to determine chunk boundaries in content-defined chunking.
+///
+/// For example:
+/// - If size is 8192 (2^13), returns a mask with 13 lowest bits set: 0x1FFF
+/// - If size is 16384 (2^14), returns a mask with 14 lowest bits set: 0x3FFF
+///
+/// # Panics
+/// Panics if size is 0 or greater than 2^63 (as it would exceed u64 capacity)
+pub fn size_to_mask(size: u64) -> u64 {
+    assert!(
+        size > 0 && size <= (1 << 63),
+        "size must be between 1 and 2^63"
+    );
+
+    // If size == 1, the doc/tests want 1 bit => 0b1 = 1, even though log2(1) = 0.
+    if size == 1 {
+        return 0b1;
+    }
+
+    // Round up to next power of two
+    let p = size.next_power_of_two(); // e.g. 7 -> 8, 9 -> 16, etc.
+    let bits = p.trailing_zeros(); // number of bits = log2(p)
+    (1 << bits) - 1
+}
+
+//--------------------------------------------------------------------------------------------------
+// Trait Implementations
+//--------------------------------------------------------------------------------------------------
+
+impl Chunker for GearCDC {
+    async fn chunk<'a>(
+        &self,
+        reader: impl AsyncRead + Send + 'a,
+    ) -> StoreResult<BoxStream<'a, StoreResult<Bytes>>> {
+        let mask = size_to_mask(self.desired_chunk_size);
+        let gear_table = self.gear_table;
+
+        let s = try_stream! {
+            let mut reader = pin!(reader);
+            let mut current_chunk = Vec::new();
+            let mut hasher = GearHasher::new(gear_table);
+            let mut buffer = [0u8; 8192]; // Read input in 8 KiB blocks
+
+            loop {
+                let n = reader.read(&mut buffer).await.map_err(StoreError::custom)?;
+                if n == 0 {
+                    // End of input - yield remaining bytes as final chunk if any
+                    if !current_chunk.is_empty() {
+                        yield Bytes::from(current_chunk);
+                    }
+                    break;
+                }
+
+                // Process each byte, looking for chunk boundaries
+                for &byte in &buffer[..n] {
+                    current_chunk.push(byte);
+                    hasher.roll(byte);
+
+                    // Check if we've hit a chunk boundary
+                    if hasher.boundary_check(mask) && !current_chunk.is_empty() {
+                        yield Bytes::from(current_chunk);
+                        current_chunk = Vec::new();
+                    }
+                }
+            }
+        };
+
+        Ok(Box::pin(s))
+    }
+
+    fn chunk_max_size(&self) -> Option<u64> {
+        None // Variable-size chunks don't have a fixed maximum size
+    }
+}
+
+impl Default for GearCDC {
+    fn default() -> Self {
+        Self::new(DEFAULT_DESIRED_CHUNK_SIZE, DEFAULT_GEAR_TABLE)
+    }
+}
+
+impl Default for GearHasher {
+    fn default() -> Self {
+        Self::new(DEFAULT_GEAR_TABLE)
+    }
+}
+
+//--------------------------------------------------------------------------------------------------
+// Tests
+//--------------------------------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::StreamExt;
+
+    #[test]
+    fn test_size_to_mask() {
+        // Test powers of 2
+        assert_eq!(size_to_mask(8), 0b111); // 3 bits for size 8 (2^3)
+        assert_eq!(size_to_mask(16), 0b1111); // 4 bits for size 16 (2^4)
+        assert_eq!(size_to_mask(8192), 0x1FFF); // 13 bits for size 8192 (2^13)
+        assert_eq!(size_to_mask(16384), 0x3FFF); // 14 bits for size 16384 (2^14)
+
+        // Test non-powers of 2 (should round up to next power)
+        assert_eq!(size_to_mask(7), 0b111); // round up to 8 => 3 bits
+        assert_eq!(size_to_mask(9), 0b1111); // round up to 16 => 4 bits
+        assert_eq!(size_to_mask(8000), 0x1FFF); // round up to 8192 => 13 bits
+
+        // Test edge cases
+        assert_eq!(size_to_mask(1), 0b1); // special-cased => 1 bit
+        assert_eq!(size_to_mask(2), 0b1); // log2(2) => 1 bit
+    }
+
+    #[test]
+    #[should_panic(expected = "size must be between 1 and 2^63")]
+    fn test_size_to_mask_zero() {
+        size_to_mask(0);
+    }
+
+    #[test]
+    #[should_panic(expected = "size must be between 1 and 2^63")]
+    fn test_size_to_mask_too_large() {
+        size_to_mask((1 << 63) + 1);
+    }
+
+    #[test]
+    fn test_gear_hasher() {
+        // Create a simple gear table for testing
+        let mut gear_table = [0u64; 256];
+        for i in 0..256 {
+            gear_table[i] = i as u64;
+        }
+
+        let mut hasher = GearHasher::new(gear_table);
+
+        // Test initial state
+        assert_eq!(hasher.fingerprint(), 0);
+
+        // Test single byte
+        hasher.roll(1);
+        // hash = (0 << 1) ^ 1 ^ (0 >> 53) = 1
+        assert_eq!(hasher.fingerprint(), 1);
+
+        // Test multiple bytes
+        hasher.roll(2);
+        // hash = (1 << 1) ^ 2 ^ (1 >> 53) = 2 ^ 2 ^ 0 = 0
+        assert_eq!(hasher.fingerprint(), 0);
+
+        hasher.roll(3);
+        // hash = (0 << 1) ^ 3 ^ (0 >> 53) = 3
+        assert_eq!(hasher.fingerprint(), 3);
+
+        // Test boundary check
+        assert!(hasher.boundary_check(0)); // Always true for mask 0
+        assert!(!hasher.boundary_check(0x2)); // 3 & 0010 != 0
+        assert!(hasher.boundary_check(0x4)); // 3 & 0100 == 0
+    }
+
+    #[test]
+    fn test_gear_hasher_wrapping() {
+        let gear_table = [1u64; 256]; // All 1s for simplicity
+        let mut hasher = GearHasher::new(gear_table);
+
+        // Roll enough times to cause wrapping
+        for _ in 0..100 {
+            hasher.roll(0);
+        }
+
+        // The hash should still be valid (not panicked)
+        let _ = hasher.fingerprint();
+    }
+
+    #[tokio::test]
+    async fn test_gearcdc_basic_chunking() -> anyhow::Result<()> {
+        // Create repeatable data that should trigger chunk boundaries
+        let data = b"abcdefghijklmnopqrstuvwxyz".repeat(100);
+
+        // Use a simple gear table where each byte maps to itself
+        // This makes boundary detection more predictable for testing
+        let mut gear_table = [0u64; 256];
+        for i in 0..256 {
+            gear_table[i] = i as u64;
+        }
+
+        let chunker = GearCDC::new(16, gear_table); // Small size for testing
+        let mut chunk_stream = chunker.chunk(&data[..]).await?;
+        let mut chunks = Vec::new();
+
+        while let Some(chunk) = chunk_stream.next().await {
+            chunks.push(chunk?);
+        }
+
+        // Basic assertions
+        assert!(!chunks.is_empty(), "Should produce at least one chunk");
+        assert_eq!(
+            chunks.iter().map(|c| c.len()).sum::<usize>(),
+            data.len(),
+            "Total chunked data should equal input size"
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_gearcdc_empty_input() -> anyhow::Result<()> {
+        let data = b"";
+        let chunker = GearCDC::default();
+        let mut chunk_stream = chunker.chunk(&data[..]).await?;
+
+        assert!(
+            chunk_stream.next().await.is_none(),
+            "Empty input should produce no chunks"
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_gearcdc_single_byte() -> anyhow::Result<()> {
+        let data = b"a";
+        let chunker = GearCDC::default();
+        let mut chunk_stream = chunker.chunk(&data[..]).await?;
+
+        let chunk = chunk_stream.next().await.unwrap()?;
+        assert_eq!(
+            chunk.as_ref(),
+            b"a",
+            "Single byte should be returned as one chunk"
+        );
+        assert!(
+            chunk_stream.next().await.is_none(),
+            "Should only produce one chunk"
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_gearcdc_chunk_distribution() -> anyhow::Result<()> {
+        use rand::rngs::StdRng;
+        use rand::{Rng, SeedableRng};
+
+        // Test both random and repeating data to verify our high-bit feedback mechanism
+        let test_cases = vec![
+            ("random", {
+                let mut rng = StdRng::seed_from_u64(12345);
+                (0..100_000).map(|_| rng.gen()).collect::<Vec<u8>>()
+            }),
+            ("repeating", {
+                (0..100_000).map(|i| (i % 251) as u8).collect::<Vec<u8>>()
+            }),
+        ];
+
+        for (data_type, data) in test_cases {
+            let chunker = GearCDC::new(1024, DEFAULT_GEAR_TABLE);
+            let mut chunk_stream = chunker.chunk(&data[..]).await?;
+            let mut chunk_sizes = Vec::new();
+
+            while let Some(chunk) = chunk_stream.next().await {
+                chunk_sizes.push(chunk?.len());
+            }
+
+            // Verify chunk size distribution
+            assert!(
+                !chunk_sizes.is_empty(),
+                "Should produce chunks for {} data",
+                data_type
+            );
+
+            // Calculate average chunk size
+            let avg_size: f64 = chunk_sizes.iter().sum::<usize>() as f64 / chunk_sizes.len() as f64;
+
+            // Most chunks should be "near" the target size
+            // Allow for some variance since it's content-defined
+            let target = 1024.0;
+            assert!(
+                (avg_size - target).abs() < target * 0.5,
+                "Average chunk size for {} data ({}) should be roughly near target size ({})",
+                data_type,
+                avg_size,
+                target
+            );
+
+            // Print distribution statistics for debugging
+            println!("\n{} data statistics:", data_type);
+            println!("Number of chunks: {}", chunk_sizes.len());
+            println!("Average chunk size: {:.2}", avg_size);
+            println!("Min chunk size: {}", chunk_sizes.iter().min().unwrap());
+            println!("Max chunk size: {}", chunk_sizes.iter().max().unwrap());
+        }
+
+        Ok(())
+    }
+}
diff --git a/monoutils-store/lib/implementations/chunkers/mod.rs b/monoutils-store/lib/implementations/chunkers/mod.rs
index 2d33fba..e2d98ee 100644
--- a/monoutils-store/lib/implementations/chunkers/mod.rs
+++ b/monoutils-store/lib/implementations/chunkers/mod.rs
@@ -1,6 +1,7 @@
 mod constants;
 mod fixed;
-mod rabin;
+mod fastcdc;
+mod gearcdc;
 
 //--------------------------------------------------------------------------------------------------
 // Exports
@@ -8,4 +9,5 @@ mod rabin;
 
 pub use constants::*;
 pub use fixed::*;
-pub use rabin::*;
+pub use fastcdc::*;
+pub use gearcdc::*;
diff --git a/monoutils-store/lib/implementations/stores/memstore.rs b/monoutils-store/lib/implementations/stores/memstore.rs
index dfd9c4c..325abd0 100644
--- a/monoutils-store/lib/implementations/stores/memstore.rs
+++ b/monoutils-store/lib/implementations/stores/memstore.rs
@@ -246,7 +246,7 @@ impl Default for MemoryStore {
 
 #[cfg(test)]
 mod tests {
-    use crate::DEFAULT_CHUNK_MAX_SIZE;
+    use crate::DEFAULT_MAX_CHUNK_SIZE;
 
     use super::fixtures::TestNode;
     use super::*;
@@ -357,9 +357,9 @@
         // Verify size limits from chunker
         assert_eq!(
             store.get_node_block_max_size(),
-            Some(DEFAULT_CHUNK_MAX_SIZE)
+            Some(DEFAULT_MAX_CHUNK_SIZE)
        );
-        assert_eq!(store.get_raw_block_max_size(), Some(DEFAULT_CHUNK_MAX_SIZE));
+        assert_eq!(store.get_raw_block_max_size(), Some(DEFAULT_MAX_CHUNK_SIZE));
 
         Ok(())
     }
diff --git a/monoutils/lib/seekable.rs b/monoutils/lib/seekable.rs
index a012bff..1416059 100644
--- a/monoutils/lib/seekable.rs
+++ b/monoutils/lib/seekable.rs
@@ -3,7 +3,7 @@ use std::{
     pin::Pin,
     task::{Context, Poll},
 };
-use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
+use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
 
 //--------------------------------------------------------------------------------------------------
 // Types
@@ -13,6 +13,10 @@ use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 #[derive(Debug)]
 pub struct EmptySeekableReader;
 
+/// A seekable writer that always writes zero bytes and reports position as 0.
+#[derive(Debug)]
+pub struct EmptySeekableWriter;
+
 //--------------------------------------------------------------------------------------------------
 // Traits
 //--------------------------------------------------------------------------------------------------
@@ -20,12 +24,16 @@ pub struct EmptySeekableReader;
 /// A trait that extends the `AsyncRead` and `AsyncSeek` traits to allow for seeking.
 pub trait SeekableReader: AsyncRead + AsyncSeek {}
 
+pub trait SeekableWriter: AsyncWrite + AsyncSeek {}
+
 //--------------------------------------------------------------------------------------------------
 // Trait Implementations
 //--------------------------------------------------------------------------------------------------
 
 impl<T> SeekableReader for T where T: AsyncRead + AsyncSeek {}
 
+impl<T> SeekableWriter for T where T: AsyncWrite + AsyncSeek {}
+
 // Implement AsyncRead by always reading zero bytes
 impl AsyncRead for EmptySeekableReader {
     fn poll_read(