From b5abd00a0302ed5e0d5bccb175a37f32753bd4c9 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 22 Nov 2024 12:55:52 +0100 Subject: [PATCH 1/5] move variable to its own folder --- crates/polars-row/src/{variable.rs => variable/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename crates/polars-row/src/{variable.rs => variable/mod.rs} (100%) diff --git a/crates/polars-row/src/variable.rs b/crates/polars-row/src/variable/mod.rs similarity index 100% rename from crates/polars-row/src/variable.rs rename to crates/polars-row/src/variable/mod.rs From 3f94f05e992114e2b66ce91faf4a0ade7cdf6d59 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 22 Nov 2024 16:37:29 +0100 Subject: [PATCH 2/5] working varint encoding --- crates/polars-row/src/encode.rs | 1 + crates/polars-row/src/variable/mod.rs | 2 + crates/polars-row/src/variable/varint.rs | 198 +++++++++++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 crates/polars-row/src/variable/varint.rs diff --git a/crates/polars-row/src/encode.rs b/crates/polars-row/src/encode.rs index 65bf08b3fa29..09c4f8b602d3 100644 --- a/crates/polars-row/src/encode.rs +++ b/crates/polars-row/src/encode.rs @@ -6,6 +6,7 @@ use arrow::array::{ }; use arrow::bitmap::Bitmap; use arrow::datatypes::ArrowDataType; +use arrow::trusted_len::TrustMyLength; use arrow::types::{NativeType, Offset}; use crate::fixed::{get_null_sentinel, FixedLengthEncoding}; diff --git a/crates/polars-row/src/variable/mod.rs b/crates/polars-row/src/variable/mod.rs index 15f25143348f..fabbb58a78d0 100644 --- a/crates/polars-row/src/variable/mod.rs +++ b/crates/polars-row/src/variable/mod.rs @@ -10,6 +10,8 @@ //! - `0xFF_u8` if this is not the last block for this string //! - otherwise the length of the block as a `u8` +pub(crate) mod varint; + use std::mem::MaybeUninit; use arrow::array::{BinaryArray, BinaryViewArray, MutableBinaryViewArray}; diff --git a/crates/polars-row/src/variable/varint.rs b/crates/polars-row/src/variable/varint.rs new file mode 100644 index 000000000000..ad3529cd3fc2 --- /dev/null +++ b/crates/polars-row/src/variable/varint.rs @@ -0,0 +1,198 @@ +use std::mem::MaybeUninit; + +use arrow::array::PrimitiveArray; +use arrow::types::NativeType; +use bytemuck::Pod; +use polars_utils::slice::Slice2Uninit; + +use super::get_null_sentinel; +use crate::EncodingField; + +pub(crate) trait VarIntEncoding: Pod { + const IS_SIGNED: bool; + const INLINE_MASK: u8 = if Self::IS_SIGNED { 0x1F } else { 0x3F }; + const INLINE_THRESHOLD: usize = Self::INLINE_MASK.count_ones() as usize; + + #[inline(always)] + fn msb_to_byte_length(msb: u32) -> usize { + let msb = msb as usize; + 1 + if msb <= Self::INLINE_THRESHOLD { + 0 + } else { + (msb + usize::from(Self::IS_SIGNED)).div_ceil(8) + } + } + + fn msb(self) -> u32; + fn len_from_item(value: Option) -> usize { + match value { + None => 1, + Some(v) => Self::msb_to_byte_length(v.msb()), + } + } + + unsafe fn encode_one( + value: Option, + buffer: &mut [MaybeUninit], + offset: &mut usize, + field: &EncodingField, + ); + unsafe fn decode_one(buffer: &mut &[u8], field: &EncodingField) -> Option; +} + +pub(crate) unsafe fn len_from_buffer(buffer: &[u8]) -> usize { + let b = *buffer.get_unchecked(0); + 1 + if b & 0xC0 == 0xC0 { + (b & 0x3F) as usize + } else { + 0 + } +} + +macro_rules! implement_varint { + ($($t:ty,)+) => { + $( + impl VarIntEncoding for $t { + #[allow(unused_comparisons)] + const IS_SIGNED: bool = Self::MIN < 0; + + fn msb(self) -> u32 { + let mut v = self; + #[allow(unused_comparisons)] + if Self::IS_SIGNED && v < 0 { + v = !v; + } + Self::BITS - v.leading_zeros() + + } + + unsafe fn encode_one( + value: Option, + buffer: &mut [MaybeUninit], + offset: &mut usize, + field: &EncodingField, + ) { + match value { + None => { + buffer[*offset] = MaybeUninit::new(get_null_sentinel(field)); + *offset += 1; + }, + Some(v) => { + let msb = v.msb(); + if msb as usize <= Self::INLINE_THRESHOLD { + let mut b = v.to_le_bytes()[0] & 0x3F; + + #[allow(unused_comparisons)] + if Self::IS_SIGNED { + b ^= 0x20; + } + + b |= 0x80; + + if field.descending { + b = !b; + } + + buffer[*offset] = MaybeUninit::new(b); + *offset += 1; + } else { + let byte_length = Self::msb_to_byte_length(msb); + debug_assert!(byte_length < 64); + buffer[*offset] = + MaybeUninit::new(0xC0 | (((byte_length - 2) & 0x3F) as u8)); + + buffer[*offset + 1..*offset + byte_length].copy_from_slice( + &v.to_be_bytes().as_ref().as_uninit() + [size_of::() - (byte_length - 1)..], + ); + + if Self::IS_SIGNED { + *buffer[*offset + 1].assume_init_mut() ^= 0x80; + } + + if field.descending { + buffer[*offset + 1..*offset + byte_length] + .iter_mut() + .for_each(|v| *v = MaybeUninit::new(!*v.assume_init_ref())); + } + *offset += byte_length; + } + }, + } + } + + unsafe fn decode_one(buffer: &mut &[u8], field: &EncodingField) -> Option { + let null_sentinel = get_null_sentinel(field); + + let sentinel_byte = buffer[0]; + *buffer = &buffer[1..]; + + if sentinel_byte == null_sentinel { + return None; + } + + if sentinel_byte & 0x40 == 0 { + // Inlined + let sentinel_byte = if field.descending { + !sentinel_byte + } else { + sentinel_byte + }; + + let mut value = (sentinel_byte & 0x3F) as Self; + if Self::IS_SIGNED { + value ^= 0x20; + + // Sign-extend + value <<= Self::BITS - 5; + value >>= Self::BITS - 5; + } + Some(value) + } else { + let byte_length = (sentinel_byte & 0x3F) as usize + 1; + let mut intermediate = [0u8; size_of::()]; + intermediate[size_of::() - byte_length..] + .copy_from_slice(&buffer[..byte_length]); + + let mut v = Self::from_be_bytes(intermediate); + + if Self::IS_SIGNED { + v ^= 1 << (byte_length * 8 - 1); + + // Sign-extend + v <<= (size_of::() - byte_length) * 8; + v >>= (size_of::() - byte_length) * 8; + } + + if field.descending { + v = !v; + } + + *buffer = &buffer[byte_length..]; + Some(v) + } + } + } + )+ + }; +} + +pub(crate) unsafe fn encode_iter( + buffer: &mut [MaybeUninit], + iter: impl Iterator>, + field: &EncodingField, + offsets: &mut [usize], +) { + for (opt_value, offset) in iter.zip(offsets) { + T::encode_one(opt_value, buffer, offset, field); + } +} + +pub(crate) unsafe fn decode( + rows: &mut [&[u8]], + field: &EncodingField, +) -> PrimitiveArray { + PrimitiveArray::from_iter(rows.iter_mut().map(|row| T::decode_one(row, field))) +} + +implement_varint![i32, u32, usize,]; From 543e420306c8a310ac1b2ae7f5b1aabab2604848 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 22 Nov 2024 16:59:34 +0100 Subject: [PATCH 3/5] integrate varint into dictionary encoding --- crates/polars-row/src/encode.rs | 50 +++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/crates/polars-row/src/encode.rs b/crates/polars-row/src/encode.rs index 09c4f8b602d3..9f80a6cc824a 100644 --- a/crates/polars-row/src/encode.rs +++ b/crates/polars-row/src/encode.rs @@ -494,12 +494,34 @@ fn get_encoder(array: &dyn Array, field: &EncodingField, row_widths: &mut RowWid .as_any() .downcast_ref::>() .unwrap(); - let iter = dc_array - .iter_typed::() - .unwrap() - .map(|opt_s| opt_s.map_or(0, |s| s.len())); - // @TODO: Do a better job here. This is just plainly incorrect. - biniter_num_column_bytes(array, iter, dc_array.validity(), field, row_widths) + + if field.no_order { + let widths = + if dc_array.values().len() < 64 { + let mut widths = RowWidths::new(row_widths.num_rows()); + row_widths.push_constant(1); + widths.push_constant(1); + widths + } else { + row_widths.append_iter(unsafe { + TrustMyLength::new(dc_array.keys_iter().map(|k| { + ::len_from_item(k) + }), dc_array.len()) + }) + }; + + Encoder { + widths, + array: array.to_boxed(), + state: EncoderState::Stateless, + } + } else { + let iter = dc_array + .iter_typed::() + .unwrap() + .map(|opt_s| opt_s.map_or(0, |s| s.len())); + biniter_num_column_bytes(array, iter, dc_array.validity(), field, row_widths) + } }, D::Union(_, _, _) => todo!(), D::Map(_, _) => todo!(), @@ -605,11 +627,17 @@ unsafe fn encode_flat_array( .as_any() .downcast_ref::>() .unwrap(); - let iter = dc_array - .iter_typed::() - .unwrap() - .map(|opt_s| opt_s.map(|s| s.as_bytes())); - crate::variable::encode_iter(buffer, iter, field, offsets); + + if field.no_order { + let iter = unsafe { TrustMyLength::new(dc_array.keys_iter(), dc_array.len()) }; + crate::variable::varint::encode_iter(buffer, iter, field, offsets); + } else { + let iter = dc_array + .iter_typed::() + .unwrap() + .map(|opt_s| opt_s.map(|s| s.as_bytes())); + crate::variable::encode_iter(buffer, iter, field, offsets); + } }, D::FixedSizeBinary(_) => todo!(), From 861dbe909b923d037376a28d1f5a07ad4e0d782b Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 22 Nov 2024 17:35:33 +0100 Subject: [PATCH 4/5] Add docs --- crates/polars-row/src/variable/varint.rs | 102 ++++++++++++++++------- 1 file changed, 72 insertions(+), 30 deletions(-) diff --git a/crates/polars-row/src/variable/varint.rs b/crates/polars-row/src/variable/varint.rs index ad3529cd3fc2..0aa459db168e 100644 --- a/crates/polars-row/src/variable/varint.rs +++ b/crates/polars-row/src/variable/varint.rs @@ -1,3 +1,41 @@ +//! A variable-length integer encoding for the `polars-row` encoding. +//! +//! This compresses integers close to 0 to less bytes than the fixed size encoding. This can save +//! quite a lot of memory if most of your integers are close to zero (e.g. with dictionary keys). +//! +//! The encoding works as follows. +//! +//! Each value starts with a *sentinel` byte. This byte is build up as follows. +//! +//! +-----------------------------+ +//! | b7 : b6 : b5 b4 b3 b2 b1 b0 | +//! +-----------------------------+ +//! +//! * `b7` encodes the validity of the element: `0` means `missing`, `1` means `valid`. +//! * `b6` determines the meaning of `b5` to `b0`. +//! * `b5` to `b0`: +//! * `b6 == 0`, `b5` to `b0` is a 6-bit unsigned integer that encodes the value. +//! * `b6 == 1`, `b5` to `b0` is a 6-bit unsigned integer that encodes the additional byte-length +//! minus 1. +//! +//! If `b6 == 1`, the additional bytes encode the entirety of the value. +//! +//! Therefore, the following holds for byte sizes with 16-bit signed and unsigned integers. +//! +//! +-----------+------------------+-----------------+ +//! | Byte Size | Signed Values | Unsigned Values | +//! | 1 | -32 to 31 | 0 to 63 | +//! | 2 | -128 to -33 & | 64 to 255 | +//! | | 32 to 127 | | +//! | 3 | -32768 to -129 & | 256 to 65535 | +//! | | 128 to 32767 | | +//! +-----------+------------------+-----------------+ +//! +//! This can be extrapolated to other sizes of integers. +//! +//! Note 1. For signed integers the sign bit is flipped. This fixes the sort order. For example, +//! `0` would be smaller than `-1` without this. +//! Note 2. Given values represent the values for `descending=False` and `nulls_last=False`. use std::mem::MaybeUninit; use arrow::array::PrimitiveArray; @@ -10,13 +48,12 @@ use crate::EncodingField; pub(crate) trait VarIntEncoding: Pod { const IS_SIGNED: bool; - const INLINE_MASK: u8 = if Self::IS_SIGNED { 0x1F } else { 0x3F }; - const INLINE_THRESHOLD: usize = Self::INLINE_MASK.count_ones() as usize; + const INLINE_MSB_THRESHOLD: usize = if Self::IS_SIGNED { 5 } else { 6 }; #[inline(always)] fn msb_to_byte_length(msb: u32) -> usize { let msb = msb as usize; - 1 + if msb <= Self::INLINE_THRESHOLD { + 1 + if msb <= Self::INLINE_MSB_THRESHOLD { 0 } else { (msb + usize::from(Self::IS_SIGNED)).div_ceil(8) @@ -72,49 +109,53 @@ macro_rules! implement_varint { offset: &mut usize, field: &EncodingField, ) { + let null_sentinel = get_null_sentinel(field); + match value { None => { - buffer[*offset] = MaybeUninit::new(get_null_sentinel(field)); + buffer[*offset] = MaybeUninit::new(null_sentinel); *offset += 1; }, Some(v) => { let msb = v.msb(); - if msb as usize <= Self::INLINE_THRESHOLD { - let mut b = v.to_le_bytes()[0] & 0x3F; + if msb as usize <= Self::INLINE_MSB_THRESHOLD { + let mut sentinel = null_sentinel ^ 0x80; + sentinel |= v.to_le_bytes()[0] & 0x3F; - #[allow(unused_comparisons)] if Self::IS_SIGNED { - b ^= 0x20; + // Flip sign bit + sentinel ^= 0x20; } - - b |= 0x80; - if field.descending { - b = !b; + sentinel = !sentinel; } - buffer[*offset] = MaybeUninit::new(b); + buffer[*offset] = MaybeUninit::new(sentinel); *offset += 1; } else { let byte_length = Self::msb_to_byte_length(msb); - debug_assert!(byte_length < 64); - buffer[*offset] = - MaybeUninit::new(0xC0 | (((byte_length - 2) & 0x3F) as u8)); + let additional_bytes = byte_length - 1; + debug_assert!(additional_bytes > 0 && additional_bytes <= 64); + + let mut sentinel = null_sentinel ^ 0x80; + sentinel |= 0x40; // not inlined + sentinel |= (additional_bytes - 1) as u8; + buffer[*offset] = MaybeUninit::new(sentinel); - buffer[*offset + 1..*offset + byte_length].copy_from_slice( - &v.to_be_bytes().as_ref().as_uninit() - [size_of::() - (byte_length - 1)..], - ); + let bytes = v.to_be_bytes(); + let bytes = &bytes.as_ref().as_uninit()[size_of::() - (byte_length - 1)..]; + buffer[*offset + 1..*offset + byte_length].copy_from_slice( bytes); if Self::IS_SIGNED { + // Flip sign bit *buffer[*offset + 1].assume_init_mut() ^= 0x80; } - if field.descending { buffer[*offset + 1..*offset + byte_length] .iter_mut() .for_each(|v| *v = MaybeUninit::new(!*v.assume_init_ref())); } + *offset += byte_length; } }, @@ -131,16 +172,17 @@ macro_rules! implement_varint { return None; } - if sentinel_byte & 0x40 == 0 { - // Inlined - let sentinel_byte = if field.descending { - !sentinel_byte - } else { - sentinel_byte - }; + let sentinel_byte = if field.descending { + !sentinel_byte + } else { + sentinel_byte + }; + let is_inlined = sentinel_byte & 0x40 == 0; + if is_inlined { let mut value = (sentinel_byte & 0x3F) as Self; if Self::IS_SIGNED { + // Flip sign bit value ^= 0x20; // Sign-extend @@ -150,20 +192,20 @@ macro_rules! implement_varint { Some(value) } else { let byte_length = (sentinel_byte & 0x3F) as usize + 1; + let mut intermediate = [0u8; size_of::()]; intermediate[size_of::() - byte_length..] .copy_from_slice(&buffer[..byte_length]); - let mut v = Self::from_be_bytes(intermediate); if Self::IS_SIGNED { + // Flip sign bit v ^= 1 << (byte_length * 8 - 1); // Sign-extend v <<= (size_of::() - byte_length) * 8; v >>= (size_of::() - byte_length) * 8; } - if field.descending { v = !v; } From a506e5568168515c54deb499201e7e840ce11847 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Mon, 25 Nov 2024 15:28:07 +0100 Subject: [PATCH 5/5] this one is a bit doomed --- crates/polars-arrow/src/datatypes/mod.rs | 8 + .../src/chunked_array/ops/row_encode.rs | 1 + crates/polars-python/src/dataframe/general.rs | 5 +- crates/polars-python/src/series/general.rs | 5 +- crates/polars-row/src/decode.rs | 27 ++- crates/polars-row/src/encode.rs | 79 ++++++-- crates/polars-row/src/row.rs | 4 + crates/polars-row/src/utils.rs | 19 ++ crates/polars-row/src/variable/varint.rs | 190 ++++++++++-------- py-polars/polars/dataframe/frame.py | 3 +- py-polars/polars/series/series.py | 3 +- 11 files changed, 227 insertions(+), 117 deletions(-) diff --git a/crates/polars-arrow/src/datatypes/mod.rs b/crates/polars-arrow/src/datatypes/mod.rs index 21a2d26e45a0..6a5d6e29d2a3 100644 --- a/crates/polars-arrow/src/datatypes/mod.rs +++ b/crates/polars-arrow/src/datatypes/mod.rs @@ -385,6 +385,14 @@ impl ArrowDataType { ) } + pub fn is_integer(&self) -> bool { + use ArrowDataType as D; + matches!( + self, + D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64 + ) + } + pub fn to_fixed_size_list(self, size: usize, is_nullable: bool) -> ArrowDataType { ArrowDataType::FixedSizeList( Box::new(Field::new( diff --git a/crates/polars-core/src/chunked_array/ops/row_encode.rs b/crates/polars-core/src/chunked_array/ops/row_encode.rs index 2b683bef534b..0932a89b60d2 100644 --- a/crates/polars-core/src/chunked_array/ops/row_encode.rs +++ b/crates/polars-core/src/chunked_array/ops/row_encode.rs @@ -174,6 +174,7 @@ pub fn _get_rows_encoded( descending: *desc, nulls_last: *null_last, no_order: false, + enable_varint: false, }; cols.push(arr); fields.push(sort_field); diff --git a/crates/polars-python/src/dataframe/general.rs b/crates/polars-python/src/dataframe/general.rs index 9363d98fe76c..df7a7231d177 100644 --- a/crates/polars-python/src/dataframe/general.rs +++ b/crates/polars-python/src/dataframe/general.rs @@ -716,7 +716,7 @@ impl PyDataFrame { fn _row_encode<'py>( &'py self, py: Python<'py>, - fields: Vec<(bool, bool, bool)>, + fields: Vec<(bool, bool, bool, bool)>, ) -> PyResult { py.allow_threads(|| { let mut df = self.df.clone(); @@ -732,10 +732,11 @@ impl PyDataFrame { let fields = fields .into_iter() .map( - |(descending, nulls_last, no_order)| polars_row::EncodingField { + |(descending, nulls_last, no_order, enable_varint)| polars_row::EncodingField { descending, nulls_last, no_order, + enable_varint, }, ) .collect::>(); diff --git a/crates/polars-python/src/series/general.rs b/crates/polars-python/src/series/general.rs index 88afa077b2e6..e6bb45bb21e5 100644 --- a/crates/polars-python/src/series/general.rs +++ b/crates/polars-python/src/series/general.rs @@ -541,7 +541,7 @@ impl PySeries { &'py self, py: Python<'py>, dtypes: Vec<(String, Wrap)>, - fields: Vec<(bool, bool, bool)>, + fields: Vec<(bool, bool, bool, bool)>, ) -> PyResult { py.allow_threads(|| { assert_eq!(dtypes.len(), fields.len()); @@ -549,10 +549,11 @@ impl PySeries { let fields = fields .into_iter() .map( - |(descending, nulls_last, no_order)| polars_row::EncodingField { + |(descending, nulls_last, no_order, enable_varint)| polars_row::EncodingField { descending, nulls_last, no_order, + enable_varint, }, ) .collect::>(); diff --git a/crates/polars-row/src/decode.rs b/crates/polars-row/src/decode.rs index 060733681de1..b23217051dbc 100644 --- a/crates/polars-row/src/decode.rs +++ b/crates/polars-row/src/decode.rs @@ -70,13 +70,13 @@ unsafe fn decode_validity(rows: &mut [&[u8]], field: &EncodingField) -> Option usize { // Fast path: if the size is fixed, we can just divide. - if let Some(size) = fixed_size(dtype) { + if let Some(size) = fixed_size(dtype, field) { return size; } @@ -129,6 +129,16 @@ fn dtype_and_data_to_encoded_item_len( item_len }, + D::Int8 => ::len_from_buffer(data, field), + D::Int16 => ::len_from_buffer(data, field), + D::Int32 => ::len_from_buffer(data, field), + D::Int64 => ::len_from_buffer(data, field), + + D::UInt8 => ::len_from_buffer(data, field), + D::UInt16 => ::len_from_buffer(data, field), + D::UInt32 => ::len_from_buffer(data, field), + D::UInt64 => ::len_from_buffer(data, field), + D::Union(_, _, _) => todo!(), D::Map(_, _) => todo!(), D::Dictionary(_, _, _) => todo!(), @@ -152,7 +162,7 @@ fn rows_for_fixed_size_list<'a>( nested_rows.reserve(rows.len() * width); // Fast path: if the size is fixed, we can just divide. - if let Some(size) = fixed_size(dtype) { + if let Some(size) = fixed_size(dtype, field) { for row in rows.iter_mut() { for i in 0..width { nested_rows.push(&row[(i * size)..][..size]); @@ -204,7 +214,7 @@ fn rows_for_fixed_size_list<'a>( // @TODO: This is quite slow since we need to dispatch for possibly every nested type for row in rows.iter_mut() { for _ in 0..width { - let length = dtype_and_data_to_encoded_item_len(dtype, row, field); + let length = unsafe { dtype_and_data_to_encoded_item_len(dtype, row, field) }; let v; (v, *row) = row.split_at(length); nested_rows.push(v); @@ -223,7 +233,7 @@ fn offsets_from_dtype_and_data( offsets.clear(); // Fast path: if the size is fixed, we can just divide. - if let Some(size) = fixed_size(dtype) { + if let Some(size) = fixed_size(dtype, field) { assert!(size == 0 || data.len() % size == 0); offsets.extend((0..data.len() / size).map(|i| i * size)); return; @@ -267,7 +277,7 @@ fn offsets_from_dtype_and_data( let mut data = data; let mut offset = 0; while !data.is_empty() { - let length = dtype_and_data_to_encoded_item_len(dtype, data, field); + let length = unsafe { dtype_and_data_to_encoded_item_len(dtype, data, field) }; offsets.push(offset); data = &data[length..]; offset += length; @@ -362,6 +372,11 @@ unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, dtype: &ArrowDataTyp ) .to_boxed() }, + dt if field.enable_varint && dt.is_integer() => { + with_match_arrow_integer_type!(dt, |$T| { + crate::variable::varint::decode::<$T>(rows, field).to_boxed() + }) + }, dt => { with_match_arrow_primitive_type!(dt, |$T| { decode_primitive::<$T>(rows, field).to_boxed() diff --git a/crates/polars-row/src/encode.rs b/crates/polars-row/src/encode.rs index 9f80a6cc824a..c2b7e94c9771 100644 --- a/crates/polars-row/src/encode.rs +++ b/crates/polars-row/src/encode.rs @@ -11,7 +11,8 @@ use arrow::types::{NativeType, Offset}; use crate::fixed::{get_null_sentinel, FixedLengthEncoding}; use crate::row::{EncodingField, RowsEncoded}; -use crate::{with_match_arrow_primitive_type, ArrayRef}; +use crate::variable::varint::VarIntEncoding; +use crate::{with_match_arrow_integer_type, with_match_arrow_primitive_type, ArrayRef}; pub fn convert_columns( num_rows: usize, @@ -324,13 +325,33 @@ fn biniter_num_column_bytes( } } +fn varint_get_encoder( + array: &dyn Array, + row_widths: &mut RowWidths, +) -> Encoder { + let dc_array = array.as_any().downcast_ref::>().unwrap(); + let validity = dc_array.validity(); + + let widths = if validity.is_none() { + row_widths.append_iter(dc_array.values_iter().map(|v| T::len_from_item(Some(*v)))) + } else { + row_widths.append_iter(dc_array.iter().map(|v| T::len_from_item(v.copied()))) + }; + + Encoder { + widths, + array: array.to_boxed(), + state: EncoderState::Stateless, + } +} + /// Get the encoder for a specific array. fn get_encoder(array: &dyn Array, field: &EncodingField, row_widths: &mut RowWidths) -> Encoder { use ArrowDataType as D; let dtype = array.dtype(); // Fast path: column has a fixed size encoding - if let Some(size) = fixed_size(dtype) { + if let Some(size) = fixed_size(dtype, field) { row_widths.push_constant(size); let state = match dtype { D::FixedSizeList(_, width) => { @@ -413,6 +434,15 @@ fn get_encoder(array: &dyn Array, field: &EncodingField, row_widths: &mut RowWid } }, + D::Int8 => varint_get_encoder::(array, row_widths), + D::Int16 => varint_get_encoder::(array, row_widths), + D::Int32 => varint_get_encoder::(array, row_widths), + D::Int64 => varint_get_encoder::(array, row_widths), + D::UInt8 => varint_get_encoder::(array, row_widths), + D::UInt16 => varint_get_encoder::(array, row_widths), + D::UInt32 => varint_get_encoder::(array, row_widths), + D::UInt64 => varint_get_encoder::(array, row_widths), + D::List(_) => list_num_column_bytes::(array, field, row_widths), D::LargeList(_) => list_num_column_bytes::(array, field, row_widths), @@ -578,10 +608,18 @@ unsafe fn encode_flat_array( let array = array.as_any().downcast_ref::().unwrap(); crate::fixed::encode_iter(buffer, array.iter(), field, offsets); }, - dt if dt.is_numeric() => with_match_arrow_primitive_type!(dt, |$T| { - let array = array.as_any().downcast_ref::>().unwrap(); - encode_primitive(buffer, array, field, offsets); - }), + dt if dt.is_integer() && field.enable_varint => { + with_match_arrow_integer_type!(dt, |$T| { + let array = array.as_any().downcast_ref::>().unwrap(); + crate::variable::varint::encode_iter(buffer, array.iter().map(|v| v.copied()), field, offsets); + }) + }, + dt if dt.is_numeric() => { + with_match_arrow_primitive_type!(dt, |$T| { + let array = array.as_any().downcast_ref::>().unwrap(); + encode_primitive(buffer, array, field, offsets); + }) + }, D::Binary => { let array = array.as_any().downcast_ref::>().unwrap(); @@ -830,26 +868,33 @@ unsafe fn encode_primitive( } } -pub fn fixed_size(dtype: &ArrowDataType) -> Option { +pub fn fixed_size(dtype: &ArrowDataType, field: &EncodingField) -> Option { use ArrowDataType::*; + + if !field.enable_varint { + match dtype { + UInt8 => return Some(u8::ENCODED_LEN), + UInt16 => return Some(u16::ENCODED_LEN), + UInt32 => return Some(u32::ENCODED_LEN), + UInt64 => return Some(u64::ENCODED_LEN), + Int8 => return Some(i8::ENCODED_LEN), + Int16 => return Some(i16::ENCODED_LEN), + Int32 => return Some(i32::ENCODED_LEN), + Int64 => return Some(i64::ENCODED_LEN), + _ => {}, + } + } + Some(match dtype { - UInt8 => u8::ENCODED_LEN, - UInt16 => u16::ENCODED_LEN, - UInt32 => u32::ENCODED_LEN, - UInt64 => u64::ENCODED_LEN, - Int8 => i8::ENCODED_LEN, - Int16 => i16::ENCODED_LEN, - Int32 => i32::ENCODED_LEN, - Int64 => i64::ENCODED_LEN, Decimal(_, _) => i128::ENCODED_LEN, Float32 => f32::ENCODED_LEN, Float64 => f64::ENCODED_LEN, Boolean => bool::ENCODED_LEN, - FixedSizeList(f, width) => 1 + width * fixed_size(f.dtype())?, + FixedSizeList(f, width) => 1 + width * fixed_size(f.dtype(), field)?, Struct(fs) => { let mut sum = 0; for f in fs { - sum += fixed_size(f.dtype())?; + sum += fixed_size(f.dtype(), field)?; } 1 + sum }, diff --git a/crates/polars-row/src/row.rs b/crates/polars-row/src/row.rs index 1aa50e8b0e43..1c475997564b 100644 --- a/crates/polars-row/src/row.rs +++ b/crates/polars-row/src/row.rs @@ -13,6 +13,9 @@ pub struct EncodingField { /// Ignore all order-related flags and don't encode order-preserving. /// This is faster for variable encoding as we can just memcopy all the bytes. pub no_order: bool, + + /// Enable the variable integer encoding. This compresses large integers into smaller integers. + pub enable_varint: bool, } impl EncodingField { @@ -21,6 +24,7 @@ impl EncodingField { descending, nulls_last, no_order: false, + enable_varint: false, } } diff --git a/crates/polars-row/src/utils.rs b/crates/polars-row/src/utils.rs index 2681527f07fa..4dc860026112 100644 --- a/crates/polars-row/src/utils.rs +++ b/crates/polars-row/src/utils.rs @@ -19,3 +19,22 @@ macro_rules! with_match_arrow_primitive_type {( _ => unreachable!(), } })} + +#[macro_export] +macro_rules! with_match_arrow_integer_type {( + $key_type:expr, | $_:tt $T:ident | $($body:tt)* +) => ({ + macro_rules! __with_ty__ {( $_ $T:ident ) => ( $($body)* )} + use arrow::datatypes::ArrowDataType::*; + match $key_type { + Int8 => __with_ty__! { i8 }, + Int16 => __with_ty__! { i16 }, + Int32 => __with_ty__! { i32 }, + Int64 => __with_ty__! { i64 }, + UInt8 => __with_ty__! { u8 }, + UInt16 => __with_ty__! { u16 }, + UInt32 => __with_ty__! { u32 }, + UInt64 => __with_ty__! { u64 }, + _ => unreachable!(), + } +})} diff --git a/crates/polars-row/src/variable/varint.rs b/crates/polars-row/src/variable/varint.rs index 0aa459db168e..11136eb87536 100644 --- a/crates/polars-row/src/variable/varint.rs +++ b/crates/polars-row/src/variable/varint.rs @@ -46,27 +46,61 @@ use polars_utils::slice::Slice2Uninit; use super::get_null_sentinel; use crate::EncodingField; -pub(crate) trait VarIntEncoding: Pod { +pub(crate) trait VarIntEncoding: Pod + std::fmt::Debug { const IS_SIGNED: bool; + + const NUM_BYTELEN_BITS: usize = size_of::().trailing_zeros() as usize; + const FIRST_BYTE_BITS: usize = 6 - (Self::IS_SIGNED as usize) - Self::NUM_BYTELEN_BITS; + const INLINE_MSB_THRESHOLD: usize = if Self::IS_SIGNED { 5 } else { 6 }; + fn msb(self) -> u32; + #[inline(always)] fn msb_to_byte_length(msb: u32) -> usize { let msb = msb as usize; - 1 + if msb <= Self::INLINE_MSB_THRESHOLD { - 0 + let extra_bits = if msb > Self::FIRST_BYTE_BITS + Self::NUM_BYTELEN_BITS { + msb - Self::FIRST_BYTE_BITS } else { - (msb + usize::from(Self::IS_SIGNED)).div_ceil(8) - } + 0 + }; + let result = 1 + extra_bits.div_ceil(8); + debug_assert!(result <= size_of::() + 1); + result } - - fn msb(self) -> u32; fn len_from_item(value: Option) -> usize { match value { None => 1, Some(v) => Self::msb_to_byte_length(v.msb()), } } + unsafe fn len_from_buffer(buffer: &[u8], field: &EncodingField) -> usize { + let mut b = *buffer.get_unchecked(0); + + if b == get_null_sentinel(field) { + return 1; + } + + if field.descending { + b = !b; + } + + let is_inline = b & (1 << (6 + u32::from(!Self::IS_SIGNED))) == 0; + let num_bytes = b >> (6 - usize::from(Self::IS_SIGNED) - Self::NUM_BYTELEN_BITS); + let num_bytes = num_bytes & (1 << Self::NUM_BYTELEN_BITS) - 1; + let mut num_bytes = num_bytes as usize; + + if Self::IS_SIGNED && b & 0x40 == 0 { + num_bytes = !num_bytes; + } + + debug_assert_ne!(num_bytes, (1 << Self::NUM_BYTELEN_BITS) - 1); + if is_inline { + 1 + } else { + 1 + num_bytes + } + } unsafe fn encode_one( value: Option, @@ -77,15 +111,6 @@ pub(crate) trait VarIntEncoding: Pod { unsafe fn decode_one(buffer: &mut &[u8], field: &EncodingField) -> Option; } -pub(crate) unsafe fn len_from_buffer(buffer: &[u8]) -> usize { - let b = *buffer.get_unchecked(0); - 1 + if b & 0xC0 == 0xC0 { - (b & 0x3F) as usize - } else { - 0 - } -} - macro_rules! implement_varint { ($($t:ty,)+) => { $( @@ -118,46 +143,46 @@ macro_rules! implement_varint { }, Some(v) => { let msb = v.msb(); - if msb as usize <= Self::INLINE_MSB_THRESHOLD { - let mut sentinel = null_sentinel ^ 0x80; - sentinel |= v.to_le_bytes()[0] & 0x3F; + let bytelen = Self::msb_to_byte_length(msb); - if Self::IS_SIGNED { - // Flip sign bit - sentinel ^= 0x20; - } - if field.descending { - sentinel = !sentinel; - } + let uses_all_bytes = bytelen > size_of::(); - buffer[*offset] = MaybeUninit::new(sentinel); - *offset += 1; + let bytes = v.to_be_bytes(); + let bytes = bytes.as_ref(); + + buffer[*offset] = MaybeUninit::new(0); + buffer[*offset + usize::from(uses_all_bytes)..][..bytelen - usize::from(uses_all_bytes)].copy_from_slice(&bytes.as_uninit()[size_of::().saturating_sub(bytelen)..]); + + let sentinel_value_bit_mask = if msb == 1 { + (1 << (Self::FIRST_BYTE_BITS + Self::NUM_BYTELEN_BITS)) - 1 } else { - let byte_length = Self::msb_to_byte_length(msb); - let additional_bytes = byte_length - 1; - debug_assert!(additional_bytes > 0 && additional_bytes <= 64); - - let mut sentinel = null_sentinel ^ 0x80; - sentinel |= 0x40; // not inlined - sentinel |= (additional_bytes - 1) as u8; - buffer[*offset] = MaybeUninit::new(sentinel); - - let bytes = v.to_be_bytes(); - let bytes = &bytes.as_ref().as_uninit()[size_of::() - (byte_length - 1)..]; - buffer[*offset + 1..*offset + byte_length].copy_from_slice( bytes); - - if Self::IS_SIGNED { - // Flip sign bit - *buffer[*offset + 1].assume_init_mut() ^= 0x80; - } - if field.descending { - buffer[*offset + 1..*offset + byte_length] - .iter_mut() - .for_each(|v| *v = MaybeUninit::new(!*v.assume_init_ref())); - } + (1 << Self::FIRST_BYTE_BITS) - 1 + }; + + let sentinel_byte = unsafe { buffer[*offset].assume_init_mut() }; - *offset += byte_length; + *sentinel_byte &= sentinel_value_bit_mask; + if Self::IS_SIGNED { + *sentinel_byte |= (!bytes[0] & 0x80) >> 1; + } + *sentinel_byte |= 0x80 & !null_sentinel; + *sentinel_byte |= u8::from(msb == 1) << (7 - usize::from(Self::IS_SIGNED)); + let sentinel_bytelen = (bytelen - 1) as u8; + #[allow(unused_comparisons)] + let sentinel_bytelen = if Self::IS_SIGNED && v < 0 { + (!sentinel_bytelen) & ((1 << Self::NUM_BYTELEN_BITS) - 1) + } else { + sentinel_bytelen + }; + *sentinel_byte |= sentinel_bytelen << (6 - usize::from(Self::IS_SIGNED) - Self::NUM_BYTELEN_BITS); + + if field.descending { + *sentinel_byte ^= 0x7F; + for i in 1..bytelen { + *unsafe { buffer[*offset + i].assume_init_mut() } ^= 0xFF; + } } + *offset += bytelen; }, } } @@ -165,54 +190,43 @@ macro_rules! implement_varint { unsafe fn decode_one(buffer: &mut &[u8], field: &EncodingField) -> Option { let null_sentinel = get_null_sentinel(field); - let sentinel_byte = buffer[0]; - *buffer = &buffer[1..]; - - if sentinel_byte == null_sentinel { + if buffer[0] == null_sentinel { return None; } - let sentinel_byte = if field.descending { - !sentinel_byte - } else { - sentinel_byte - }; + let bytelen = Self::len_from_buffer(*buffer, field); - let is_inlined = sentinel_byte & 0x40 == 0; - if is_inlined { - let mut value = (sentinel_byte & 0x3F) as Self; - if Self::IS_SIGNED { - // Flip sign bit - value ^= 0x20; + let value = if bytelen <= size_of::() { + let mut intermediate = [0u8; size_of::()]; + intermediate[size_of::() - bytelen..].copy_from_slice(&buffer[..bytelen]); - // Sign-extend - value <<= Self::BITS - 5; - value >>= Self::BITS - 5; + let first_byte_bits = if bytelen == 1 { + Self::FIRST_BYTE_BITS + Self::NUM_BYTELEN_BITS + } else { + Self::FIRST_BYTE_BITS + }; + + if Self::IS_SIGNED { + intermediate[size_of::() - bytelen] = (intermediate[size_of::() - bytelen] & ((1 << Self::FIRST_BYTE_BITS) - 1)) | ((!(intermediate[size_of::() - bytelen] & 0x40)) >> (1 + Self::NUM_BYTELEN_BITS)); + } else { + intermediate[size_of::() - bytelen] = intermediate[size_of::() - bytelen] & ((1 << Self::FIRST_BYTE_BITS) - 1); } - Some(value) - } else { - let byte_length = (sentinel_byte & 0x3F) as usize + 1; - let mut intermediate = [0u8; size_of::()]; - intermediate[size_of::() - byte_length..] - .copy_from_slice(&buffer[..byte_length]); - let mut v = Self::from_be_bytes(intermediate); + let mut value = Self::from_be_bytes(intermediate); if Self::IS_SIGNED { - // Flip sign bit - v ^= 1 << (byte_length * 8 - 1); - // Sign-extend - v <<= (size_of::() - byte_length) * 8; - v >>= (size_of::() - byte_length) * 8; - } - if field.descending { - v = !v; + value <<= (8 * size_of::() - (bytelen - 1)) + 7 - first_byte_bits; + value >>= (8 * size_of::() - (bytelen - 1)) + 7 - first_byte_bits; } - *buffer = &buffer[byte_length..]; - Some(v) - } + value + } else { + Self::from_be_bytes(unsafe { buffer.get_unchecked(1..1 + size_of::()) }.try_into().unwrap()) + }; + + *buffer = &buffer[bytelen..]; + Some(value) } } )+ @@ -237,4 +251,4 @@ pub(crate) unsafe fn decode( PrimitiveArray::from_iter(rows.iter_mut().map(|row| T::decode_one(row, field))) } -implement_varint![i32, u32, usize,]; +implement_varint![i8, i16, i32, i64, i128, u8, u16, u32, u64, usize,]; diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index c2dbc3c944ad..4c6923d6ef3a 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -11282,7 +11282,7 @@ def _to_metadata( def _row_encode( self, - fields: list[tuple[bool, bool, bool]], + fields: list[tuple[bool, bool, bool, bool]], ) -> Series: """ Row encode the given DataFrame. @@ -11294,6 +11294,7 @@ def _row_encode( - descending - nulls_last - no_order + - enable_varint """ return pl.Series._from_pyseries(self._df._row_encode(fields)) diff --git a/py-polars/polars/series/series.py b/py-polars/polars/series/series.py index 0baa84c58d58..c6b458d64c3a 100644 --- a/py-polars/polars/series/series.py +++ b/py-polars/polars/series/series.py @@ -7527,7 +7527,7 @@ def plot(self) -> SeriesPlot: def _row_decode( self, dtypes: Iterable[tuple[str, DataType]], # type: ignore[valid-type] - fields: Iterable[tuple[bool, bool, bool]], + fields: Iterable[tuple[bool, bool, bool, bool]], ) -> DataFrame: """ Row decode the given Series. @@ -7539,6 +7539,7 @@ def _row_decode( - descending - nulls_last - no_order + - enable_varint """ return pl.DataFrame._from_pydf(self._s._row_decode(list(dtypes), list(fields)))