Skip to content

Commit

Permalink
feat: finish up variable-length encodings in the full-zip path (#3344)
Browse files Browse the repository at this point in the history
This adds the last structural path for 2.1, full zip encoding of
variable length data. Scheduling this turned out to be a little trickier
than I had planned. There is no easy way to know where to slice the
fully-zipped buffer when doing decoding. Currently we settle this
problem by unzipping in the indirect scheduling task. There are some
alternative possibilities that I have documented but for now I think
this will be good enough and we can iterate on this going forwards.
  • Loading branch information
westonpace authored Jan 25, 2025
1 parent 6432a6b commit 5a92d31
Show file tree
Hide file tree
Showing 13 changed files with 811 additions and 196 deletions.
64 changes: 49 additions & 15 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,8 @@ message Binary {
uint64 null_adjustment = 3;
}

message BinaryMiniBlock {
}

message BinaryBlock {
}

message FsstMiniBlock {
ArrayEncoding BinaryMiniBlock = 1;
bytes symbol_table = 2;
message Variable {
uint32 bits_per_offset = 1;
}

message Fsst {
Expand Down Expand Up @@ -285,10 +278,8 @@ message ArrayEncoding {
BitpackedForNonNeg bitpacked_for_non_neg = 12;
Constant constant = 13;
Bitpack2 bitpack2 = 14;
BinaryMiniBlock binary_mini_block = 15;
FsstMiniBlock fsst_mini_block = 16;
BinaryBlock binary_block = 17;
PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 18;
Variable variable = 15;
PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 16;
}
}

Expand Down Expand Up @@ -316,6 +307,34 @@ message ColumnEncoding {
}
}

// # Standardized Interpretation of Counting Terms
//
// When working with 2.1 encodings we have a number of different "counting terms" and it can be
// difficult to understand what we mean when we are talking about a "number of values". Here is
// a standard interpretation of these terms:
//
// TODO: This is a newly added standardization and hasn't yet been applied to all code.
//
// To understand these definitions consider a data type FIXED_SIZE_LIST<LIST<INT32>>.
//
// A "value" is an abstract term when we aren't being specific.
//
// - num_rows: This is the highest level counting term. A single row includes everything in the
// fixed size list. This is what the user asks for when they ask for a range of rows.
// - num_elements: The number of elements is the number of rows multiplied by the dimension of any
// fixed size list wrappers. This is what you get when you flatten the FSL layer and
// is the starting point for structural encoding. Note that an element can be a list
// value or a single primitive value.
// - num_items: The number of items is the number of values in the repetition and definition vectors
// after everything has been flattened.
// - num_visible_items: The number of visible items is the number of items after invisible items
// have been removed. Invisible items are rep/def levels that don't correspond to an
// actual value.
//
// Note that we haven't exactly defined LIST<FIXED_SIZE_LIST<..>> yet. Neither FIXED_SIZE_LIST<LIST<..>>
// nor LIST<FIXED_SIZE_LIST<..>> has been fully implemented and tested.

/// Describes the meaning of each repdef layer in a mini-block layout
enum RepDefLayer {
// Should never be used, included for debugging purposes and general protobuf best practice
REPDEF_UNSPECIFIED = 0;
Expand Down Expand Up @@ -375,10 +394,25 @@ message FullZipLayout {
uint32 bits_rep = 1;
// The number of bits of definition info (0 if there is no definition)
uint32 bits_def = 2;
// The number of bits of value info
//
// Note: we use bits here (and not bytes) for consistency with other encodings. However, in practice,
// there is never a reason to use a bits per value that is not a multiple of 8. The complexity is not
// worth the small savings in space since this encoding is typically used with large values already.
oneof details {
// If this is a fixed width block then we need to have a fixed number of bits per value
uint32 bits_per_value = 3;
// If this is a variable width block then we need to have a fixed number of bits per offset
uint32 bits_per_offset = 4;
}
// The number of items in the page
uint32 num_items = 5;
// The number of visible items in the page
uint32 num_visible_items = 6;
// Description of the compression of values
ArrayEncoding value_compression = 3;
ArrayEncoding value_compression = 7;
// The meaning of each repdef layer, used to interpret repdef buffers correctly
repeated RepDefLayer layers = 4;
repeated RepDefLayer layers = 8;
}

/// A layout used for pages where all values are null
Expand Down
10 changes: 10 additions & 0 deletions rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,16 @@ impl VariableWidthBlock {
})
}

/// Wraps this block's offsets in a [`DataBlock::FixedWidth`].
///
/// The returned block uses `bits_per_offset` as its bits-per-value, reports
/// `num_values + 1` values, and carries a freshly created `BlockInfo`.
/// The offsets buffer itself is obtained via `borrow_and_clone`, which is
/// why this method needs `&mut self`.
pub fn offsets_as_block(&mut self) -> DataBlock {
    let bits_per_value = self.bits_per_offset as u64;
    let num_values = self.num_values + 1;
    let data = self.offsets.borrow_and_clone();
    DataBlock::FixedWidth(FixedWidthDataBlock {
        data,
        bits_per_value,
        num_values,
        block_info: BlockInfo::new(),
    })
}

/// Returns the combined length of the `data` and `offsets` buffers as a `u64`.
pub fn data_size(&self) -> u64 {
    let data_len = self.data.len();
    let offsets_len = self.offsets.len();
    (data_len + offsets_len) as u64
}
Expand Down
51 changes: 37 additions & 14 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ use lance_core::{Error, Result};
use tracing::instrument;

use crate::buffer::LanceBuffer;
use crate::data::DataBlock;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
Expand All @@ -248,7 +248,9 @@ use crate::encodings::logical::primitive::{
use crate::encodings::logical::r#struct::{
SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::binary::{BinaryBlockDecompressor, BinaryMiniBlockDecompressor};
use crate::encodings::physical::binary::{
BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
};
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
Expand Down Expand Up @@ -459,17 +461,20 @@ pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait PerValueDecompressor: std::fmt::Debug + Send + Sync {
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decompress one or more values
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock>;
/// The number of bits in each value
///
/// Returns 0 if the data type is variable-width
///
/// Currently (and probably long term) this must be a multiple of 8
fn bits_per_value(&self) -> u64;
}

/// A per-value decompressor whose input is a [`VariableWidthBlock`].
///
/// Implementations must be `Debug + Send + Sync`.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decompress one or more values
///
/// Takes ownership of `data` and returns the resulting [`DataBlock`], or an error.
fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}

/// A decompressor that turns a whole [`LanceBuffer`] into a [`DataBlock`].
///
/// Implementations must be `Debug + Send + Sync`.
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
/// Decompress `data`, returning the resulting [`DataBlock`], or an error.
fn decompress(&self, data: LanceBuffer) -> Result<DataBlock>;
}
Expand All @@ -480,10 +485,15 @@ pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
description: &pb::ArrayEncoding,
) -> Result<Box<dyn MiniBlockDecompressor>>;

fn create_per_value_decompressor(
fn create_fixed_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>>;

fn create_variable_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn PerValueDecompressor>>;
) -> Result<Box<dyn VariablePerValueDecompressor>>;

fn create_block_decompressor(
&self,
Expand All @@ -506,10 +516,10 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
}
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => {
pb::array_encoding::ArrayEncoding::Variable(_) => {
Ok(Box::new(BinaryMiniBlockDecompressor::default()))
}
pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => {
pb::array_encoding::ArrayEncoding::Fsst(description) => {
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
}
pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
Expand All @@ -521,15 +531,28 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
}
}

fn create_per_value_decompressor(
fn create_fixed_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn PerValueDecompressor>> {
) -> Result<Box<dyn FixedPerValueDecompressor>> {
match description.array_encoding.as_ref().unwrap() {
pb::array_encoding::ArrayEncoding::Flat(flat) => {
Ok(Box::new(ValueDecompressor::new(flat)))
}
_ => todo!(),
_ => todo!("fixed-per-value decompressor for {:?}", description),
}
}

/// Builds the variable-width per-value decompressor described by `description`.
///
/// Only the `Variable` array encoding is handled; it yields a default
/// `VariableDecoder`.
///
/// # Panics
///
/// - if `description.array_encoding` is `None`
/// - if the encoding's `bits_per_offset` is not below `u8::MAX`
/// - (via `todo!`) for any array encoding other than `Variable`
fn create_variable_per_value_decompressor(
    &self,
    description: &pb::ArrayEncoding,
) -> Result<Box<dyn VariablePerValueDecompressor>> {
    let encoding = description.array_encoding.as_ref().unwrap();
    if let pb::array_encoding::ArrayEncoding::Variable(variable) = encoding {
        assert!(variable.bits_per_offset < u8::MAX as u32);
        Ok(Box::new(VariableDecoder::default()))
    } else {
        todo!("variable-per-value decompressor for {:?}", description)
    }
}

Expand All @@ -548,7 +571,7 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
constant.num_values,
)))
}
pb::array_encoding::ArrayEncoding::BinaryBlock(_) => {
pb::array_encoding::ArrayEncoding::Variable(_) => {
Ok(Box::new(BinaryBlockDecompressor::default()))
}
_ => todo!(),
Expand Down
45 changes: 34 additions & 11 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::binary::{BinaryBlockEncoder, BinaryMiniBlockEncoder};
use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::bitpack_fastlanes::{
compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder,
};
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
use crate::encodings::physical::fsst::{
FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
use crate::format::ProtobufUtils;
Expand Down Expand Up @@ -217,11 +219,21 @@ pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
/// A single buffer of value data and a buffer of offsets
///
/// TODO: In the future we may allow metadata buffers
#[derive(Debug)]
pub enum PerValueDataBlock {
/// Holds a [`FixedWidthDataBlock`]
Fixed(FixedWidthDataBlock),
/// Holds a [`VariableWidthBlock`]
Variable(VariableWidthBlock),
}

impl PerValueDataBlock {
pub fn data_size(&self) -> u64 {
match self {
Self::Fixed(fixed) => fixed.data_size(),
Self::Variable(variable) => variable.data_size(),
}
}
}

/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
///
/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock. This is because
Expand Down Expand Up @@ -884,8 +896,23 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
let encoder = Box::new(ValueEncoder::default());
Ok(encoder)
}
DataBlock::VariableWidth(_variable_width) => {
todo!()
DataBlock::VariableWidth(variable_width) => {
if variable_width.bits_per_offset == 32 {
let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);

let variable_compression = Box::new(VariableEncoder::default());

if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
&& data_size >= FSST_LEAST_INPUT_SIZE as u64
{
Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
} else {
Ok(variable_compression)
}
} else {
todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
}
}
_ => unreachable!(),
}
Expand All @@ -905,13 +932,9 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
Ok((encoder, encoding))
}
DataBlock::VariableWidth(variable_width) => {
if variable_width.bits_per_offset == 32 {
let encoder = Box::new(BinaryBlockEncoder::default());
let encoding = ProtobufUtils::binary_block();
Ok((encoder, encoding))
} else {
todo!("Implement BlockCompression for VariableWidth DataBlock with 64 bits offsets.")
}
let encoder = Box::new(VariableEncoder::default());
let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
Ok((encoder, encoding))
}
_ => unreachable!(),
}
Expand Down
Loading

0 comments on commit 5a92d31

Please sign in to comment.