feat: distinguish between different read paths
v0y4g3r committed Feb 23, 2024
1 parent b144836 commit 498c623
Showing 3 changed files with 194 additions and 53 deletions.
171 changes: 143 additions & 28 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -146,24 +146,35 @@ impl DataBuffer {
}
}

/// Freezes `DataBuffer` to bytes. Use `pk_weights` to sort rows and replace pk_index to pk_weights.
/// Freezes `DataBuffer` to bytes.
/// If `pk_weights` is present, it will be used to sort rows.
///
/// `freeze` clears the buffers of builders.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None);
pub fn freeze(
&mut self,
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
let parts = encoder.write(self)?;
Ok(parts)
}

/// Reads batches from data buffer without resetting builder's buffers.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataBufferReader> {
/// If `pk_weights` is present, yielded rows are sorted according to those weights;
/// otherwise rows are sorted by their `pk_index` values, which are assumed to already be weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
// todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
true,
true,
true,
// replace_pk_index is always set to false since:
// - for a DataBuffer in ShardBuilder, the pk dict is not frozen yet;
// - for a DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
false,
)?;
DataBufferReader::new(batch)
}
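Taken together, these signatures split the buffer into two read paths. A minimal sketch of the two `freeze` call patterns, using hypothetical helper names (only `DataBuffer::freeze` and its arguments come from this diff):

fn freeze_from_shard_builder(buf: &mut DataBuffer, weights: &[u16]) -> Result<DataPart> {
    // ShardBuilder path: the pk dict is not frozen yet, so sort rows by the
    // caller-provided weights and rewrite pk_index to those weights on encode.
    buf.freeze(Some(weights), true)
}

fn freeze_from_shard(buf: &mut DataBuffer) -> Result<DataPart> {
    // Shard path: pk_index values already are weights, so nothing is looked
    // up or rewritten and rows sort directly by pk_index.
    buf.freeze(None, false)
}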
@@ -208,7 +219,7 @@ impl LazyMutableVectorBuilder {
fn data_buffer_to_record_batches(
schema: SchemaRef,
buffer: &mut DataBuffer,
pk_weights: &[u16],
pk_weights: Option<&[u16]>,
keep_data: bool,
dedup: bool,
replace_pk_index: bool,
@@ -408,7 +419,7 @@ impl Ord for InnerKey {
}

fn build_rows_to_sort(
pk_weights: &[u16],
pk_weights: Option<&[u16]>,
pk_index: &UInt16Vector,
ts: &VectorRef,
sequence: &UInt64Vector,
@@ -453,11 +464,16 @@ fn build_rows_to_sort(
.zip(sequence_values.iter())
.enumerate()
.map(|(idx, ((timestamp, pk_index), sequence))| {
let pk_weight = if let Some(weights) = pk_weights {
weights[*pk_index as usize] // if pk_weights is present, sort according to weight.
} else {
*pk_index // otherwise pk_index has already been replaced by weights.
};
(
idx,
InnerKey {
timestamp: *timestamp,
pk_weight: pk_weights[*pk_index as usize],
pk_weight,
sequence: *sequence,
},
)
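The selection rule above, restated in isolation as a hypothetical helper (not part of this diff):

fn effective_weight(pk_index: u16, pk_weights: Option<&[u16]>) -> u16 {
    match pk_weights {
        // Weights provided: look up the weight for this pk_index.
        Some(weights) => weights[pk_index as usize],
        // No weights: pk_index has already been replaced by its weight.
        None => pk_index,
    }
}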
@@ -493,21 +509,24 @@ fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {

struct DataPartEncoder<'a> {
schema: SchemaRef,
pk_weights: &'a [u16],
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
}

impl<'a> DataPartEncoder<'a> {
pub fn new(
metadata: &RegionMetadataRef,
pk_weights: &'a [u16],
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
schema,
pk_weights,
row_group_size,
replace_pk_index,
}
}

@@ -528,7 +547,7 @@ impl<'a> DataPartEncoder<'a> {
self.pk_weights,
false,
true,
true,
self.replace_pk_index,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
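With the extra flag, the two encoder configurations in this commit reduce to roughly the following sketch (`meta` and `buffer` are assumed to be in scope):

// Sort-and-rewrite path, as exercised by the tests below: weights are
// provided and pk_index values are rewritten to weights while encoding.
let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
let _part = encoder.write(&mut buffer)?;

// Pass-through path, as used by `DataParts::freeze` below: rows are already
// keyed by weights, so no sorting weights and no rewriting are needed.
let encoder = DataPartEncoder::new(&meta, None, None, false);
let _part = encoder.write(&mut buffer)?;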
@@ -689,19 +708,20 @@ impl DataParts {
}

/// Freezes the active data buffer into frozen data parts.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> {
self.frozen.push(self.active.freeze(pk_weights)?);
pub fn freeze(&mut self) -> Result<()> {
self.frozen.push(self.active.freeze(None, false)?);
Ok(())
}

/// Reads data from all parts including active and frozen parts.
/// The returned iterator yields a record batch of one primary key at a time.
/// The order in which primary keys are yielded is determined by their weights.
/// todo(hl): read may not take any pk weights if it is read by `Shard`.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataPartsReader> {
pub fn read(&mut self) -> Result<DataPartsReader> {
let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
nodes.push(DataNode::new(DataSource::Buffer(
self.active.read(pk_weights)?,
// `DataParts::read` ensures that all pk_index values inside `DataBuffer` are already replaced by weights,
// so we pass None to sort rows directly according to pk_index.
self.active.read(None)?,
)));
for p in &self.frozen {
nodes.push(DataNode::new(DataSource::Part(p.read()?)));
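A sketch of the resulting read flow over `DataParts`, assuming `DataPartsReader` exposes the same `is_valid`/`current_data_batch`/`next` cursor as the part readers in the tests below:

// Freeze the active buffer into a frozen part, then merge-read everything.
// No weights are passed anywhere: by this point every pk_index is a weight.
parts.freeze()?;
let mut reader = parts.read()?;
while reader.is_valid() {
    let batch = reader.current_data_batch();
    // ... consume `batch` ...
    reader.next()?;
}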
@@ -742,6 +762,7 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

#[test]
@@ -773,9 +794,15 @@
write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true, true)
.unwrap();
let batch = data_buffer_to_record_batches(
schema,
&mut buffer,
Some(&[3, 1]),
keep_data,
true,
true,
)
.unwrap();

assert_eq!(
vec![1, 2, 1, 2],
@@ -839,7 +866,8 @@
assert_eq!(4, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true, true).unwrap();
data_buffer_to_record_batches(schema, &mut buffer, Some(&[0, 1]), true, true, true)
.unwrap();

assert_eq!(3, batch.num_rows());
assert_eq!(
@@ -893,7 +921,8 @@
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false, true).unwrap();
data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, false, true)
.unwrap();

assert_eq!(
vec![1, 1, 3, 3, 3],
@@ -944,6 +973,80 @@
}
}

fn check_data_buffer_freeze(
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
expected: &[(u16, Vec<(i64, u64)>)],
) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);

// write rows with null values.
write_rows_to_buffer(
&mut buffer,
&meta,
0,
vec![0, 1, 2],
vec![Some(1.0), None, Some(3.0)],
0,
);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);

let mut res = Vec::with_capacity(3);
let mut reader = buffer
.freeze(pk_weights, replace_pk_index)
.unwrap()
.read()
.unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
let rb = batch.slice_record_batch();
let ts = timestamp_array_to_i64_slice(rb.column(1));
let sequence = rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
let ts_and_seq = ts
.iter()
.zip(sequence.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
res.push((batch.pk_index, ts_and_seq));

reader.next().unwrap();
}
assert_eq!(expected, res);
}

#[test]
fn test_data_buffer_freeze() {
check_data_buffer_freeze(
None,
false,
&[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
);

check_data_buffer_freeze(
Some(&[1, 2]),
false,
&[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
);

check_data_buffer_freeze(
Some(&[3, 2]),
true,
&[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
);

check_data_buffer_freeze(
Some(&[3, 2]),
false,
&[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
);
}

#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
@@ -965,7 +1068,7 @@

assert_eq!(4, buffer.num_rows());

let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
@@ -1010,8 +1113,7 @@
assert_eq!(None, search_next_pk_range(&a, 6));
}

#[test]
fn test_iter_data_buffer() {
fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);

@@ -1033,15 +1135,28 @@
2,
);

let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
let mut iter = buffer.read(pk_weights).unwrap();
check_buffer_values_equal(&mut iter, expected);
}

#[test]
fn test_iter_data_buffer() {
check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
check_iter_data_buffer(
Some(&[0, 1, 2, 3]),
&[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
);
check_iter_data_buffer(
Some(&[3, 2, 1, 0]),
&[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
);
}

#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}

@@ -1095,7 +1210,7 @@
4,
);

let encoder = DataPartEncoder::new(&meta, weights, Some(4));
let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();