Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fsst compression with mini-block #3121

Merged
merged 7 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ message Binary {
message BinaryMiniBlock {
}

message FsstMiniBlock {
ArrayEncoding BinaryMiniBlock = 1;
bytes symbol_table = 2;
}

message Fsst {
ArrayEncoding binary = 1;
bytes symbol_table = 2;
Expand Down Expand Up @@ -273,6 +278,7 @@ message ArrayEncoding {
Constant constant = 13;
Bitpack2 bitpack2 = 14;
BinaryMiniBlock binary_mini_block = 15;
FsstMiniBlock fsst_mini_block = 16;
}
}

Expand Down
7 changes: 5 additions & 2 deletions rust/lance-encoding/src/compression_algo/fsst/src/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ const FSST_CODE_MASK: u16 = FSST_CODE_MAX - 1;
const FSST_SAMPLETARGET: usize = 1 << 14;
const FSST_SAMPLEMAXSZ: usize = 2 * FSST_SAMPLETARGET;

// the the input size is less than 4MB, we mark the file header and copy the input to the output as is
const FSST_LEAST_INPUT_SIZE: usize = 4 * 1024 * 1024; // 4MB
// if the input size is less than 4MB, we mark the file header and copy the input to the output as is
pub const FSST_LEAST_INPUT_SIZE: usize = 4 * 1024 * 1024; // 4MB

// if the max length of the input strings are less than `FSST_LEAST_INPUT_MAX_LENGTH`, we shouldn't use FSST.
pub const FSST_LEAST_INPUT_MAX_LENGTH: u64 = 5;

// we only use the lower 32 bits in icl, so we can use 1 << 32 to represent a free slot in the hash table
const FSST_ICL_FREE: u64 = 1 << 32;
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ use crate::encodings::logical::r#struct::{
use crate::encodings::physical::binary::BinaryMiniBlockDecompressor;
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -505,6 +506,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => {
Ok(Box::new(BinaryMiniBlockDecompressor::default()))
}
pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => {
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
}
_ => todo!(),
}
}
Expand Down
20 changes: 18 additions & 2 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::encodings::physical::bitpack_fastlanes::{
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fixed_size_list::FslPerValueCompressor;
use crate::encodings::physical::fsst::FsstArrayEncoder;
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
Expand All @@ -49,6 +49,7 @@ use crate::{
},
format::pb,
};
use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};

use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;
Expand Down Expand Up @@ -792,7 +793,7 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
if let DataBlock::FixedWidth(ref fixed_width_data) = data {
let bit_widths = data
.get_stat(Stat::BitWidth)
.expect("FixedWidthDataBlock should have valid bit width statistics");
.expect("FixedWidthDataBlock should have valid `Stat::BitWidth` statistics");
Comment on lines 795 to +796
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: if you find yourself repeating the same expect statement again and again then maybe it would be worth it to make an expect_stat method which does the get_stat / expect combination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha, I actually didn't know about this, thanks for the suggestion. I will fill a separate PR for this.

// Temporary hack to work around https://github.com/lancedb/lance/issues/3102
// Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
let has_all_zeros = bit_widths
Expand All @@ -811,6 +812,21 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
}
if let DataBlock::VariableWidth(ref variable_width_data) = data {
if variable_width_data.bits_per_offset == 32 {
let data_size = variable_width_data.get_stat(Stat::DataSize).expect(
"VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
);
let data_size = data_size.as_primitive::<UInt64Type>().value(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also could be helpful to have a expect_single_state / get_single_stat method. Then you can just do:

let data_size = variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the suggestion! I will create a separate PR for this.


let max_len = variable_width_data.get_stat(Stat::MaxLength).expect(
"VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
);
let max_len = max_len.as_primitive::<UInt64Type>().value(0);

if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
&& data_size >= FSST_LEAST_INPUT_SIZE as u64
{
return Ok(Box::new(FsstMiniBlockEncoder::default()));
}
return Ok(Box::new(BinaryMiniBlockEncoder::default()));
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ pub fn decoder_from_array_encoding(
pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::FsstMiniBlock(_) => unreachable!(),
}
}

Expand Down
154 changes: 148 additions & 6 deletions rust/lance-encoding/src/encodings/physical/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ use arrow_buffer::ScalarBuffer;
use arrow_schema::DataType;
use futures::{future::BoxFuture, FutureExt};

use lance_core::Result;
use lance_core::{Error, Result};
use snafu::{location, Location};

use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlock, NullableDataBlock, UsedEncoding, VariableWidthBlock},
decoder::{PageScheduler, PrimitivePageDecoder},
encoder::{ArrayEncoder, EncodedArray},
format::ProtobufUtils,
decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder},
encoder::{ArrayEncoder, EncodedArray, MiniBlockCompressed, MiniBlockCompressor},
format::{
pb::{self},
ProtobufUtils,
},
EncodingsIo,
};

use super::binary::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};

#[derive(Debug)]
pub struct FsstPageScheduler {
inner_scheduler: Box<dyn PageScheduler>,
Expand Down Expand Up @@ -201,14 +207,132 @@ impl ArrayEncoder for FsstArrayEncoder {
}
}

#[derive(Debug, Default)]
pub struct FsstMiniBlockEncoder {}

impl MiniBlockCompressor for FsstMiniBlockEncoder {
fn compress(
&self,
data: DataBlock,
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
match data {
DataBlock::VariableWidth(mut variable_width) => {
let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
let offsets_slice = offsets.as_ref();
let bytes_data = variable_width.data.into_buffer();

// prepare compression output buffer
let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
let mut dest_values = vec![0_u8; bytes_data.len() * 2];
let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];

// fsst compression
fsst::fsst::compress(
&mut symbol_table,
bytes_data.as_slice(),
offsets_slice,
&mut dest_values,
&mut dest_offsets,
)?;

// construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
let data_block = DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::reinterpret_vec(dest_values),
bits_per_offset: 32,
offsets: LanceBuffer::reinterpret_vec(dest_offsets),
num_values: variable_width.num_values,
block_info: BlockInfo::new(),
used_encodings: UsedEncoding::new(),
});

// compress the fsst compressed data using `BinaryMiniBlockEncoder`
let binary_compressor =
Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;

let (binary_miniblock_compressed, binary_array_encoding) =
binary_compressor.compress(data_block)?;

Ok((
binary_miniblock_compressed,
ProtobufUtils::fsst_mini_block(binary_array_encoding, symbol_table),
))
}
_ => Err(Error::InvalidInput {
source: format!(
"Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
data.name()
)
.into(),
location: location!(),
}),
}
}
}

#[derive(Debug)]
pub struct FsstMiniBlockDecompressor {
symbol_table: Vec<u8>,
}

impl FsstMiniBlockDecompressor {
pub fn new(description: &pb::FsstMiniBlock) -> Self {
Self {
symbol_table: description.symbol_table.clone(),
}
}
}

impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
// Step 1. decompress data use `BinaryMiniBlockDecompressor`
let binary_decompressor =
Box::new(BinaryMiniBlockDecompressor::default()) as Box<dyn MiniBlockDecompressor>;
let compressed_data_block = binary_decompressor.decompress(data, num_values)?;
let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
};

// Step 2. FSST decompress
let bytes = compressed_data_block.data.borrow_to_typed_slice::<u8>();
let bytes = bytes.as_ref();
let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
let offsets = offsets.as_ref();

// FSST decompression output buffer, the `MiniBlock` has a size limit of `4 KiB` and
// the FSST decompression algorithm output is at most `8 * input_size`
// Since `MiniBlock Size` <= 4 KiB and `offsets` are type `i32, it has number of `offsets` <= 1024.
let mut decompress_bytes_buf = vec![0u8; 4 * 1024 * 8];
let mut decompress_offset_buf = vec![0i32; 1024];
fsst::fsst::decompress(
&self.symbol_table,
bytes,
offsets,
&mut decompress_bytes_buf,
&mut decompress_offset_buf,
)?;

Ok(DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::Owned(decompress_bytes_buf),
offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
bits_per_offset: 32,
num_values,
block_info: BlockInfo::new(),
used_encodings: UsedEncoding::new(),
}))
}
}

#[cfg(test)]
mod tests {

use std::collections::HashMap;

use lance_datagen::{ByteCount, RowCount};

use crate::testing::{check_round_trip_encoding_of_data, TestCases};
use crate::{
testing::{check_round_trip_encoding_of_data, TestCases},
version::LanceFileVersion,
};

#[test_log::test(tokio::test)]
async fn test_fsst() {
Expand All @@ -218,6 +342,24 @@ mod tests {
.unwrap()
.column(0)
.clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;
check_round_trip_encoding_of_data(
vec![arr],
&TestCases::default().with_file_version(LanceFileVersion::V2_1),
HashMap::new(),
)
.await;

let arr = lance_datagen::gen()
.anon_col(lance_datagen::array::rand_utf8(ByteCount::from(64), false))
.into_batch_rows(RowCount::from(1_000_000))
.unwrap()
.column(0)
.clone();
check_round_trip_encoding_of_data(
vec![arr],
&TestCases::default().with_file_version(LanceFileVersion::V2_1),
HashMap::new(),
)
.await;
}
}
16 changes: 14 additions & 2 deletions rust/lance-encoding/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use pb::{
nullable::{AllNull, NoNull, Nullability, SomeNull},
page_layout::Layout,
AllNullLayout, ArrayEncoding, Binary, BinaryMiniBlock, Bitpack2, Bitpacked, BitpackedForNonNeg,
Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable,
PackedStruct, PageLayout,
Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, MiniBlockLayout,
Nullable, PackedStruct, PageLayout,
};

use crate::encodings::physical::block_compress::CompressionConfig;
Expand Down Expand Up @@ -139,6 +139,18 @@ impl ProtobufUtils {
}
}

// Construct a `FsstMiniBlock` ArrayEncoding, the inner `binary_mini_block` encoding is actually
// not used and `FsstMiniBlockDecompressor` constructs a `binary_mini_block` in a `hard-coded` fashion.
// This can be an optimization later.
pub fn fsst_mini_block(data: ArrayEncoding, symbol_table: Vec<u8>) -> ArrayEncoding {
ArrayEncoding {
array_encoding: Some(ArrayEncodingEnum::FsstMiniBlock(Box::new(FsstMiniBlock {
binary_mini_block: Some(Box::new(data)),
symbol_table,
}))),
}
}

pub fn packed_struct(
child_encodings: Vec<ArrayEncoding>,
packed_buffer_index: u32,
Expand Down
Loading