diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index be5db7c6a36e..4ab65a6e97fd 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -262,3 +262,85 @@ impl MemtableBuilder for MergeTreeMemtableBuilder { )) } } + +#[cfg(test)] +mod tests { + use common_time::Timestamp; + + use super::*; + use crate::test_util::memtable_util; + + #[test] + fn test_memtable_sorted_input() { + write_sorted_input(true); + write_sorted_input(false); + } + + fn write_sorted_input(has_pk: bool) { + let metadata = if has_pk { + memtable_util::metadata_with_primary_key(vec![1, 0], true) + } else { + memtable_util::metadata_with_primary_key(vec![], false) + }; + let timestamps = (0..100).collect::>(); + let kvs = + memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1); + let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default()); + memtable.write(&kvs).unwrap(); + + // TODO(yingwen): Test iter. + + let stats = memtable.stats(); + assert!(stats.bytes_allocated() > 0); + assert_eq!( + Some(( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(99) + )), + stats.time_range() + ); + } + + #[test] + fn test_memtable_unsorted_input() { + write_iter_unsorted_input(true); + write_iter_unsorted_input(false); + } + + fn write_iter_unsorted_input(has_pk: bool) { + let metadata = if has_pk { + memtable_util::metadata_with_primary_key(vec![1, 0], true) + } else { + memtable_util::metadata_with_primary_key(vec![], false) + }; + let memtable = + MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default()); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[1, 3, 7, 5, 6], + 0, // sequence 0, 1, 2, 3, 4 + ); + memtable.write(&kvs).unwrap(); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[5, 2, 4, 0, 7], + 5, // sequence 5, 6, 7, 8, 9 + ); + memtable.write(&kvs).unwrap(); + + // TODO(yingwen): Test iter. + + let stats = memtable.stats(); + assert!(stats.bytes_allocated() > 0); + assert_eq!( + Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))), + stats.time_range() + ); + } +} diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 96db38f673f4..f414ca38f99a 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -43,10 +43,13 @@ use crate::error; use crate::error::Result; use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger}; -use crate::memtable::merge_tree::{PkId, PkIndex}; +use crate::memtable::merge_tree::PkIndex; const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; +/// Initial capacity for the data buffer. +pub(crate) const DATA_INIT_CAP: usize = 8; + /// Data part batches returns by `DataParts::read`. #[derive(Debug, Clone)] pub struct DataBatch { @@ -128,9 +131,9 @@ impl DataBuffer { } /// Writes a row to data buffer. - pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) { + pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { self.ts_builder.push_value_ref(kv.timestamp()); - self.pk_index_builder.push(Some(pk_id.pk_index)); + self.pk_index_builder.push(Some(pk_index)); self.sequence_builder.push(Some(kv.sequence())); self.op_type_builder.push(Some(kv.op_type() as u8)); @@ -662,9 +665,9 @@ pub struct ParquetPart { /// Data parts under a shard. pub struct DataParts { /// The active writing buffer. - pub(crate) active: DataBuffer, + active: DataBuffer, /// immutable (encoded) parts. - pub(crate) frozen: Vec, + frozen: Vec, } impl DataParts { @@ -675,9 +678,14 @@ impl DataParts { } } - /// Writes one row into active part. - pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) { - self.active.write_row(pk_id, kv) + pub(crate) fn with_frozen(mut self, frozen: Vec) -> Self { + self.frozen = frozen; + self + } + + /// Writes a row into parts. + pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { + self.active.write_row(pk_index, kv) } /// Freezes the active data buffer into frozen data parts. @@ -932,13 +940,7 @@ mod tests { ); for kv in kvs.iter() { - buffer.write_row( - PkId { - shard_id: 0, - pk_index, - }, - kv, - ); + buffer.write_row(pk_index, kv); } } diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index 43a53bd494f6..5c1c3c3a57f6 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -61,19 +61,21 @@ impl KeyDictBuilder { self.pk_to_index.get(key).copied() } + /// Returns true if the builder is full. + pub fn is_full(&self) -> bool { + self.num_keys >= self.capacity + } + /// Adds the key to the builder and returns its index if the builder is not full. /// - /// Returns `None` if the builder is full. - pub fn try_insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> Option { + /// # Panics + /// Panics if the builder is full. + pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex { + assert!(!self.is_full()); + if let Some(pk_index) = self.pk_to_index.get(key).copied() { // Already in the builder. - return Some(pk_index); - } - - // A new key. - if self.num_keys >= self.capacity { - // The builder is full. - return None; + return pk_index; } if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() { @@ -91,7 +93,7 @@ impl KeyDictBuilder { metrics.key_bytes += key.len() * 2; self.key_bytes_in_index += key.len(); - Some(pk_index) + pk_index } /// Memory size of the builder. @@ -129,11 +131,12 @@ impl KeyDictBuilder { pk_to_index, dict_blocks: std::mem::take(&mut self.dict_blocks), key_positions, + key_bytes_in_index: self.key_bytes_in_index, }) } - /// Scans the builder. - pub fn scan(&self) -> DictBuilderReader { + /// Reads the builder. + pub fn read(&self) -> DictBuilderReader { let sorted_pk_indices = self.pk_to_index.values().copied().collect(); let block = self.key_buffer.finish_cloned(); let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1); @@ -162,38 +165,46 @@ impl DictBuilderReader { } } - /// Returns true if the item in the reader is valid. - pub fn is_valid(&self) -> bool { - self.offset < self.sorted_pk_indices.len() + /// Returns the number of keys. + pub fn num_keys(&self) -> usize { + self.sorted_pk_indices.len() + } + + /// Gets the i-th pk index. + pub fn pk_index(&self, offset: usize) -> PkIndex { + self.sorted_pk_indices[offset] } - /// Returns current key. - pub fn current_key(&self) -> &[u8] { - let pk_index = self.current_pk_index(); + /// Gets the i-th key. + pub fn key(&self, offset: usize) -> &[u8] { + let pk_index = self.pk_index(offset); self.key_by_pk_index(pk_index) } - /// Returns current [PkIndex] of the key. - pub fn current_pk_index(&self) -> PkIndex { - assert!(self.is_valid()); - self.sorted_pk_indices[self.offset] + /// Gets the key by the pk index. + pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] { + let block_idx = pk_index / MAX_KEYS_PER_BLOCK; + self.blocks[block_idx as usize].key_by_pk_index(pk_index) } - /// Advances the reader. - pub fn next(&mut self) { - assert!(self.is_valid()); - self.offset += 1; + /// Returns pk weights to sort a data part and replaces pk indices. + pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { + compute_pk_weights(&self.sorted_pk_indices) } /// Returns pk indices sorted by keys. pub(crate) fn sorted_pk_index(&self) -> &[PkIndex] { &self.sorted_pk_indices } +} - fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] { - let block_idx = pk_index / MAX_KEYS_PER_BLOCK; - self.blocks[block_idx as usize].key_by_pk_index(pk_index) +/// Returns pk weights to sort a data part and replaces pk indices. +fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec { + let mut pk_weights = vec![0; sorted_pk_indices.len()]; + for (weight, pk_index) in sorted_pk_indices.iter().enumerate() { + pk_weights[*pk_index as usize] = weight as u16; } + pk_weights } /// A key dictionary. @@ -206,6 +217,7 @@ pub struct KeyDict { dict_blocks: Vec, /// Maps pk index to position of the key in [Self::dict_blocks]. key_positions: Vec, + key_bytes_in_index: usize, } pub type KeyDictRef = Arc; @@ -220,6 +232,21 @@ impl KeyDict { let block_index = position / MAX_KEYS_PER_BLOCK; self.dict_blocks[block_index as usize].key_by_pk_index(position) } + + /// Gets the pk index by the key. + pub fn get_pk_index(&self, key: &[u8]) -> Option { + self.pk_to_index.get(key).copied() + } + + /// Returns pk weights to sort a data part and replaces pk indices. + pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { + compute_pk_weights(&self.key_positions) + } + + /// Returns the shared memory size. + pub(crate) fn shared_memory_size(&self) -> usize { + self.key_bytes_in_index + } } /// Buffer to store unsorted primary keys. @@ -364,7 +391,8 @@ mod tests { let mut last_pk_index = None; let mut metrics = WriteMetrics::default(); for key in &keys { - let pk_index = builder.try_insert_key(key, &mut metrics).unwrap(); + assert!(!builder.is_full()); + let pk_index = builder.insert_key(key, &mut metrics); last_pk_index = Some(pk_index); } assert_eq!(num_keys - 1, last_pk_index.unwrap()); @@ -379,10 +407,9 @@ mod tests { expect.sort_unstable_by(|a, b| a.0.cmp(&b.0)); let mut result = Vec::with_capacity(expect.len()); - let mut reader = builder.scan(); - while reader.is_valid() { - result.push((reader.current_key().to_vec(), reader.current_pk_index())); - reader.next(); + let reader = builder.read(); + for i in 0..reader.num_keys() { + result.push((reader.key(i).to_vec(), reader.pk_index(i))); } assert_eq!(expect, result); } @@ -397,9 +424,7 @@ mod tests { for i in 0..num_keys { // Each key is 5 bytes. let key = format!("{i:05}"); - builder - .try_insert_key(key.as_bytes(), &mut metrics) - .unwrap(); + builder.insert_key(key.as_bytes(), &mut metrics); } // num_keys * 5 * 2 assert_eq!(5130, metrics.key_bytes); diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 7f54183cdd91..c758d3ecd909 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -407,7 +407,6 @@ mod tests { use super::*; use crate::memtable::merge_tree::data::DataBuffer; - use crate::memtable::merge_tree::PkId; use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; fn write_rows_to_buffer( @@ -429,13 +428,7 @@ mod tests { ); for kv in kvs.iter() { - buffer.write_row( - PkId { - shard_id: 0, - pk_index, - }, - kv, - ); + buffer.write_row(pk_index, kv); } *sequence += rows; diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 69c92ff69f3a..dc817d134ded 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -26,6 +26,7 @@ use store_api::storage::ColumnId; use crate::error::Result; use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; use crate::memtable::merge_tree::shard_builder::ShardBuilder; @@ -41,8 +42,12 @@ pub struct Partition { impl Partition { /// Creates a new partition. - pub fn new(_metadata: RegionMetadataRef, _config: &MergeTreeConfig) -> Self { - unimplemented!() + pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self { + let shard_builder = ShardBuilder::new(metadata.clone(), config); + + Partition { + inner: RwLock::new(Inner::new(metadata, shard_builder)), + } } /// Writes to the partition with a primary key. @@ -56,40 +61,37 @@ impl Partition { // Now we ensure one key only exists in one shard. if let Some(pk_id) = inner.find_key_in_shards(primary_key) { // Key already in shards. - return inner.write_to_shard(pk_id, key_value); + inner.write_to_shard(pk_id, key_value); + return Ok(()); } if inner.shard_builder.should_freeze() { - let shard_id = inner.active_shard_id; - let shard = inner.shard_builder.finish(shard_id)?; - inner.active_shard_id += 1; - inner.shards.push(shard); + inner.freeze_active_shard()?; } // Write to the shard builder. inner .shard_builder - .write_with_key(primary_key, key_value, metrics)?; + .write_with_key(primary_key, key_value, metrics); + inner.num_rows += 1; Ok(()) } /// Writes to the partition without a primary key. - pub fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> { + pub fn write_no_key(&self, key_value: KeyValue) { let mut inner = self.inner.write().unwrap(); // If no primary key, always write to the first shard. - if inner.shards.is_empty() { - let shard_id = inner.active_shard_id; - inner.shards.push(Shard::new_no_dict(shard_id)); - inner.active_shard_id += 1; - } + debug_assert!(!inner.shards.is_empty()); + debug_assert_eq!(1, inner.active_shard_id); // A dummy pk id. let pk_id = PkId { - shard_id: inner.active_shard_id - 1, + shard_id: 0, pk_index: 0, }; - inner.shards[0].write_key_value(pk_id, key_value, metrics) + inner.shards[0].write_with_pk_id(pk_id, key_value); + inner.num_rows += 1; } /// Scans data in the partition. @@ -103,22 +105,47 @@ impl Partition { /// Freezes the partition. pub fn freeze(&self) -> Result<()> { - unimplemented!() + let mut inner = self.inner.write().unwrap(); + inner.freeze_active_shard()?; + Ok(()) } /// Forks the partition. - pub fn fork(&self, _metadata: &RegionMetadataRef) -> Partition { - unimplemented!() + pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition { + let inner = self.inner.read().unwrap(); + // TODO(yingwen): TTL or evict shards. + let shard_builder = ShardBuilder::new(metadata.clone(), config); + let shards = inner + .shards + .iter() + .map(|shard| shard.fork(metadata.clone())) + .collect(); + + Partition { + inner: RwLock::new(Inner { + metadata: metadata.clone(), + shard_builder, + active_shard_id: inner.active_shard_id, + shards, + num_rows: 0, + }), + } } /// Returns true if the partition has data. pub fn has_data(&self) -> bool { - unimplemented!() + let inner = self.inner.read().unwrap(); + inner.num_rows > 0 } /// Returns shared memory size of the partition. pub fn shared_memory_size(&self) -> usize { - unimplemented!() + let inner = self.inner.read().unwrap(); + inner + .shards + .iter() + .map(|shard| shard.shared_memory_size()) + .sum() } /// Get partition key from the key value. @@ -160,17 +187,37 @@ pub type PartitionRef = Arc; /// /// A key only exists in one shard. struct Inner { + metadata: RegionMetadataRef, /// Shard whose dictionary is active. shard_builder: ShardBuilder, active_shard_id: ShardId, - /// Shards with frozon dictionary. + /// Shards with frozen dictionary. shards: Vec, + num_rows: usize, } impl Inner { + fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder) -> Self { + let mut inner = Self { + metadata, + shard_builder, + active_shard_id: 0, + shards: Vec::new(), + num_rows: 0, + }; + + if inner.metadata.primary_key.is_empty() { + let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP); + inner.shards.push(Shard::new(0, None, data_parts)); + inner.active_shard_id = 1; + } + + inner + } + fn find_key_in_shards(&self, primary_key: &[u8]) -> Option { for shard in &self.shards { - if let Some(pkid) = shard.find_key(primary_key) { + if let Some(pkid) = shard.find_id_by_key(primary_key) { return Some(pkid); } } @@ -178,7 +225,24 @@ impl Inner { None } - fn write_to_shard(&mut self, _pk_id: PkId, _key_value: KeyValue) -> Result<()> { - unimplemented!() + fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) { + for shard in &mut self.shards { + if shard.shard_id == pk_id.shard_id { + shard.write_with_pk_id(pk_id, key_value); + self.num_rows += 1; + return; + } + } + } + + fn freeze_active_shard(&mut self) -> Result<()> { + if let Some(shard) = self + .shard_builder + .finish(self.active_shard_id, self.metadata.clone())? + { + self.active_shard_id += 1; + self.shards.push(shard); + } + Ok(()) } } diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 9eceb4920130..86c5ea18f1a2 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -14,21 +14,16 @@ //! Shard in a partition. -use std::collections::HashSet; +use store_api::metadata::RegionMetadataRef; -use common_recordbatch::filter::SimpleFilterEvaluator; -use store_api::storage::ColumnId; - -use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::DataParts; +use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::dict::KeyDictRef; -use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::{PkId, ShardId}; /// Shard stores data related to the same key dictionary. pub struct Shard { - shard_id: ShardId, + pub(crate) shard_id: ShardId, /// Key dictionary of the shard. `None` if the schema of the tree doesn't have a primary key. key_dict: Option, /// Data in the shard. @@ -36,35 +31,153 @@ pub struct Shard { } impl Shard { - /// Returns a shard without dictionary. - pub fn new_no_dict(_shard_id: ShardId) -> Shard { - unimplemented!() + /// Returns a new shard. + pub fn new(shard_id: ShardId, key_dict: Option, data_parts: DataParts) -> Shard { + Shard { + shard_id, + key_dict, + data_parts, + } } /// Returns the pk id of the key if it exists. - pub fn find_key(&self, _key: &[u8]) -> Option { - unimplemented!() + pub fn find_id_by_key(&self, key: &[u8]) -> Option { + let key_dict = self.key_dict.as_ref()?; + let pk_index = key_dict.get_pk_index(key)?; + + Some(PkId { + shard_id: self.shard_id, + pk_index, + }) } /// Writes a key value into the shard. - pub fn write_key_value( - &mut self, - _pk_id: PkId, - _key_value: KeyValue, - _metrics: &mut WriteMetrics, - ) -> Result<()> { - unimplemented!() + pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: KeyValue) { + debug_assert_eq!(self.shard_id, pk_id.shard_id); + + self.data_parts.write_row(pk_id.pk_index, key_value); } /// Scans the shard. - pub fn scan( - &self, - _projection: &HashSet, - _filters: &[SimpleFilterEvaluator], - ) -> ShardReader { + // TODO(yingwen): Push down projection to data parts. + pub fn scan(&self) -> ShardReader { unimplemented!() } + + /// Returns the memory size of the shard part. + pub fn shared_memory_size(&self) -> usize { + self.key_dict + .as_ref() + .map(|dict| dict.shared_memory_size()) + .unwrap_or(0) + } + + /// Forks a shard. + pub fn fork(&self, metadata: RegionMetadataRef) -> Shard { + Shard { + shard_id: self.shard_id, + key_dict: self.key_dict.clone(), + data_parts: DataParts::new(metadata, DATA_INIT_CAP), + } + } } /// Reader to read rows in a shard. pub struct ShardReader {} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::memtable::merge_tree::dict::KeyDictBuilder; + use crate::memtable::merge_tree::metrics::WriteMetrics; + use crate::memtable::merge_tree::PkIndex; + use crate::memtable::KeyValues; + use crate::test_util::memtable_util::{ + build_key_values_with_ts_seq_values, encode_key, encode_key_by_kv, encode_keys, + metadata_for_test, + }; + + fn input_with_key(metadata: &RegionMetadataRef) -> Vec { + vec![ + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 2, + [20, 21].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 0, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 0, + [0, 1].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 1, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard".to_string(), + 1, + [10, 11].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 2, + ), + ] + } + + fn new_shard_with_dict( + shard_id: ShardId, + metadata: RegionMetadataRef, + input: &[KeyValues], + ) -> Shard { + let mut dict_builder = KeyDictBuilder::new(1024); + let mut metrics = WriteMetrics::default(); + let mut keys = Vec::with_capacity(input.len()); + for kvs in input { + encode_keys(&metadata, kvs, &mut keys); + } + for key in &keys { + dict_builder.insert_key(key, &mut metrics); + } + + let dict = dict_builder.finish().unwrap(); + let data_parts = DataParts::new(metadata, DATA_INIT_CAP); + + Shard::new(shard_id, Some(Arc::new(dict)), data_parts) + } + + #[test] + fn test_shard_find_by_key() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let shard = new_shard_with_dict(8, metadata, &input); + for i in 0..input.len() { + let key = encode_key("shard", i as u32); + assert_eq!( + PkId { + shard_id: 8, + pk_index: i as PkIndex, + }, + shard.find_id_by_key(&key).unwrap() + ); + } + assert!(shard.find_id_by_key(&encode_key("shard", 100)).is_none()); + } + + #[test] + fn test_write_shard() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let mut shard = new_shard_with_dict(8, metadata, &input); + for key_values in &input { + for kv in key_values.iter() { + let key = encode_key_by_kv(&kv); + let pk_id = shard.find_id_by_key(&key).unwrap(); + shard.write_with_pk_id(pk_id, kv); + } + } + } +} diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index c8d78029043c..f9a32a17a563 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -14,13 +14,20 @@ //! Builder of a shard. +use std::collections::HashSet; +use std::sync::Arc; + +use common_recordbatch::filter::SimpleFilterEvaluator; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; + use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::DataBuffer; +use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::dict::KeyDictBuilder; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; -use crate::memtable::merge_tree::ShardId; +use crate::memtable::merge_tree::{MergeTreeConfig, ShardId}; /// Builder to write keys and data to a shard that the key dictionary /// is still active. @@ -29,43 +36,155 @@ pub struct ShardBuilder { dict_builder: KeyDictBuilder, /// Buffer to store data. data_buffer: DataBuffer, - /// Max keys in an index shard. - index_max_keys_per_shard: usize, /// Number of rows to freeze a data part. data_freeze_threshold: usize, } impl ShardBuilder { - /// Write a key value with its encoded primary key. - pub fn write_with_key( - &mut self, - _key: &[u8], - _key_value: KeyValue, - _metrics: &mut WriteMetrics, - ) -> Result<()> { - unimplemented!() + /// Returns a new builder. + pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> ShardBuilder { + ShardBuilder { + dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard), + data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP), + data_freeze_threshold: config.data_freeze_threshold, + } } - /// Returns true if the builder is empty. - pub fn is_empty(&self) -> bool { - unimplemented!() + /// Write a key value with its encoded primary key. + pub fn write_with_key(&mut self, key: &[u8], key_value: KeyValue, metrics: &mut WriteMetrics) { + // Safety: we check whether the builder need to freeze before. + let pk_index = self.dict_builder.insert_key(key, metrics); + self.data_buffer.write_row(pk_index, key_value); } /// Returns true if the builder need to freeze. pub fn should_freeze(&self) -> bool { - unimplemented!() + self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold } /// Builds a new shard and resets the builder. - pub fn finish(&mut self, _shard_id: ShardId) -> Result { - unimplemented!() + /// + /// Returns `None` if the builder is empty. + pub fn finish( + &mut self, + shard_id: ShardId, + metadata: RegionMetadataRef, + ) -> Result> { + if self.data_buffer.is_empty() { + return Ok(None); + } + + let key_dict = self.dict_builder.finish(); + let data_part = match &key_dict { + Some(dict) => { + let pk_weights = dict.pk_weights_to_sort_data(); + self.data_buffer.freeze(&pk_weights)? + } + None => { + let pk_weights = [0]; + self.data_buffer.freeze(&pk_weights)? + } + }; + + // build data parts. + let data_parts = DataParts::new(metadata, DATA_INIT_CAP).with_frozen(vec![data_part]); + let key_dict = key_dict.map(Arc::new); + + Ok(Some(Shard::new(shard_id, key_dict, data_parts))) } - /// Scans the shard builder - pub fn scan(&mut self, _shard_id: ShardId) -> Result { + /// Scans the shard builder. + pub fn scan( + &mut self, + _projection: &HashSet, + _filters: &[SimpleFilterEvaluator], + ) -> Result { unimplemented!() } } -/// Reader to scan a shard. builder. +/// Reader to scan a shard builder. pub struct ShardBuilderReader {} + +// TODO(yingwen): Can we use generic for data reader? + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::memtable::merge_tree::dict::KeyDictBuilder; + use crate::memtable::merge_tree::metrics::WriteMetrics; + use crate::memtable::KeyValues; + use crate::test_util::memtable_util::{ + build_key_values_with_ts_seq_values, encode_key_by_kv, encode_keys, metadata_for_test, + }; + + fn input_with_key(metadata: &RegionMetadataRef) -> Vec { + vec![ + build_key_values_with_ts_seq_values( + metadata, + "shard_builder".to_string(), + 3, + [30, 31].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 0, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard_builder".to_string(), + 1, + [10, 11].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 1, + ), + build_key_values_with_ts_seq_values( + metadata, + "shard_builder".to_string(), + 2, + [20, 21].into_iter(), + [Some(0.0), Some(1.0)].into_iter(), + 2, + ), + ] + } + + fn new_shard_builder( + shard_id: ShardId, + metadata: RegionMetadataRef, + input: &[KeyValues], + ) -> Shard { + let mut dict_builder = KeyDictBuilder::new(1024); + let mut metrics = WriteMetrics::default(); + let mut keys = Vec::with_capacity(input.len()); + for kvs in input { + encode_keys(&metadata, kvs, &mut keys); + } + for key in &keys { + dict_builder.insert_key(key, &mut metrics); + } + + let dict = dict_builder.finish().unwrap(); + let data_parts = DataParts::new(metadata, DATA_INIT_CAP); + + Shard::new(shard_id, Some(Arc::new(dict)), data_parts) + } + + #[test] + fn test_write_shard_builder() { + let metadata = metadata_for_test(); + let input = input_with_key(&metadata); + let config = MergeTreeConfig::default(); + let mut shard_builder = ShardBuilder::new(metadata.clone(), &config); + let mut metrics = WriteMetrics::default(); + assert!(shard_builder.finish(1, metadata.clone()).unwrap().is_none()); + + for key_values in &input { + for kv in key_values.iter() { + let key = encode_key_by_kv(&kv); + shard_builder.write_with_key(&key, kv, &mut metrics); + } + } + shard_builder.finish(1, metadata).unwrap().unwrap(); + } +} diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index d9c26611f362..4ae7d197b2e7 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -103,7 +103,7 @@ impl MergeTree { if !has_pk { // No primary key. - self.write_no_key(kv, metrics)?; + self.write_no_key(kv); continue; } @@ -202,7 +202,7 @@ impl MergeTree { } // Only fork partitions that have data. - let forked_part = part.fork(&metadata); + let forked_part = part.fork(&metadata, &self.config); forked.insert(*part_key, Arc::new(forked_part)); } @@ -236,11 +236,11 @@ impl MergeTree { partition.write_with_key(primary_key, key_value, metrics) } - fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> { + fn write_no_key(&self, key_value: KeyValue) { let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned); let partition = self.get_or_create_partition(partition_key); - partition.write_no_key(key_value, metrics) + partition.write_no_key(key_value) } fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef { diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 7e761cad771a..584f21350719 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -22,15 +22,18 @@ use api::v1::value::ValueData; use api::v1::{Row, Rows, SemanticType}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; +use datatypes::value::ValueRef; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; use crate::error::Result; +use crate::memtable::key_values::KeyValue; use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, }; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Empty memtable for test. #[derive(Debug, Default)] @@ -93,14 +96,19 @@ impl MemtableBuilder for EmptyMemtableBuilder { /// /// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`. pub(crate) fn metadata_for_test() -> RegionMetadataRef { - metadata_with_primary_key(vec![0, 1]) + metadata_with_primary_key(vec![0, 1], false) } /// Creates a region metadata to test memtable and specific primary key. /// -/// The schema is `k0, k1, ts, v0, v1`. -pub(crate) fn metadata_with_primary_key(primary_key: Vec) -> RegionMetadataRef { +/// If `enable_table_id` is false, the schema is `k0, k1, ts, v0, v1`. +/// If `enable_table_id` is true, the schema is `k0, __table_id, ts, v0, v1`. +pub(crate) fn metadata_with_primary_key( + primary_key: Vec, + enable_table_id: bool, +) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + let maybe_table_id = if enable_table_id { "table_id" } else { "k1" }; builder .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), @@ -108,7 +116,11 @@ pub(crate) fn metadata_with_primary_key(primary_key: Vec) -> RegionMet column_id: 0, }) .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false), + column_schema: ColumnSchema::new( + maybe_table_id, + ConcreteDataType::uint32_datatype(), + false, + ), semantic_type: semantic_type_of_column(1, &primary_key), column_id: 1, }) @@ -144,11 +156,31 @@ fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> Sem } } +/// Builds key values with `len` rows for test. +pub(crate) fn build_key_values( + schema: &RegionMetadataRef, + k0: String, + k1: u32, + timestamps: &[i64], + sequence: SequenceNumber, +) -> KeyValues { + let values = timestamps.iter().map(|v| Some(*v as f64)); + + build_key_values_with_ts_seq_values( + schema, + k0, + k1, + timestamps.iter().copied(), + values, + sequence, + ) +} + /// Builds key values with timestamps (ms) and sequences for test. pub(crate) fn build_key_values_with_ts_seq_values( schema: &RegionMetadataRef, k0: String, - k1: i64, + k1: u32, timestamps: impl Iterator, values: impl Iterator>, sequence: SequenceNumber, @@ -174,7 +206,7 @@ pub(crate) fn build_key_values_with_ts_seq_values( value_data: Some(ValueData::StringValue(k0.clone())), }, api::v1::Value { - value_data: Some(ValueData::I64Value(k1)), + value_data: Some(ValueData::U32Value(k1)), }, api::v1::Value { value_data: Some(ValueData::TimestampMillisecondValue(ts)), @@ -198,3 +230,40 @@ pub(crate) fn build_key_values_with_ts_seq_values( }; KeyValues::new(schema.as_ref(), mutation).unwrap() } + +/// Encode keys. +pub(crate) fn encode_keys( + metadata: &RegionMetadataRef, + key_values: &KeyValues, + keys: &mut Vec>, +) { + let row_codec = McmpRowCodec::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + for kv in key_values.iter() { + let key = row_codec.encode(kv.primary_keys()).unwrap(); + keys.push(key); + } +} + +/// Encode one key. +pub(crate) fn encode_key(k0: &str, k1: u32) -> Vec { + let row_codec = McmpRowCodec::new(vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::uint32_datatype()), + ]); + let key = [ValueRef::String(k0), ValueRef::UInt32(k1)]; + row_codec.encode(key.into_iter()).unwrap() +} + +/// Encode one key. +pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec { + let row_codec = McmpRowCodec::new(vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::uint32_datatype()), + ]); + row_codec.encode(key_value.primary_keys()).unwrap() +}