Skip to content

Commit

Permalink
feat: Implement write and fork for the new memtable (#3357)
Browse files Browse the repository at this point in the history
* feat: write to a shard or a shard builder

* feat: freeze and fork for partition and shards

* chore: shard builder

* chore: change dict reader to support random access

* test: test write shard

* test: test write

* test: test memtable

* feat: add new and write_row to DataParts

* refactor: partition freeze shards

* refactor: write_with_pk_id

* style: fix clippy

* chore: add methods to get pk weights

* chore: fix compiler errors
  • Loading branch information
evenyag authored Feb 23, 2024
1 parent 93d9f48 commit b144836
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 141 deletions.
82 changes: 82 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,85 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
))
}
}

#[cfg(test)]
mod tests {
    use common_time::Timestamp;

    use super::*;
    use crate::test_util::memtable_util;

    #[test]
    fn test_memtable_sorted_input() {
        // Exercise both the with-primary-key and no-primary-key layouts.
        for has_pk in [true, false] {
            write_sorted_input(has_pk);
        }
    }

    /// Writes rows with ascending timestamps and checks memtable stats.
    fn write_sorted_input(has_pk: bool) {
        let metadata = match has_pk {
            true => memtable_util::metadata_with_primary_key(vec![1, 0], true),
            false => memtable_util::metadata_with_primary_key(vec![], false),
        };
        let ts: Vec<_> = (0..100).collect();
        let key_values =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &ts, 1);
        let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
        memtable.write(&key_values).unwrap();

        // TODO(yingwen): Test iter.

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // Timestamps 0..100 were written, so the range is [0, 99] ms.
        let expected_range = Some((
            Timestamp::new_millisecond(0),
            Timestamp::new_millisecond(99),
        ));
        assert_eq!(expected_range, stats.time_range());
    }

    #[test]
    fn test_memtable_unsorted_input() {
        // Exercise both the with-primary-key and no-primary-key layouts.
        for has_pk in [true, false] {
            write_iter_unsorted_input(has_pk);
        }
    }

    /// Writes two out-of-order batches and checks the merged time range.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = match has_pk {
            true => memtable_util::metadata_with_primary_key(vec![1, 0], true),
            false => memtable_util::metadata_with_primary_key(vec![], false),
        };
        let memtable =
            MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());

        // First batch: unsorted timestamps, sequence numbers 0, 1, 2, 3, 4.
        let batch = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0,
        );
        memtable.write(&batch).unwrap();

        // Second batch: unsorted timestamps, sequence numbers 5, 6, 7, 8, 9.
        let batch = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5,
        );
        memtable.write(&batch).unwrap();

        // TODO(yingwen): Test iter.

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // The union of both batches covers timestamps [0, 7] ms.
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
}
32 changes: 17 additions & 15 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataNode, DataSource, Merger};
use crate::memtable::merge_tree::{PkId, PkIndex};
use crate::memtable::merge_tree::PkIndex;

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Initial capacity for the data buffer.
pub(crate) const DATA_INIT_CAP: usize = 8;

/// Data part batches returns by `DataParts::read`.
#[derive(Debug, Clone)]
pub struct DataBatch {
Expand Down Expand Up @@ -128,9 +131,9 @@ impl DataBuffer {
}

/// Writes a row to data buffer.
pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) {
pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
self.ts_builder.push_value_ref(kv.timestamp());
self.pk_index_builder.push(Some(pk_id.pk_index));
self.pk_index_builder.push(Some(pk_index));
self.sequence_builder.push(Some(kv.sequence()));
self.op_type_builder.push(Some(kv.op_type() as u8));

Expand Down Expand Up @@ -662,9 +665,9 @@ pub struct ParquetPart {
/// Data parts under a shard.
pub struct DataParts {
/// The active writing buffer.
pub(crate) active: DataBuffer,
active: DataBuffer,
/// immutable (encoded) parts.
pub(crate) frozen: Vec<DataPart>,
frozen: Vec<DataPart>,
}

impl DataParts {
Expand All @@ -675,9 +678,14 @@ impl DataParts {
}
}

/// Writes one row into active part.
pub fn write_row(&mut self, pk_id: PkId, kv: KeyValue) {
self.active.write_row(pk_id, kv)
pub(crate) fn with_frozen(mut self, frozen: Vec<DataPart>) -> Self {
self.frozen = frozen;
self
}

/// Writes a row into parts.
pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
self.active.write_row(pk_index, kv)
}

/// Freezes the active data buffer into frozen data parts.
Expand Down Expand Up @@ -932,13 +940,7 @@ mod tests {
);

for kv in kvs.iter() {
buffer.write_row(
PkId {
shard_id: 0,
pk_index,
},
kv,
);
buffer.write_row(pk_index, kv);
}
}

Expand Down
99 changes: 62 additions & 37 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,21 @@ impl KeyDictBuilder {
self.pk_to_index.get(key).copied()
}

/// Returns true if the builder is full.
pub fn is_full(&self) -> bool {
self.num_keys >= self.capacity
}

/// Adds the key to the builder and returns its index if the builder is not full.
///
/// Returns `None` if the builder is full.
pub fn try_insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> Option<PkIndex> {
/// # Panics
/// Panics if the builder is full.
pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
assert!(!self.is_full());

if let Some(pk_index) = self.pk_to_index.get(key).copied() {
// Already in the builder.
return Some(pk_index);
}

// A new key.
if self.num_keys >= self.capacity {
// The builder is full.
return None;
return pk_index;
}

if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
Expand All @@ -91,7 +93,7 @@ impl KeyDictBuilder {
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();

Some(pk_index)
pk_index
}

/// Memory size of the builder.
Expand Down Expand Up @@ -129,11 +131,12 @@ impl KeyDictBuilder {
pk_to_index,
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index: self.key_bytes_in_index,
})
}

/// Scans the builder.
pub fn scan(&self) -> DictBuilderReader {
/// Reads the builder.
pub fn read(&self) -> DictBuilderReader {
let sorted_pk_indices = self.pk_to_index.values().copied().collect();
let block = self.key_buffer.finish_cloned();
let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
Expand Down Expand Up @@ -162,38 +165,46 @@ impl DictBuilderReader {
}
}

/// Returns true if the item in the reader is valid.
pub fn is_valid(&self) -> bool {
self.offset < self.sorted_pk_indices.len()
/// Returns the number of keys.
pub fn num_keys(&self) -> usize {
self.sorted_pk_indices.len()
}

/// Gets the i-th pk index.
///
/// # Panics
/// Panics if `offset >= self.num_keys()` (slice indexing).
pub fn pk_index(&self, offset: usize) -> PkIndex {
self.sorted_pk_indices[offset]
}

/// Returns current key.
pub fn current_key(&self) -> &[u8] {
let pk_index = self.current_pk_index();
/// Gets the i-th key.
pub fn key(&self, offset: usize) -> &[u8] {
let pk_index = self.pk_index(offset);
self.key_by_pk_index(pk_index)
}

/// Returns current [PkIndex] of the key.
pub fn current_pk_index(&self) -> PkIndex {
assert!(self.is_valid());
self.sorted_pk_indices[self.offset]
/// Gets the key by the pk index.
pub fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
self.blocks[block_idx as usize].key_by_pk_index(pk_index)
}

/// Advances the reader.
pub fn next(&mut self) {
assert!(self.is_valid());
self.offset += 1;
/// Returns pk weights to sort a data part and replaces pk indices.
///
/// Delegates to `compute_pk_weights`: the weight stored at slot `i` is the
/// rank of pk index `i` within `sorted_pk_indices`.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.sorted_pk_indices)
}

/// Returns pk indices sorted by keys.
///
/// The returned slice borrows from the reader; no copy is made.
pub(crate) fn sorted_pk_index(&self) -> &[PkIndex] {
&self.sorted_pk_indices
}
}

fn key_by_pk_index(&self, pk_index: PkIndex) -> &[u8] {
let block_idx = pk_index / MAX_KEYS_PER_BLOCK;
self.blocks[block_idx as usize].key_by_pk_index(pk_index)
/// Returns pk weights to sort a data part and replaces pk indices.
///
/// The weight written at slot `i` is the rank (enumeration order) of pk
/// index `i` inside `sorted_pk_indices`.
fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec<u16> {
    let mut weights = vec![0; sorted_pk_indices.len()];
    for (rank, &index) in sorted_pk_indices.iter().enumerate() {
        weights[index as usize] = rank as u16;
    }
    weights
}

/// A key dictionary.
Expand All @@ -206,6 +217,7 @@ pub struct KeyDict {
dict_blocks: Vec<DictBlock>,
/// Maps pk index to position of the key in [Self::dict_blocks].
key_positions: Vec<PkIndex>,
key_bytes_in_index: usize,
}

pub type KeyDictRef = Arc<KeyDict>;
Expand All @@ -220,6 +232,21 @@ impl KeyDict {
let block_index = position / MAX_KEYS_PER_BLOCK;
self.dict_blocks[block_index as usize].key_by_pk_index(position)
}

/// Gets the pk index by the key.
///
/// Returns `None` if the key is not in the dictionary.
pub fn get_pk_index(&self, key: &[u8]) -> Option<PkIndex> {
self.pk_to_index.get(key).copied()
}

/// Returns pk weights to sort a data part and replaces pk indices.
///
/// Weights are computed over `key_positions` (the pk-index -> key-position
/// mapping), so each weight is that position's enumeration rank.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.key_positions)
}

/// Returns the shared memory size.
///
/// Only accounts for the key bytes tracked in `key_bytes_in_index`; block
/// and map overhead are not included.
pub(crate) fn shared_memory_size(&self) -> usize {
self.key_bytes_in_index
}
}

/// Buffer to store unsorted primary keys.
Expand Down Expand Up @@ -364,7 +391,8 @@ mod tests {
let mut last_pk_index = None;
let mut metrics = WriteMetrics::default();
for key in &keys {
let pk_index = builder.try_insert_key(key, &mut metrics).unwrap();
assert!(!builder.is_full());
let pk_index = builder.insert_key(key, &mut metrics);
last_pk_index = Some(pk_index);
}
assert_eq!(num_keys - 1, last_pk_index.unwrap());
Expand All @@ -379,10 +407,9 @@ mod tests {
expect.sort_unstable_by(|a, b| a.0.cmp(&b.0));

let mut result = Vec::with_capacity(expect.len());
let mut reader = builder.scan();
while reader.is_valid() {
result.push((reader.current_key().to_vec(), reader.current_pk_index()));
reader.next();
let reader = builder.read();
for i in 0..reader.num_keys() {
result.push((reader.key(i).to_vec(), reader.pk_index(i)));
}
assert_eq!(expect, result);
}
Expand All @@ -397,9 +424,7 @@ mod tests {
for i in 0..num_keys {
// Each key is 5 bytes.
let key = format!("{i:05}");
builder
.try_insert_key(key.as_bytes(), &mut metrics)
.unwrap();
builder.insert_key(key.as_bytes(), &mut metrics);
}
// num_keys * 5 * 2
assert_eq!(5130, metrics.key_bytes);
Expand Down
9 changes: 1 addition & 8 deletions src/mito2/src/memtable/merge_tree/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ mod tests {

use super::*;
use crate::memtable::merge_tree::data::DataBuffer;
use crate::memtable::merge_tree::PkId;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

fn write_rows_to_buffer(
Expand All @@ -429,13 +428,7 @@ mod tests {
);

for kv in kvs.iter() {
buffer.write_row(
PkId {
shard_id: 0,
pk_index,
},
kv,
);
buffer.write_row(pk_index, kv);
}

*sequence += rows;
Expand Down
Loading

0 comments on commit b144836

Please sign in to comment.