Skip to content

Commit

Permalink
feat: enable write variable width data in 2.1 (#3090)
Browse files Browse the repository at this point in the history
This PR adds a `ComputeStat` trait to `DataBlock` to separate
statistics computation from statistics inquiry.

This PR also modifies `do_flush` and `encode_miniblock` in
`PrimitiveStructuralEncoder` to enable writing variable-width data in
2.1 (`PrimitiveStructuralEncoder`).
  • Loading branch information
broccoliSpicy authored Nov 5, 2024
1 parent 83439ef commit 98f642c
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 138 deletions.
11 changes: 9 additions & 2 deletions rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use snafu::{location, Location};

use lance_core::{Error, Result};

use crate::{buffer::LanceBuffer, statistics::Stat};
use crate::{
buffer::LanceBuffer,
statistics::{ComputeStat, Stat},
};

/// `Encoding` enum serves as an encoding registration center.
///
Expand Down Expand Up @@ -1232,7 +1235,7 @@ impl DataBlock {
return Self::AllNull(AllNullDataBlock { num_values });
}

let encoded = match data_type {
let mut encoded = match data_type {
DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
DataType::BinaryView | DataType::Utf8View => {
todo!()
Expand Down Expand Up @@ -1318,6 +1321,10 @@ impl DataBlock {
)
}
};

// compute statistics
encoded.compute_stat();

if !matches!(data_type, DataType::Dictionary(_, _)) {
match nulls {
Nullability::None => encoded,
Expand Down
92 changes: 61 additions & 31 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
use std::{collections::VecDeque, fmt::Debug, iter, ops::Range, sync::Arc, vec};

use arrow::array::AsArray;
use arrow_array::{make_array, Array, ArrayRef};
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryStreamExt};
use lance_arrow::{deepcopy::deep_copy_array, DataTypeExt};
use lance_arrow::deepcopy::deep_copy_array;
use log::{debug, trace};
use snafu::{location, Location};

use crate::data::DataBlock;
use crate::statistics::{GetStat, Stat};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlock, DataBlockBuilder, FixedWidthDataBlock, UsedEncoding},
data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock, UsedEncoding},
decoder::{
BlockDecompressor, ColumnInfo, DecodeArrayTask, DecodePageTask, DecodedArray, DecodedPage,
DecompressorStrategy, FieldScheduler, FilterExpression, LoadedPage, LogicalPageDecoder,
Expand Down Expand Up @@ -1534,9 +1536,17 @@ impl PrimitiveStructuralEncoder {
// As data gets wider, the # of values per block shrinks (very wide
// data doesn't even fit in a mini-block), the block overhead gets
// too large, and we prefer zipped.
fn is_narrow(arrays: &[ArrayRef], data_type: &DataType) -> bool {
let avg_bytes_per_row = Self::get_avg_value_size(arrays, data_type);
avg_bytes_per_row < 128
/// Reports whether `data_block` is narrow enough for mini-block encoding.
///
/// Returns `true` only when the block exposes a `Stat::MaxLength` statistic
/// and the first value of that statistic is strictly below 128. Returns
/// `false` when the statistic is missing or its value is 128 or more.
///
/// # Panics
///
/// Panics if the `Stat::MaxLength` statistic is present but is not a
/// `PrimitiveArray<UInt64Type>`.
fn is_narrow(data_block: &DataBlock) -> bool {
    // No max-length statistic available: treat the block as not narrow.
    let Some(max_len_stat) = data_block.get_stat(Stat::MaxLength) else {
        return false;
    };

    // Only the first entry of the statistic array is consulted.
    let max_len = max_len_stat
        .as_any()
        .downcast_ref::<PrimitiveArray<UInt64Type>>()
        .unwrap()
        .value(0);

    max_len < 128
}

// Converts value data, repetition levels, and definition levels into a single
Expand Down Expand Up @@ -1695,9 +1705,8 @@ impl PrimitiveStructuralEncoder {
column_idx: u32,
field: &Field,
compression_strategy: &dyn CompressionStrategy,
arrays: Vec<ArrayRef>,
data: DataBlock,
repdefs: Vec<RepDefBuilder>,
num_values: u64,
row_number: u64,
) -> Result<EncodedPage> {
let repdef = RepDefBuilder::serialize(repdefs);
Expand All @@ -1707,7 +1716,6 @@ impl PrimitiveStructuralEncoder {
// and potentially more decoder asymmetry. However, it may be worth
// investigating at some point

let data = DataBlock::from_arrays(&arrays, num_values);
let num_values = data.num_values();
// The validity is encoded in repdef so we can remove it
let data = data.remove_validity();
Expand Down Expand Up @@ -1743,17 +1751,6 @@ impl PrimitiveStructuralEncoder {
})
}

/// Returns the average size, in bytes, of a value of `data_type`.
///
/// When `byte_width_opt` reports a byte width for the type, that width is
/// returned directly and the arrays are not inspected. Any other type hits
/// `todo!()`, because inspecting the buffers is not implemented.
fn get_avg_value_size(_arrays: &[ArrayRef], data_type: &DataType) -> u64 {
    match data_type.byte_width_opt() {
        // Simple types: the size is known without looking at the values.
        Some(width) => width as u64,
        // Other types: the buffers would need to be inspected.
        None => todo!(),
    }
}

// Creates an encode task, consuming all buffered data
fn do_flush(
&mut self,
Expand All @@ -1773,18 +1770,20 @@ impl PrimitiveStructuralEncoder {

if num_values == num_nulls {
Self::encode_all_null(column_idx, num_values, row_number)
} else if Self::is_narrow(&arrays, &field.data_type()) {
Self::encode_miniblock(
column_idx,
&field,
compression_strategy.as_ref(),
arrays,
repdefs,
num_values,
row_number,
)
} else {
todo!("Full zipped encoding")
let data_block = DataBlock::from_arrays(&arrays, num_values);
if Self::is_narrow(&data_block) {
Self::encode_miniblock(
column_idx,
&field,
compression_strategy.as_ref(),
data_block,
repdefs,
row_number,
)
} else {
todo!("Full zipped encoding")
}
}
})
.boxed();
Expand Down Expand Up @@ -1857,3 +1856,34 @@ impl FieldEncoder for PrimitiveStructuralEncoder {
std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
}
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int8Array, StringArray};

    use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;

    use super::DataBlock;

    /// Checks `is_narrow` against a small integer array, short strings, and
    /// a string array containing one very long value.
    #[test]
    fn test_is_narrow() {
        // Int8 values are expected to be classified as narrow.
        let ints: ArrayRef = Arc::new(Int8Array::from(vec![1, 2, 3]));
        let int_block = DataBlock::from_array(ints);
        assert!(PrimitiveStructuralEncoder::is_narrow(&int_block));

        // Short strings are expected to be classified as narrow.
        let short_strings = StringArray::from(vec![Some("hello"), Some("world")]);
        let short_block = DataBlock::from_array(short_strings);
        assert!(PrimitiveStructuralEncoder::is_narrow(&short_block));

        // One value of "hello world" repeated 100 times (1100 bytes) is
        // expected to make the block not narrow.
        let long_strings = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let long_block = DataBlock::from_array(long_strings);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&long_block));
    }
}
Loading

0 comments on commit 98f642c

Please sign in to comment.