Skip to content

Commit

Permalink
Merge branch 'main' into fsst-miniblock
Browse files Browse the repository at this point in the history
  • Loading branch information
broccoliSpicy authored Nov 13, 2024
2 parents afa1768 + ec76db4 commit a392d13
Show file tree
Hide file tree
Showing 21 changed files with 1,511 additions and 189 deletions.
14 changes: 14 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,19 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

/// A layout used for pages where the data is large
///
/// In this case the cost of transposing the data is relatively small (compared to the
/// cost of writing the data) and so we just zip the buffers together.
///
/// This layout is selected through the `full_zip_layout` member of the `layout` oneof
/// in `PageLayout`.
message FullZipLayout {
// The number of bits of repetition info (0 if there is no repetition)
uint32 bits_rep = 1;
// The number of bits of definition info (0 if there is no definition)
uint32 bits_def = 2;
// Description of the compression of values
ArrayEncoding value_compression = 3;
}

/// A layout used for pages where all values are null
///
/// In addition, there can be no repetition levels and only a single definition level
Expand All @@ -333,5 +346,6 @@ message PageLayout {
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
FullZipLayout full_zip_layout = 3;
}
}
66 changes: 66 additions & 0 deletions rust/lance-core/src/utils/bit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,69 @@ pub fn pad_bytes_u64<const ALIGN: u64>(n: u64) -> u64 {
debug_assert!(is_pwr_two(ALIGN));
(ALIGN - (n & (ALIGN - 1))) & (ALIGN - 1)
}

/// Returns the number of bits needed to represent the given number
///
/// Despite the name, this is the bit length of `val` (i.e. `floor(log2(val)) + 1`),
/// not the mathematical ceiling of `log2(val)`. For example, `log_2_ceil(1) == 1`,
/// `log_2_ceil(255) == 8` and `log_2_ceil(256) == 9`.
///
/// # Panics
///
/// Panics if `val` is 0.
pub fn log_2_ceil(val: u32) -> u32 {
    assert!(val > 0);
    // The bit length is the width of the type minus the number of leading zero bits.
    // The standard library provides this directly, so no hand-written lookup table
    // (and no branching on which byte holds the most significant bit) is needed.
    u32::BITS - val.leading_zeros()
}

#[cfg(test)]
pub mod tests {
    use crate::utils::bit::log_2_ceil;

    #[test]
    fn test_log_2_ceil() {
        /// Reference implementation: count how many right shifts it takes to reach zero.
        fn classic_approach(mut val: u32) -> u32 {
            let mut counter = 0;
            while val > 0 {
                val >>= 1;
                counter += 1;
            }
            counter
        }

        // Exhaustively check the low range
        for i in 1..(16 * 1024) {
            assert_eq!(log_2_ceil(i), classic_approach(i));
        }
        assert_eq!(log_2_ceil(50 * 1024), classic_approach(50 * 1024));
        assert_eq!(
            log_2_ceil(1024 * 1024 * 1024),
            classic_approach(1024 * 1024 * 1024)
        );

        // Check every power of two and its immediate neighbours so that each byte of
        // the input (including the 16..24 and 24..32 bit ranges) is exercised.
        for shift in 0..u32::BITS {
            let power = 1_u32 << shift;
            for val in [power - 1, power, power + 1] {
                // Zero is outside the domain of log_2_ceil (it asserts val > 0)
                if val > 0 {
                    assert_eq!(log_2_ceil(val), classic_approach(val), "val = {}", val);
                }
            }
        }
        assert_eq!(log_2_ceil(u32::MAX), 32);
    }
}
5 changes: 5 additions & 0 deletions rust/lance-encoding/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl LanceBuffer {
/// Reinterprets a LanceBuffer into a Vec<T>
///
/// If the underlying buffer is not properly aligned, this will involve a copy of the data
///
/// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
/// of the data. Lance does not support big-endian machines so this is safe. However, if we end
/// up supporting big-endian machines in the future, then any use of this method will need to be
/// carefully reviewed.
pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> impl AsRef<[T]> {
let align = std::mem::align_of::<T>();
let is_aligned = self.as_ptr().align_offset(align) == 0;
Expand Down
36 changes: 35 additions & 1 deletion rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
}
}

pub struct FixedWidthDataBlockBuilder {
struct FixedWidthDataBlockBuilder {
bits_per_value: u64,
bytes_per_value: u64,
values: Vec<u8>,
Expand Down Expand Up @@ -493,6 +493,33 @@ impl FixedSizeListBlock {
}
}

/// Builds a [`DataBlock::FixedSizeList`] by forwarding the flattened items to a
/// builder for the child block.
struct FixedSizeListBlockBuilder {
    /// Builder that accumulates the child items of the lists
    inner: Box<dyn DataBlockBuilderImpl>,
    /// The number of child items in each list
    dimension: u64,
}

impl FixedSizeListBlockBuilder {
    /// Creates a builder that wraps `inner` and treats every `dimension` child items
    /// as one list.
    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
        Self { inner, dimension }
    }
}

impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
    /// Appends the lists in `selection` from `data_block` to this builder.
    ///
    /// # Panics
    ///
    /// Panics if `data_block` is not a fixed-size-list block.
    fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>) {
        // `selection` is expressed in lists; scale it by the dimension to get the
        // matching range of items in the child block.
        let item_selection = selection.start * self.dimension..selection.end * self.dimension;
        let fsl = data_block
            .as_fixed_size_list_mut_ref()
            .expect("FixedSizeListBlockBuilder can only append fixed-size-list data blocks");
        self.inner.append(fsl.child.as_mut(), item_selection);
    }

    /// Finishes the child builder and wraps its block in a fixed-size-list block with
    /// this builder's dimension.
    fn finish(self: Box<Self>) -> DataBlock {
        let inner_block = self.inner.finish();
        DataBlock::FixedSizeList(FixedSizeListBlock {
            child: Box::new(inner_block),
            dimension: self.dimension,
        })
    }
}

/// A data block with no regular structure. There is no available spot to attach
/// validity / repdef information and it cannot be converted to Arrow without being
/// decoded
Expand Down Expand Up @@ -914,6 +941,13 @@ impl DataBlock {
todo!()
}
}
Self::FixedSizeList(inner) => {
let inner_builder = inner.child.make_builder(estimated_size_bytes);
Box::new(FixedSizeListBlockBuilder::new(
inner_builder,
inner.dimension,
))
}
_ => todo!(),
}
}
Expand Down
25 changes: 20 additions & 5 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ use crate::encodings::logical::r#struct::{
use crate::encodings::physical::binary::BinaryMiniBlockDecompressor;
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -455,8 +456,14 @@ pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
/// A decompressor that turns a buffer of compressed values back into a `DataBlock`
///
/// NOTE(review): presumably the read-side counterpart of the per-value compressors used
/// when writing — confirm against the encoder.
pub trait PerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decompress one or more values
///
/// NOTE(review): `num_values` is presumably the number of values contained in `data` —
/// confirm against callers.
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
/// The number of bits in each value
///
/// Returns 0 if the data type is variable-width
///
/// Currently (and probably long term) this must be a multiple of 8
fn bits_per_value(&self) -> u64;
}

Expand All @@ -470,10 +477,10 @@ pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
description: &pb::ArrayEncoding,
) -> Result<Box<dyn MiniBlockDecompressor>>;

fn create_fixed_per_value_decompressor(
fn create_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>>;
) -> Result<Box<dyn PerValueDecompressor>>;

fn create_block_decompressor(
&self,
Expand Down Expand Up @@ -506,14 +513,22 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
}
}

fn create_fixed_per_value_decompressor(
fn create_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>> {
) -> Result<Box<dyn PerValueDecompressor>> {
match description.array_encoding.as_ref().unwrap() {
pb::array_encoding::ArrayEncoding::Flat(flat) => {
Ok(Box::new(ValueDecompressor::new(flat)))
}
pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
let items_decompressor =
self.create_per_value_decompressor(fsl.items.as_ref().unwrap())?;
Ok(Box::new(FslPerValueDecompressor::new(
items_decompressor,
fsl.dimension as u64,
)))
}
_ => todo!(),
}
}
Expand Down
93 changes: 53 additions & 40 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{
Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
Expand All @@ -32,6 +31,7 @@ use crate::encodings::physical::bitpack_fastlanes::{
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
use crate::encodings::physical::fixed_size_list::FslPerValueCompressor;
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
Expand Down Expand Up @@ -210,20 +210,30 @@ pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}

/// The output of per-value compression, which must be one of:
///
/// * `Fixed` - a single buffer of fixed-width values
/// * `Variable` - a single buffer of value data and a buffer of offsets
///
/// TODO: In the future we may allow metadata buffers
pub enum PerValueDataBlock {
Fixed(FixedWidthDataBlock),
Variable(VariableWidthBlock),
}

/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
///
/// Compared to [`VariablePerValueCompressor`], these compressors are capable of compressing the data
/// so that every value has the exact same number of bits per value. For example, this is useful
/// for encoding vector embeddings where every value has a fixed size but the values themselves are
/// too large to use mini-block.
/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock. This is because
/// we need to zip the data and those are the only two blocks we know how to zip today.
///
/// The advantage of a fixed-bytes-per-value is that we can do random access in 1 IOP instead of 2
/// and do not need a repetition index.
pub trait FixedPerValueCompressor: std::fmt::Debug + Send + Sync {
/// Compress the data into a single buffer where each value is encoded with the same number of bits
/// In addition, the compressed data must be able to be decompressed in a random-access fashion.
/// This means that the decompression algorithm must be able to decompress any value without
/// decompressing all values before it.
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
/// Compress the data into a single buffer
///
/// Also returns a description of the compression that can be used to decompress when reading the data back
fn compress(&self, data: DataBlock) -> Result<(FixedWidthDataBlock, pb::ArrayEncoding)>;
fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}

/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
Expand Down Expand Up @@ -408,11 +418,9 @@ pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Fixed-per-value compression results in a fixed number of bits for each value
/// It is used for wide fixed-width types like vector embeddings.
/// - Variable-per-value compression results in two buffers, one buffer of offsets
/// and one buffer of data bytes. It is used for wide variable-width types
/// like strings, variable-length lists, binary, etc.
/// - Per-value compression results in either a fixed width data block or a variable
/// width data block. In other words, there is some number of bits per value.
/// In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
/// of rows. Each block is somewhere between 0 and 16KiB in size. This is
/// used for narrow data types (both fixed and variable length) where we can
Expand All @@ -425,19 +433,12 @@ pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

/// Create a fixed-per-value compressor for the given data
fn create_fixed_per_value(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn FixedPerValueCompressor>>;

/// Create a variable-per-value compressor for the given data
fn create_variable_per_value(
/// Create a per-value compressor for the given data
fn create_per_value(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn VariablePerValueCompressor>>;
) -> Result<Box<dyn PerValueCompressor>>;

/// Create a mini-block compressor for the given data
fn create_miniblock_compressor(
Expand Down Expand Up @@ -830,23 +831,33 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
Ok(Box::new(ValueEncoder::default()))
}

fn create_fixed_per_value(
fn create_per_value(
&self,
field: &Field,
_data: &DataBlock,
) -> Result<Box<dyn FixedPerValueCompressor>> {
// Right now we only need block compressors for rep/def which is u16. Will need to expand
// this if we need block compression of other types.
assert!(field.data_type().byte_width() > 0);
Ok(Box::new(ValueEncoder::default()))
}

fn create_variable_per_value(
&self,
_field: &Field,
_data: &DataBlock,
) -> Result<Box<dyn VariablePerValueCompressor>> {
todo!()
data: &DataBlock,
) -> Result<Box<dyn PerValueCompressor>> {
match data {
DataBlock::FixedWidth(_) => {
let encoder = Box::new(ValueEncoder::default());
Ok(encoder)
}
DataBlock::VariableWidth(_variable_width) => {
todo!()
}
DataBlock::FixedSizeList(fsl) => {
let DataType::FixedSizeList(inner_field, field_dim) = field.data_type() else {
panic!("FSL data block without FSL field")
};
debug_assert_eq!(fsl.dimension, field_dim as u64);
let inner_compressor = self.create_per_value(
&inner_field.as_ref().try_into().unwrap(),
fsl.child.as_ref(),
)?;
let fsl_compressor = FslPerValueCompressor::new(inner_compressor, fsl.dimension);
Ok(Box::new(fsl_compressor))
}
_ => unreachable!(),
}
}

fn create_block_compressor(
Expand All @@ -855,6 +866,8 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
match data {
// Right now we only need block compressors for rep/def which is u16. Will need to expand
// this if we need block compression of other types.
DataBlock::FixedWidth(fixed_width) => {
let encoder = Box::new(ValueEncoder::default());
let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
Expand Down
Loading

0 comments on commit a392d13

Please sign in to comment.