From 79fc5b24258604313d293ceb758a0b9b98361148 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 21:17:35 +0800 Subject: [PATCH 01/11] Optimize --- .../arrow/array_reader/cached_array_reader.rs | 14 ++- .../arrow/record_reader/definition_levels.rs | 57 +++++++--- parquet/src/column/reader/decoder.rs | 103 ++++++++++++++---- parquet/src/util/bit_util.rs | 16 +++ 4 files changed, 145 insertions(+), 45 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index a2fa0e903599..1170f39dd6fb 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -18,10 +18,11 @@ //! [`CachedArrayReader`] wrapper around [`ArrayReader`] use crate::arrow::array_reader::row_group_cache::BatchID; -use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache}; +use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::errors::Result; -use arrow_array::{ArrayRef, BooleanArray, new_empty_array}; +use ahash::RandomState; +use arrow_array::{new_empty_array, ArrayRef, BooleanArray}; use arrow_buffer::BooleanBufferBuilder; use arrow_schema::DataType as ArrowType; use std::any::Any; @@ -82,9 +83,10 @@ pub struct CachedArrayReader { selections: BooleanBufferBuilder, /// Role of this reader (Producer or Consumer) role: CacheRole, - /// Local cache to store batches between read_records and consume_batch calls - /// This ensures data is available even if the shared cache evicts items - local_cache: HashMap, + /// Local cache to store batches between read_records and consume_batch calls. + /// This ensures data is available even if the shared cache evicts items, and it uses + /// `ahash::RandomState` to minimize hashing overhead across the small batch IDs used here. + local_cache: HashMap, /// Statistics to report on the Cache behavior metrics: ArrowReaderMetrics, } @@ -109,7 +111,7 @@ impl CachedArrayReader { batch_size, selections: BooleanBufferBuilder::new(0), role, - local_cache: HashMap::new(), + local_cache: HashMap::default(), metrics, } } diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 34b728d6fa1e..1599bd15738d 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,7 +20,6 @@ use arrow_buffer::Buffer; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use bytes::Bytes; -use crate::arrow::buffer::bit_util::count_set_bits; use crate::basic::Encoding; use crate::column::reader::decoder::{ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, @@ -169,10 +168,7 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => { assert_eq!(self.max_level, 1); - let start = nulls.len(); - let levels_read = decoder.read(nulls, num_levels)?; - - let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read); + let (values_read, levels_read) = decoder.read(nulls, num_levels)?; Ok((values_read, levels_read)) } _ => unreachable!("inconsistent null mask"), @@ -284,20 +280,31 @@ impl PackedDecoder { self.data_offset = 0; } - fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result { - let mut read = 0; - while read != len { + /// Reads up to `len` definition levels directly into a boolean bitmask. + /// + /// Returns a tuple of `(values_read, levels_read)`, where `values_read` counts the + /// number of `true` bits appended to `buffer`. + fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<(usize, usize)> { + let mut levels_read = 0; + let mut values_read = 0; + while levels_read != len { if self.rle_left != 0 { - let to_read = self.rle_left.min(len - read); + let to_read = self.rle_left.min(len - levels_read); buffer.append_n(to_read, self.rle_value); self.rle_left -= to_read; - read += to_read; + if self.rle_value { + values_read += to_read; + } + levels_read += to_read; } else if self.packed_count != self.packed_offset { - let to_read = (self.packed_count - self.packed_offset).min(len - read); + let to_read = (self.packed_count - self.packed_offset).min(len - levels_read); let offset = self.data_offset * 8 + self.packed_offset; buffer.append_packed_range(offset..offset + to_read, self.data.as_ref()); + // Packed runs already encode bits densely; count the ones we just appended. + values_read += + UnalignedBitChunk::new(self.data.as_ref(), offset, to_read).count_ones(); self.packed_offset += to_read; - read += to_read; + levels_read += to_read; if self.packed_offset == self.packed_count { self.data_offset += self.packed_count / 8; @@ -308,7 +315,7 @@ impl PackedDecoder { self.next_rle_block()? } } - Ok(read) + Ok((values_read, levels_read)) } /// Skips `level_num` definition levels @@ -360,10 +367,14 @@ mod tests { let mut expected = BooleanBufferBuilder::new(len); let mut encoder = RleEncoder::new(1, 1024); + let mut expected_value_count = 0; for _ in 0..len { let bool = rng.random_bool(0.8); encoder.put(bool as u64); expected.append(bool); + if bool { + expected_value_count += 1; + } } assert_eq!(expected.len(), len); @@ -373,6 +384,8 @@ mod tests { // Decode data in random length intervals let mut decoded = BooleanBufferBuilder::new(len); + // Track how many `true` bits we appended to validate the returned counts. + let mut decoded_value_count = 0; loop { let remaining = len - decoded.len(); if remaining == 0 { @@ -380,11 +393,18 @@ mod tests { } let to_read = rng.random_range(1..=remaining); - decoder.read(&mut decoded, to_read).unwrap(); + let offset = decoded.len(); + let (values_read, levels_read) = decoder.read(&mut decoded, to_read).unwrap(); + assert_eq!(levels_read, to_read); + decoded_value_count += values_read; + let expected_chunk_ones = + UnalignedBitChunk::new(expected.as_slice(), offset, levels_read).count_ones(); + assert_eq!(values_read, expected_chunk_ones); } assert_eq!(decoded.len(), len); assert_eq!(decoded.as_slice(), expected.as_slice()); + assert_eq!(decoded_value_count, expected_value_count); } #[test] @@ -428,18 +448,23 @@ mod tests { skip_level += skip_level_num } else { let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level); - let read_level_num = decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); + let (read_value_num, read_level_num) = + decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); read_level += read_level_num; + read_value += read_value_num; + // Verify the per-chunk counts match the exact bits we compared below. + let mut chunk_value_count = 0; for i in 0..read_level_num { assert!(!decoded.is_empty()); //check each read bit let read_bit = decoded.get_bit(i); if read_bit { - read_value += 1; + chunk_value_count += 1; } let expect_bit = expected.get_bit(i + offset); assert_eq!(read_bit, expect_bit); } + assert_eq!(chunk_value_count, read_value_num); } } assert_eq!(read_level + skip_level, len); diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index acf4827ed538..65e095b75c6e 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; - use bytes::Bytes; use crate::basic::Encoding; @@ -68,9 +66,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder { } pub trait DefinitionLevelDecoder: ColumnLevelDecoder { - /// Read up to `num_levels` definition levels into `out` + /// Read up to `num_levels` definition levels into `out`. /// - /// Returns the number of values skipped, and the number of levels skipped + /// Returns the number of values read, and the number of levels read. /// /// # Panics /// @@ -81,9 +79,9 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder { num_levels: usize, ) -> Result<(usize, usize)>; - /// Skips over `num_levels` definition levels + /// Skips over `num_levels` definition levels. /// - /// Returns the number of values skipped, and the number of levels skipped + /// Returns the number of values skipped, and the number of levels skipped. fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>; } @@ -136,14 +134,76 @@ pub trait ColumnValueDecoder { fn skip_values(&mut self, num_values: usize) -> Result; } +/// Bucket-based storage for decoder instances keyed by `Encoding`. +/// +/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the +/// hot decoding paths. +const ENCODING_SLOTS: usize = 10; // covers the encodings handled in `enc_slot` + +#[inline] +fn enc_slot(e: Encoding) -> usize { + match e { + Encoding::PLAIN => 0, + Encoding::PLAIN_DICTIONARY => 2, + Encoding::RLE => 3, + #[allow(deprecated)] + Encoding::BIT_PACKED => 4, + Encoding::DELTA_BINARY_PACKED => 5, + Encoding::DELTA_LENGTH_BYTE_ARRAY => 6, + Encoding::DELTA_BYTE_ARRAY => 7, + Encoding::RLE_DICTIONARY => 8, + Encoding::BYTE_STREAM_SPLIT => 9, + _ => unreachable!("unsupported encoding: {e}"), + } +} + +/// Fixed-capacity storage for decoder instances keyed by Parquet encoding. +struct DecoderBuckets { + inner: [Option; ENCODING_SLOTS], +} + +impl DecoderBuckets { + #[inline] + fn new() -> Self { + Self { + inner: std::array::from_fn(|_| None), + } + } + + #[inline] + fn contains_key(&self, e: Encoding) -> bool { + self.inner[enc_slot(e)].is_some() + } + + #[inline] + fn get_mut(&mut self, e: Encoding) -> Option<&mut V> { + self.inner[enc_slot(e)].as_mut() + } + + #[inline] + fn insert_and_get_mut(&mut self, e: Encoding, v: V) -> &mut V { + let slot = &mut self.inner[enc_slot(e)]; + debug_assert!(slot.is_none()); + *slot = Some(v); + slot.as_mut().unwrap() + } +} + +impl Default for DecoderBuckets { + fn default() -> Self { + Self::new() + } +} + /// An implementation of [`ColumnValueDecoder`] for `[T::T]` pub struct ColumnValueDecoderImpl { descr: ColumnDescPtr, current_encoding: Option, - // Cache of decoders for existing encodings - decoders: HashMap>>, + /// Cache of decoders for existing encodings. + /// Uses `DecoderBuckets` instead of `HashMap` for predictable indexing. + decoders: DecoderBuckets>>, } impl ColumnValueDecoder for ColumnValueDecoderImpl { @@ -153,7 +213,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { Self { descr: descr.clone(), current_encoding: None, - decoders: Default::default(), + decoders: DecoderBuckets::new(), } } @@ -168,7 +228,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { encoding = Encoding::RLE_DICTIONARY } - if self.decoders.contains_key(&encoding) { + if self.decoders.contains_key(encoding) { return Err(general_err!("Column cannot have more than one dictionary")); } @@ -178,7 +238,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { let mut decoder = DictDecoder::new(); decoder.set_dict(Box::new(dictionary))?; - self.decoders.insert(encoding, Box::new(decoder)); + self.decoders + .insert_and_get_mut(encoding, Box::new(decoder)); Ok(()) } else { Err(nyi_err!( @@ -195,24 +256,20 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { num_levels: usize, num_values: Option, ) -> Result<()> { - use std::collections::hash_map::Entry; - if encoding == Encoding::PLAIN_DICTIONARY { encoding = Encoding::RLE_DICTIONARY; } let decoder = if encoding == Encoding::RLE_DICTIONARY { self.decoders - .get_mut(&encoding) + .get_mut(encoding) .expect("Decoder for dict should have been set") } else { - // Search cache for data page decoder - match self.decoders.entry(encoding) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(v) => { - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - v.insert(data_decoder) - } + if let Some(decoder) = self.decoders.get_mut(encoding) { + decoder + } else { + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders.insert_and_get_mut(encoding, data_decoder) } }; @@ -228,7 +285,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { let current_decoder = self .decoders - .get_mut(&encoding) + .get_mut(encoding) .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); // TODO: Push vec into decoder (#5177) @@ -246,7 +303,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { let current_decoder = self .decoders - .get_mut(&encoding) + .get_mut(encoding) .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); current_decoder.skip(num_values) diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 3a26603fabc4..806ecd156160 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -591,6 +591,22 @@ impl BitReader { values_to_read } + #[inline] + pub fn skip_aligned_bytes(&mut self, n: usize) { + // We do not care about the returned value, only about advancing the cursor. + // Callers use this from the RLE `reload_for_skip` path, which guarantees byte alignment + // by rounding the bit width to `ceil(bit_width, 8)`. + // Drop the bytes in chunks of up to eight bytes using `get_aligned::()`. + let mut left = n; + while left > 0 { + let chunk = left.min(8); + // Ignore the return value. If the source ends early, this mirrors the original + // "stop at the end" behavior. + let _ = self.get_aligned::(chunk); + left -= chunk; + } + } + /// Skip num_value values with num_bits bit width /// /// Return the number of values skipped (up to num_values) From c8afcb1132640cf7dd44b2e5b9473a91c2ec1a4e Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 21:31:59 +0800 Subject: [PATCH 02/11] reformat --- parquet/src/arrow/array_reader/cached_array_reader.rs | 2 +- parquet/src/arrow/record_reader/definition_levels.rs | 4 ++-- parquet/src/column/reader/decoder.rs | 5 ++--- parquet/src/util/bit_util.rs | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 1170f39dd6fb..b7f21f0388b0 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -353,8 +353,8 @@ impl ArrayReader for CachedArrayReader { #[cfg(test)] mod tests { use super::*; - use crate::arrow::array_reader::ArrayReader; use crate::arrow::array_reader::row_group_cache::RowGroupCache; + use crate::arrow::array_reader::ArrayReader; use arrow_array::{ArrayRef, Int32Array}; use std::sync::{Arc, Mutex}; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 1599bd15738d..9e122612c1e1 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -16,8 +16,8 @@ // under the License. use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::Buffer; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; +use arrow_buffer::Buffer; use bytes::Bytes; use crate::basic::Encoding; @@ -358,7 +358,7 @@ mod tests { use super::*; use crate::encodings::rle::RleEncoder; - use rand::{Rng, rng}; + use rand::{rng, Rng}; #[test] fn test_packed_decoder() { diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 65e095b75c6e..e9a7eb423e26 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -20,12 +20,12 @@ use bytes::Bytes; use crate::basic::Encoding; use crate::data_type::DataType; use crate::encodings::{ - decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder}, + decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}, rle::RleDecoder, }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::bit_util::{BitReader, num_required_bits}; +use crate::util::bit_util::{num_required_bits, BitReader}; /// Decodes level data pub trait ColumnLevelDecoder { @@ -153,7 +153,6 @@ fn enc_slot(e: Encoding) -> usize { Encoding::DELTA_BYTE_ARRAY => 7, Encoding::RLE_DICTIONARY => 8, Encoding::BYTE_STREAM_SPLIT => 9, - _ => unreachable!("unsupported encoding: {e}"), } } diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 806ecd156160..1090f163d873 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96}; use crate::errors::{ParquetError, Result}; -use crate::util::bit_pack::{unpack8, unpack16, unpack32, unpack64}; +use crate::util::bit_pack::{unpack16, unpack32, unpack64, unpack8}; #[inline] fn array_from_slice(bs: &[u8]) -> Result<[u8; N]> { From a690b4a1cd347835dd52025c5906d919583da626 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 21:33:48 +0800 Subject: [PATCH 03/11] revert bit util --- parquet/src/util/bit_util.rs | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 1090f163d873..3a26603fabc4 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96}; use crate::errors::{ParquetError, Result}; -use crate::util::bit_pack::{unpack16, unpack32, unpack64, unpack8}; +use crate::util::bit_pack::{unpack8, unpack16, unpack32, unpack64}; #[inline] fn array_from_slice(bs: &[u8]) -> Result<[u8; N]> { @@ -591,22 +591,6 @@ impl BitReader { values_to_read } - #[inline] - pub fn skip_aligned_bytes(&mut self, n: usize) { - // We do not care about the returned value, only about advancing the cursor. - // Callers use this from the RLE `reload_for_skip` path, which guarantees byte alignment - // by rounding the bit width to `ceil(bit_width, 8)`. - // Drop the bytes in chunks of up to eight bytes using `get_aligned::()`. - let mut left = n; - while left > 0 { - let chunk = left.min(8); - // Ignore the return value. If the source ends early, this mirrors the original - // "stop at the end" behavior. - let _ = self.get_aligned::(chunk); - left -= chunk; - } - } - /// Skip num_value values with num_bits bit width /// /// Return the number of values skipped (up to num_values) From 43545d43f143b0963b56633807b1ebe9c0d5bdc1 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 22:58:36 +0800 Subject: [PATCH 04/11] reformat --- parquet/src/arrow/array_reader/cached_array_reader.rs | 6 +++--- parquet/src/arrow/record_reader/definition_levels.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index b7f21f0388b0..54549e2930c1 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -18,11 +18,11 @@ //! [`CachedArrayReader`] wrapper around [`ArrayReader`] use crate::arrow::array_reader::row_group_cache::BatchID; -use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader}; +use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::errors::Result; use ahash::RandomState; -use arrow_array::{new_empty_array, ArrayRef, BooleanArray}; +use arrow_array::{ArrayRef, BooleanArray, new_empty_array}; use arrow_buffer::BooleanBufferBuilder; use arrow_schema::DataType as ArrowType; use std::any::Any; @@ -353,8 +353,8 @@ impl ArrayReader for CachedArrayReader { #[cfg(test)] mod tests { use super::*; - use crate::arrow::array_reader::row_group_cache::RowGroupCache; use crate::arrow::array_reader::ArrayReader; + use crate::arrow::array_reader::row_group_cache::RowGroupCache; use arrow_array::{ArrayRef, Int32Array}; use std::sync::{Arc, Mutex}; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 9e122612c1e1..1599bd15738d 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -16,8 +16,8 @@ // under the License. use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use arrow_buffer::Buffer; +use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use bytes::Bytes; use crate::basic::Encoding; @@ -358,7 +358,7 @@ mod tests { use super::*; use crate::encodings::rle::RleEncoder; - use rand::{rng, Rng}; + use rand::{Rng, rng}; #[test] fn test_packed_decoder() { From f85181808210584f94f4db5ca69dfac4eb4745a3 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 23:06:32 +0800 Subject: [PATCH 05/11] reformat --- parquet/src/arrow/buffer/bit_util.rs | 1 + parquet/src/column/reader/decoder.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/buffer/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs index 1d2c953abcbb..a37527850068 100644 --- a/parquet/src/arrow/buffer/bit_util.rs +++ b/parquet/src/arrow/buffer/bit_util.rs @@ -18,6 +18,7 @@ use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use std::ops::Range; +#[allow(unused)] /// Counts the number of set bits in the provided range pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - range.start); diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index e9a7eb423e26..2db911bac88a 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -20,12 +20,12 @@ use bytes::Bytes; use crate::basic::Encoding; use crate::data_type::DataType; use crate::encodings::{ - decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}, + decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder}, rle::RleDecoder, }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::bit_util::{num_required_bits, BitReader}; +use crate::util::bit_util::{BitReader, num_required_bits}; /// Decodes level data pub trait ColumnLevelDecoder { From 469574574de61359afcdf40d2c4c140a74cca538 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 23:31:35 +0800 Subject: [PATCH 06/11] Revert the map in cached array --- .../arrow/array_reader/cached_array_reader.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 54549e2930c1..0e837782faf5 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -18,11 +18,10 @@ //! [`CachedArrayReader`] wrapper around [`ArrayReader`] use crate::arrow::array_reader::row_group_cache::BatchID; -use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache}; +use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::errors::Result; -use ahash::RandomState; -use arrow_array::{ArrayRef, BooleanArray, new_empty_array}; +use arrow_array::{new_empty_array, ArrayRef, BooleanArray}; use arrow_buffer::BooleanBufferBuilder; use arrow_schema::DataType as ArrowType; use std::any::Any; @@ -83,10 +82,9 @@ pub struct CachedArrayReader { selections: BooleanBufferBuilder, /// Role of this reader (Producer or Consumer) role: CacheRole, - /// Local cache to store batches between read_records and consume_batch calls. - /// This ensures data is available even if the shared cache evicts items, and it uses - /// `ahash::RandomState` to minimize hashing overhead across the small batch IDs used here. - local_cache: HashMap, + /// Local cache to store batches between read_records and consume_batch calls + /// This ensures data is available even if the shared cache evicts items + local_cache: HashMap, /// Statistics to report on the Cache behavior metrics: ArrowReaderMetrics, } @@ -111,7 +109,7 @@ impl CachedArrayReader { batch_size, selections: BooleanBufferBuilder::new(0), role, - local_cache: HashMap::default(), + local_cache: HashMap::new(), metrics, } } @@ -353,8 +351,8 @@ impl ArrayReader for CachedArrayReader { #[cfg(test)] mod tests { use super::*; - use crate::arrow::array_reader::ArrayReader; use crate::arrow::array_reader::row_group_cache::RowGroupCache; + use crate::arrow::array_reader::ArrayReader; use arrow_array::{ArrayRef, Int32Array}; use std::sync::{Arc, Mutex}; From 9e7cb1537fb36b1ec280ea07d59051cb33cfc50e Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Tue, 14 Oct 2025 23:32:37 +0800 Subject: [PATCH 07/11] reformat --- parquet/src/arrow/array_reader/cached_array_reader.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 0e837782faf5..a2fa0e903599 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -18,10 +18,10 @@ //! [`CachedArrayReader`] wrapper around [`ArrayReader`] use crate::arrow::array_reader::row_group_cache::BatchID; -use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader}; +use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::errors::Result; -use arrow_array::{new_empty_array, ArrayRef, BooleanArray}; +use arrow_array::{ArrayRef, BooleanArray, new_empty_array}; use arrow_buffer::BooleanBufferBuilder; use arrow_schema::DataType as ArrowType; use std::any::Any; @@ -351,8 +351,8 @@ impl ArrayReader for CachedArrayReader { #[cfg(test)] mod tests { use super::*; - use crate::arrow::array_reader::row_group_cache::RowGroupCache; use crate::arrow::array_reader::ArrayReader; + use crate::arrow::array_reader::row_group_cache::RowGroupCache; use arrow_array::{ArrayRef, Int32Array}; use std::sync::{Arc, Mutex}; From 42035ebb3e4922933243e3d5b39d1dafe892f5e8 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 16 Oct 2025 10:05:13 +0800 Subject: [PATCH 08/11] fix a build error in clippy --- parquet/src/column/reader/decoder.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 2db911bac88a..993b0a475cab 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -263,13 +263,11 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { self.decoders .get_mut(encoding) .expect("Decoder for dict should have been set") + } else if let Some(decoder) = self.decoders.get_mut(encoding) { + decoder } else { - if let Some(decoder) = self.decoders.get_mut(encoding) { - decoder - } else { - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - self.decoders.insert_and_get_mut(encoding, data_decoder) - } + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders.insert_and_get_mut(encoding, data_decoder) }; decoder.set_data(data, num_values.unwrap_or(num_levels))?; From 65b57801c3f752f4bf495df677ad9f6e538befb3 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Sun, 19 Oct 2025 16:01:25 +0800 Subject: [PATCH 09/11] Using decode mask and revert change in definition_levels.rs --- parquet/src/arrow/buffer/bit_util.rs | 1 - .../arrow/record_reader/definition_levels.rs | 59 ++++------- parquet/src/basic.rs | 5 + parquet/src/column/reader/decoder.rs | 99 +++++-------------- 4 files changed, 47 insertions(+), 117 deletions(-) diff --git a/parquet/src/arrow/buffer/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs index a37527850068..1d2c953abcbb 100644 --- a/parquet/src/arrow/buffer/bit_util.rs +++ b/parquet/src/arrow/buffer/bit_util.rs @@ -18,7 +18,6 @@ use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use std::ops::Range; -#[allow(unused)] /// Counts the number of set bits in the provided range pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - range.start); diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 1599bd15738d..2f5ef1b9b1d3 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,6 +20,7 @@ use arrow_buffer::Buffer; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use bytes::Bytes; +use crate::arrow::buffer::bit_util::count_set_bits; use crate::basic::Encoding; use crate::column::reader::decoder::{ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, @@ -168,7 +169,10 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => { assert_eq!(self.max_level, 1); - let (values_read, levels_read) = decoder.read(nulls, num_levels)?; + let start = nulls.len(); + let levels_read = decoder.read(nulls, num_levels)?; + + let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read); Ok((values_read, levels_read)) } _ => unreachable!("inconsistent null mask"), @@ -280,31 +284,20 @@ impl PackedDecoder { self.data_offset = 0; } - /// Reads up to `len` definition levels directly into a boolean bitmask. - /// - /// Returns a tuple of `(values_read, levels_read)`, where `values_read` counts the - /// number of `true` bits appended to `buffer`. - fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<(usize, usize)> { - let mut levels_read = 0; - let mut values_read = 0; - while levels_read != len { + fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result { + let mut read = 0; + while read != len { if self.rle_left != 0 { - let to_read = self.rle_left.min(len - levels_read); + let to_read = self.rle_left.min(len - read); buffer.append_n(to_read, self.rle_value); self.rle_left -= to_read; - if self.rle_value { - values_read += to_read; - } - levels_read += to_read; + read += to_read; } else if self.packed_count != self.packed_offset { - let to_read = (self.packed_count - self.packed_offset).min(len - levels_read); + let to_read = (self.packed_count - self.packed_offset).min(len - read); let offset = self.data_offset * 8 + self.packed_offset; buffer.append_packed_range(offset..offset + to_read, self.data.as_ref()); - // Packed runs already encode bits densely; count the ones we just appended. - values_read += - UnalignedBitChunk::new(self.data.as_ref(), offset, to_read).count_ones(); self.packed_offset += to_read; - levels_read += to_read; + read += to_read; if self.packed_offset == self.packed_count { self.data_offset += self.packed_count / 8; @@ -315,7 +308,7 @@ impl PackedDecoder { self.next_rle_block()? } } - Ok((values_read, levels_read)) + Ok(read) } /// Skips `level_num` definition levels @@ -367,14 +360,10 @@ mod tests { let mut expected = BooleanBufferBuilder::new(len); let mut encoder = RleEncoder::new(1, 1024); - let mut expected_value_count = 0; for _ in 0..len { let bool = rng.random_bool(0.8); encoder.put(bool as u64); expected.append(bool); - if bool { - expected_value_count += 1; - } } assert_eq!(expected.len(), len); @@ -384,8 +373,6 @@ mod tests { // Decode data in random length intervals let mut decoded = BooleanBufferBuilder::new(len); - // Track how many `true` bits we appended to validate the returned counts. - let mut decoded_value_count = 0; loop { let remaining = len - decoded.len(); if remaining == 0 { @@ -393,18 +380,11 @@ mod tests { } let to_read = rng.random_range(1..=remaining); - let offset = decoded.len(); - let (values_read, levels_read) = decoder.read(&mut decoded, to_read).unwrap(); - assert_eq!(levels_read, to_read); - decoded_value_count += values_read; - let expected_chunk_ones = - UnalignedBitChunk::new(expected.as_slice(), offset, levels_read).count_ones(); - assert_eq!(values_read, expected_chunk_ones); + decoder.read(&mut decoded, to_read).unwrap(); } assert_eq!(decoded.len(), len); assert_eq!(decoded.as_slice(), expected.as_slice()); - assert_eq!(decoded_value_count, expected_value_count); } #[test] @@ -448,26 +428,21 @@ mod tests { skip_level += skip_level_num } else { let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level); - let (read_value_num, read_level_num) = - decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); + let read_level_num = decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); read_level += read_level_num; - read_value += read_value_num; - // Verify the per-chunk counts match the exact bits we compared below. - let mut chunk_value_count = 0; for i in 0..read_level_num { assert!(!decoded.is_empty()); //check each read bit let read_bit = decoded.get_bit(i); if read_bit { - chunk_value_count += 1; + read_value += 1; } let expect_bit = expected.get_bit(i + offset); assert_eq!(read_bit, expect_bit); } - assert_eq!(chunk_value_count, read_value_num); } } assert_eq!(read_level + skip_level, len); assert_eq!(read_value + skip_value, total_value); } -} +} \ No newline at end of file diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index bd42f043eead..7f50eada46de 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -771,6 +771,11 @@ impl EncodingMask { Self(mask) } + /// Mark the given [`Encoding`] as present in this mask. + pub fn insert(&mut self, val: Encoding) { + self.0 |= 1 << (val as i32); + } + /// Test if a given [`Encoding`] is present in this mask. pub fn is_set(&self, val: Encoding) -> bool { self.0 & (1 << (val as i32)) != 0 diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 993b0a475cab..1d4e2f751181 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -17,7 +17,7 @@ use bytes::Bytes; -use crate::basic::Encoding; +use crate::basic::{Encoding, EncodingMask}; use crate::data_type::DataType; use crate::encodings::{ decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder}, @@ -138,61 +138,7 @@ pub trait ColumnValueDecoder { /// /// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the /// hot decoding paths. -const ENCODING_SLOTS: usize = 10; // covers the encodings handled in `enc_slot` - -#[inline] -fn enc_slot(e: Encoding) -> usize { - match e { - Encoding::PLAIN => 0, - Encoding::PLAIN_DICTIONARY => 2, - Encoding::RLE => 3, - #[allow(deprecated)] - Encoding::BIT_PACKED => 4, - Encoding::DELTA_BINARY_PACKED => 5, - Encoding::DELTA_LENGTH_BYTE_ARRAY => 6, - Encoding::DELTA_BYTE_ARRAY => 7, - Encoding::RLE_DICTIONARY => 8, - Encoding::BYTE_STREAM_SPLIT => 9, - } -} - -/// Fixed-capacity storage for decoder instances keyed by Parquet encoding. -struct DecoderBuckets { - inner: [Option; ENCODING_SLOTS], -} - -impl DecoderBuckets { - #[inline] - fn new() -> Self { - Self { - inner: std::array::from_fn(|_| None), - } - } - - #[inline] - fn contains_key(&self, e: Encoding) -> bool { - self.inner[enc_slot(e)].is_some() - } - - #[inline] - fn get_mut(&mut self, e: Encoding) -> Option<&mut V> { - self.inner[enc_slot(e)].as_mut() - } - - #[inline] - fn insert_and_get_mut(&mut self, e: Encoding, v: V) -> &mut V { - let slot = &mut self.inner[enc_slot(e)]; - debug_assert!(slot.is_none()); - *slot = Some(v); - slot.as_mut().unwrap() - } -} - -impl Default for DecoderBuckets { - fn default() -> Self { - Self::new() - } -} +const ENCODING_SLOTS: usize = Encoding::BYTE_STREAM_SPLIT as usize + 1; /// An implementation of [`ColumnValueDecoder`] for `[T::T]` pub struct ColumnValueDecoderImpl { @@ -201,8 +147,9 @@ pub struct ColumnValueDecoderImpl { current_encoding: Option, /// Cache of decoders for existing encodings. - /// Uses `DecoderBuckets` instead of `HashMap` for predictable indexing. - decoders: DecoderBuckets>>, + /// Uses `EncodingMask` and dense storage keyed by encoding discriminant. + decoder_mask: EncodingMask, + decoders: [Option>>; ENCODING_SLOTS], } impl ColumnValueDecoder for ColumnValueDecoderImpl { @@ -212,7 +159,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { Self { descr: descr.clone(), current_encoding: None, - decoders: DecoderBuckets::new(), + decoder_mask: EncodingMask::default(), + decoders: std::array::from_fn(|_| None), } } @@ -227,7 +175,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { encoding = Encoding::RLE_DICTIONARY } - if self.decoders.contains_key(encoding) { + if self.decoder_mask.is_set(encoding) { return Err(general_err!("Column cannot have more than one dictionary")); } @@ -237,8 +185,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { let mut decoder = DictDecoder::new(); decoder.set_dict(Box::new(dictionary))?; - self.decoders - .insert_and_get_mut(encoding, Box::new(decoder)); + self.decoders[encoding as usize] = Some(Box::new(decoder)); + self.decoder_mask.insert(encoding); Ok(()) } else { Err(nyi_err!( @@ -260,14 +208,19 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { } let decoder = if encoding == Encoding::RLE_DICTIONARY { - self.decoders - .get_mut(encoding) + self.decoders[encoding as usize] + .as_mut() .expect("Decoder for dict should have been set") - } else if let Some(decoder) = self.decoders.get_mut(encoding) { - decoder } else { - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - self.decoders.insert_and_get_mut(encoding, data_decoder) + let slot = encoding as usize; + if self.decoders[slot].is_none() { + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders[slot] = Some(data_decoder); + self.decoder_mask.insert(encoding); + } + self.decoders[slot] + .as_mut() + .expect("decoder should have been inserted") }; decoder.set_data(data, num_values.unwrap_or(num_levels))?; @@ -280,9 +233,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .current_encoding .expect("current_encoding should be set"); - let current_decoder = self - .decoders - .get_mut(encoding) + let current_decoder = self.decoders[encoding as usize] + .as_mut() .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); // TODO: Push vec into decoder (#5177) @@ -298,9 +250,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .current_encoding .expect("current_encoding should be set"); - let current_decoder = self - .decoders - .get_mut(encoding) + let current_decoder = self.decoders[encoding as usize] + .as_mut() .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); current_decoder.skip(num_values) From 68993555e952f41734c68e4904f3df32329863b8 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Sun, 19 Oct 2025 16:41:23 +0800 Subject: [PATCH 10/11] reformat definition_levels.rs --- parquet/src/arrow/record_reader/definition_levels.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 2f5ef1b9b1d3..a90b3c4ec795 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -16,8 +16,8 @@ // under the License. use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::Buffer; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; +use arrow_buffer::Buffer; use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; @@ -351,7 +351,7 @@ mod tests { use super::*; use crate::encodings::rle::RleEncoder; - use rand::{Rng, rng}; + use rand::{rng, Rng}; #[test] fn test_packed_decoder() { @@ -445,4 +445,4 @@ mod tests { assert_eq!(read_level + skip_level, len); assert_eq!(read_value + skip_value, total_value); } -} \ No newline at end of file +} From 406c12f15393754e95c028875e796225af91c8d0 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Mon, 20 Oct 2025 12:47:03 +0800 Subject: [PATCH 11/11] reformat --- parquet/src/arrow/record_reader/definition_levels.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index a90b3c4ec795..34b728d6fa1e 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -16,8 +16,8 @@ // under the License. use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use arrow_buffer::Buffer; +use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; @@ -351,7 +351,7 @@ mod tests { use super::*; use crate::encodings::rle::RleEncoder; - use rand::{rng, Rng}; + use rand::{Rng, rng}; #[test] fn test_packed_decoder() {