diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index bd42f043eead..7f50eada46de 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -771,6 +771,11 @@ impl EncodingMask { Self(mask) } + /// Mark the given [`Encoding`] as present in this mask. + pub fn insert(&mut self, val: Encoding) { + self.0 |= 1 << (val as i32); + } + /// Test if a given [`Encoding`] is present in this mask. pub fn is_set(&self, val: Encoding) -> bool { self.0 & (1 << (val as i32)) != 0 diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index acf4827ed538..1d4e2f751181 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; - use bytes::Bytes; -use crate::basic::Encoding; +use crate::basic::{Encoding, EncodingMask}; use crate::data_type::DataType; use crate::encodings::{ decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder}, @@ -68,9 +66,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder { } pub trait DefinitionLevelDecoder: ColumnLevelDecoder { - /// Read up to `num_levels` definition levels into `out` + /// Read up to `num_levels` definition levels into `out`. /// - /// Returns the number of values skipped, and the number of levels skipped + /// Returns the number of values read, and the number of levels read. /// /// # Panics /// @@ -81,9 +79,9 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder { num_levels: usize, ) -> Result<(usize, usize)>; - /// Skips over `num_levels` definition levels + /// Skips over `num_levels` definition levels. /// - /// Returns the number of values skipped, and the number of levels skipped + /// Returns the number of values skipped, and the number of levels skipped. fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>; } @@ -136,14 +134,22 @@ pub trait ColumnValueDecoder { fn skip_values(&mut self, num_values: usize) -> Result; } +/// Bucket-based storage for decoder instances keyed by `Encoding`. +/// +/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the +/// hot decoding paths. +const ENCODING_SLOTS: usize = Encoding::BYTE_STREAM_SPLIT as usize + 1; + /// An implementation of [`ColumnValueDecoder`] for `[T::T]` pub struct ColumnValueDecoderImpl { descr: ColumnDescPtr, current_encoding: Option, - // Cache of decoders for existing encodings - decoders: HashMap>>, + /// Cache of decoders for existing encodings. + /// Uses `EncodingMask` and dense storage keyed by encoding discriminant. + decoder_mask: EncodingMask, + decoders: [Option>>; ENCODING_SLOTS], } impl ColumnValueDecoder for ColumnValueDecoderImpl { @@ -153,7 +159,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { Self { descr: descr.clone(), current_encoding: None, - decoders: Default::default(), + decoder_mask: EncodingMask::default(), + decoders: std::array::from_fn(|_| None), } } @@ -168,7 +175,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { encoding = Encoding::RLE_DICTIONARY } - if self.decoders.contains_key(&encoding) { + if self.decoder_mask.is_set(encoding) { return Err(general_err!("Column cannot have more than one dictionary")); } @@ -178,7 +185,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { let mut decoder = DictDecoder::new(); decoder.set_dict(Box::new(dictionary))?; - self.decoders.insert(encoding, Box::new(decoder)); + self.decoders[encoding as usize] = Some(Box::new(decoder)); + self.decoder_mask.insert(encoding); Ok(()) } else { Err(nyi_err!( @@ -195,25 +203,24 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { num_levels: usize, num_values: Option, ) -> Result<()> { - use std::collections::hash_map::Entry; - if encoding == Encoding::PLAIN_DICTIONARY { encoding = Encoding::RLE_DICTIONARY; } let decoder = if encoding == Encoding::RLE_DICTIONARY { - self.decoders - .get_mut(&encoding) + self.decoders[encoding as usize] + .as_mut() .expect("Decoder for dict should have been set") } else { - // Search cache for data page decoder - match self.decoders.entry(encoding) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(v) => { - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - v.insert(data_decoder) - } + let slot = encoding as usize; + if self.decoders[slot].is_none() { + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders[slot] = Some(data_decoder); + self.decoder_mask.insert(encoding); } + self.decoders[slot] + .as_mut() + .expect("decoder should have been inserted") }; decoder.set_data(data, num_values.unwrap_or(num_levels))?; @@ -226,9 +233,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .current_encoding .expect("current_encoding should be set"); - let current_decoder = self - .decoders - .get_mut(&encoding) + let current_decoder = self.decoders[encoding as usize] + .as_mut() .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); // TODO: Push vec into decoder (#5177) @@ -244,9 +250,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .current_encoding .expect("current_encoding should be set"); - let current_decoder = self - .decoders - .get_mut(&encoding) + let current_decoder = self.decoders[encoding as usize] + .as_mut() .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); current_decoder.skip(num_values)