fix: freeze data buffer in shard (#3468)
* feat: call freeze if the active data buffer in a shard is full

* chore: more metrics

* chore: print metrics

* chore: enlarge freeze threshold

* test: test freeze

* test: fix config test
evenyag authored Mar 11, 2024
1 parent 0a4444a commit 06dcd0f
Showing 7 changed files with 145 additions and 36 deletions.
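The commit message above describes the core change: when a shard's active data buffer reaches `data_freeze_threshold` rows, it is frozen into an immutable data part before the write proceeds (see `Shard::write_with_pk_id` in `shard.rs` below). The following is a minimal, self-contained sketch of that pattern; `ActiveBuffer`, the `Shard` fields, and the `i64` rows are simplified stand-ins for illustration, not the actual mito2 types.

```rust
/// Simplified stand-in for the active data buffer (not the real mito2 type).
struct ActiveBuffer {
    rows: Vec<i64>,
}

impl ActiveBuffer {
    fn num_rows(&self) -> usize {
        self.rows.len()
    }

    /// Takes the accumulated rows, leaving an empty active buffer behind.
    fn freeze(&mut self) -> Vec<i64> {
        std::mem::take(&mut self.rows)
    }
}

/// Simplified stand-in for a shard with one active buffer and frozen parts.
struct Shard {
    active: ActiveBuffer,
    frozen: Vec<Vec<i64>>,
    /// Number of rows the active buffer may hold before it is frozen.
    data_freeze_threshold: usize,
}

impl Shard {
    fn write(&mut self, row: i64) {
        // Freeze first if the active buffer is already full, then write --
        // the same order used by `Shard::write_with_pk_id` in this commit.
        if self.active.num_rows() >= self.data_freeze_threshold {
            let part = self.active.freeze();
            self.frozen.push(part);
        }
        self.active.rows.push(row);
    }
}

fn main() {
    let mut shard = Shard {
        active: ActiveBuffer { rows: Vec::new() },
        frozen: Vec::new(),
        data_freeze_threshold: 50,
    };
    for i in 0..200 {
        shard.write(i);
    }
    // 200 writes with a threshold of 50 freeze at writes 50, 100 and 150,
    // leaving 3 frozen parts and 50 rows in the active buffer -- the same
    // counts that `test_shard_freeze` asserts below.
    assert_eq!(3, shard.frozen.len());
    assert_eq!(50, shard.active.num_rows());
}
```

Raising the default `data_freeze_threshold` from 32768 to 131072 (first hunk below) means the real shard accumulates more rows before each freeze.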
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/merge_tree.rs
@@ -85,7 +85,7 @@ impl Default for MergeTreeConfig {

Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 32768,
data_freeze_threshold: 131072,
dedup: true,
fork_dictionary_bytes,
}
17 changes: 17 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -957,6 +957,18 @@ impl DataParts {
self.active.write_row(pk_index, kv)
}

/// Returns the number of rows in the active buffer.
pub fn num_active_rows(&self) -> usize {
self.active.num_rows()
}

/// Freezes active buffer and creates a new active buffer.
pub fn freeze(&mut self) -> Result<()> {
let part = self.active.freeze(None, false)?;
self.frozen.push(part);
Ok(())
}

/// Reads data from all parts including active and frozen parts.
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
@@ -976,6 +988,11 @@
pub(crate) fn is_empty(&self) -> bool {
self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
}

#[cfg(test)]
pub(crate) fn frozen_len(&self) -> usize {
self.frozen.len()
}
}

pub struct DataPartsReaderBuilder {
61 changes: 43 additions & 18 deletions src/mito2/src/memtable/merge_tree/partition.rs
@@ -78,7 +78,7 @@ impl Partition {

// Finds key in shards, now we ensure one key only exists in one shard.
if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
inner.write_to_shard(pk_id, &key_value);
inner.write_to_shard(pk_id, &key_value)?;
inner.num_rows += 1;
return Ok(());
}
@@ -106,7 +106,7 @@
}

/// Writes to the partition without a primary key.
pub fn write_no_key(&self, key_value: KeyValue) {
pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
let mut inner = self.inner.write().unwrap();
// If no primary key, always write to the first shard.
debug_assert!(!inner.shards.is_empty());
@@ -117,12 +117,15 @@
shard_id: 0,
pk_index: 0,
};
inner.shards[0].write_with_pk_id(pk_id, &key_value);
inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
inner.num_rows += 1;

Ok(())
}

/// Scans data in the partition.
pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
let start = Instant::now();
let key_filter = if context.need_prune_key {
Some(PrimaryKeyFilter::new(
context.metadata.clone(),
@@ -150,7 +153,7 @@
(builder_reader, shard_source)
};

context.metrics.num_shards = shard_reader_builders.len();
context.metrics.num_shards += shard_reader_builders.len();
let mut nodes = shard_reader_builders
.into_iter()
.map(|builder| {
@@ -161,7 +164,7 @@
.collect::<Result<Vec<_>>>()?;

if let Some(builder) = builder_source {
context.metrics.read_builder = true;
context.metrics.num_builder += 1;
// Move the initialization of ShardBuilderReader out of read lock.
let shard_builder_reader =
builder.build(Some(&context.pk_weights), key_filter.clone())?;
@@ -172,8 +175,10 @@
let merger = ShardMerger::try_new(nodes)?;
if self.dedup {
let source = DedupReader::try_new(merger)?;
context.metrics.build_partition_reader += start.elapsed();
PartitionReader::new(context, Box::new(source))
} else {
context.metrics.build_partition_reader += start.elapsed();
PartitionReader::new(context, Box::new(merger))
}
}
@@ -282,9 +287,10 @@ pub(crate) struct PartitionStats {

#[derive(Default)]
struct PartitionReaderMetrics {
build_partition_reader: Duration,
read_source: Duration,
data_batch_to_batch: Duration,
read_builder: bool,
num_builder: usize,
num_shards: usize,
}

@@ -440,9 +446,15 @@ impl Drop for ReadPartitionContext {
.observe(partition_data_batch_to_batch);

common_telemetry::debug!(
"TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s",
self.metrics.read_builder,
"TreeIter partitions metrics, \
num_builder: {}, \
num_shards: {}, \
build_partition_reader: {}s, \
partition_read_source: {}s, \
partition_data_batch_to_batch: {}s",
self.metrics.num_builder,
self.metrics.num_shards,
self.metrics.build_partition_reader.as_secs_f64(),
partition_read_source,
partition_data_batch_to_batch,
);
@@ -549,7 +561,16 @@ impl Inner {
fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
(vec![Shard::new(0, None, data_parts, config.dedup)], 1)
(
vec![Shard::new(
0,
None,
data_parts,
config.dedup,
config.data_freeze_threshold,
)],
1,
)
} else {
(Vec::new(), 0)
};
@@ -569,18 +590,22 @@
self.pk_to_pk_id.get(primary_key).copied()
}

fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) {
fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
if pk_id.shard_id == self.shard_builder.current_shard_id() {
self.shard_builder.write_with_pk_id(pk_id, key_value);
return;
}
for shard in &mut self.shards {
if shard.shard_id == pk_id.shard_id {
shard.write_with_pk_id(pk_id, key_value);
self.num_rows += 1;
return;
}
return Ok(());
}

// Safety: We find the shard by shard id.
let shard = self
.shards
.iter_mut()
.find(|shard| shard.shard_id == pk_id.shard_id)
.unwrap();
shard.write_with_pk_id(pk_id, key_value)?;
self.num_rows += 1;

Ok(())
}

fn freeze_active_shard(&mut self) -> Result<()> {
87 changes: 74 additions & 13 deletions src/mito2/src/memtable/merge_tree/shard.rs
@@ -39,6 +39,8 @@ pub struct Shard {
/// Data in the shard.
data_parts: DataParts,
dedup: bool,
/// Number of rows to freeze a data part.
data_freeze_threshold: usize,
}

impl Shard {
@@ -48,20 +50,29 @@
key_dict: Option<KeyDictRef>,
data_parts: DataParts,
dedup: bool,
data_freeze_threshold: usize,
) -> Shard {
Shard {
shard_id,
key_dict,
data_parts,
dedup,
data_freeze_threshold,
}
}

/// Writes a key value into the shard.
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
///
/// It will freeze the active buffer if it is full.
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
debug_assert_eq!(self.shard_id, pk_id.shard_id);

if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
self.data_parts.freeze()?;
}

self.data_parts.write_row(pk_id.pk_index, key_value);
Ok(())
}

/// Scans the shard.
@@ -83,6 +94,7 @@
key_dict: self.key_dict.clone(),
data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
dedup: self.dedup,
data_freeze_threshold: self.data_freeze_threshold,
}
}

@@ -467,6 +479,7 @@ mod tests {
shard_id: ShardId,
metadata: RegionMetadataRef,
input: &[(KeyValues, PkIndex)],
data_freeze_threshold: usize,
) -> Shard {
let mut dict_builder = KeyDictBuilder::new(1024);
let mut metrics = WriteMetrics::default();
@@ -481,36 +494,84 @@
let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
Shard::new(
shard_id,
Some(Arc::new(dict)),
data_parts,
true,
data_freeze_threshold,
)
}

fn collect_timestamps(shard: &Shard) -> Vec<i64> {
let mut reader = shard.read().unwrap().build(None).unwrap();
let mut timestamps = Vec::new();
while reader.is_valid() {
let rb = reader.current_data_batch().slice_record_batch();
let ts_array = rb.column(1);
let ts_slice = timestamp_array_to_i64_slice(ts_array);
timestamps.extend_from_slice(ts_slice);

reader.next().unwrap();
}
timestamps
}

#[test]
fn test_write_read_shard() {
let metadata = metadata_for_test();
let input = input_with_key(&metadata);
let mut shard = new_shard_with_dict(8, metadata, &input);
let mut shard = new_shard_with_dict(8, metadata, &input, 100);
assert!(shard.is_empty());
for (key_values, pk_index) in &input {
for kv in key_values.iter() {
let pk_id = PkId {
shard_id: shard.shard_id,
pk_index: *pk_index,
};
shard.write_with_pk_id(pk_id, &kv);
shard.write_with_pk_id(pk_id, &kv).unwrap();
}
}
assert!(!shard.is_empty());

let mut reader = shard.read().unwrap().build(None).unwrap();
let mut timestamps = Vec::new();
while reader.is_valid() {
let rb = reader.current_data_batch().slice_record_batch();
let ts_array = rb.column(1);
let ts_slice = timestamp_array_to_i64_slice(ts_array);
timestamps.extend_from_slice(ts_slice);
let timestamps = collect_timestamps(&shard);
assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
}

reader.next().unwrap();
#[test]
fn test_shard_freeze() {
let metadata = metadata_for_test();
let kvs = build_key_values_with_ts_seq_values(
&metadata,
"shard".to_string(),
0,
[0].into_iter(),
[Some(0.0)].into_iter(),
0,
);
let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
let expected: Vec<_> = (0..200).collect();
for i in &expected {
let kvs = build_key_values_with_ts_seq_values(
&metadata,
"shard".to_string(),
0,
[*i].into_iter(),
[Some(0.0)].into_iter(),
*i as u64,
);
let pk_id = PkId {
shard_id: shard.shard_id,
pk_index: *i as PkIndex,
};
for kv in kvs.iter() {
shard.write_with_pk_id(pk_id, &kv).unwrap();
}
}
assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
assert!(!shard.is_empty());
assert_eq!(3, shard.data_parts.frozen_len());

let timestamps = collect_timestamps(&shard);
assert_eq!(expected, timestamps);
}
}
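A quick way to see why `test_shard_freeze` above expects exactly 3 frozen parts: the buffer is frozen just before the write that would exceed the threshold, so the final full buffer stays active. A hypothetical helper (not part of the codebase) capturing that arithmetic:

```rust
/// Hypothetical helper, not in the codebase: number of frozen parts after
/// `total_rows` single-row writes with the given freeze threshold, assuming
/// the freeze check (`num_active_rows() >= threshold`) runs before each write.
fn expected_frozen_parts(total_rows: usize, threshold: usize) -> usize {
    if total_rows == 0 {
        0
    } else {
        (total_rows - 1) / threshold
    }
}

fn main() {
    // 200 writes with a threshold of 50 freeze at writes 50, 100 and 150,
    // matching `assert_eq!(3, shard.data_parts.frozen_len())` in the test.
    assert_eq!(3, expected_frozen_parts(200, 50));
    // Exactly 50 writes never freeze: the buffer is full, but no further
    // write arrives to trigger the check.
    assert_eq!(0, expected_frozen_parts(50, 50));
}
```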
8 changes: 7 additions & 1 deletion src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -138,7 +138,13 @@ impl ShardBuilder {
let shard_id = self.current_shard_id;
self.current_shard_id += 1;

Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
Ok(Some(Shard::new(
shard_id,
key_dict,
data_parts,
self.dedup,
self.data_freeze_threshold,
)))
}

/// Scans the shard builder.
4 changes: 2 additions & 2 deletions src/mito2/src/memtable/merge_tree/tree.rs
@@ -124,7 +124,7 @@ impl MergeTree {

if !has_pk {
// No primary key.
self.write_no_key(kv);
self.write_no_key(kv)?;
continue;
}

@@ -299,7 +299,7 @@ impl MergeTree {
)
}

fn write_no_key(&self, key_value: KeyValue) {
fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
let partition = self.get_or_create_partition(partition_key);

2 changes: 1 addition & 1 deletion tests-integration/tests/http.rs
@@ -789,7 +789,7 @@ intermediate_path = ""
[datanode.region_engine.mito.memtable]
type = "experimental"
index_max_keys_per_shard = 8192
data_freeze_threshold = 32768
data_freeze_threshold = 131072
dedup = true
fork_dictionary_bytes = "1GiB"
