diff --git a/crates/polars-arrow/src/array/dictionary/typed_iterator.rs b/crates/polars-arrow/src/array/dictionary/typed_iterator.rs index 5e4d89db695d..5257bde2cae0 100644 --- a/crates/polars-arrow/src/array/dictionary/typed_iterator.rs +++ b/crates/polars-arrow/src/array/dictionary/typed_iterator.rs @@ -42,7 +42,7 @@ impl DictValue for Utf8Array { arr.null_count(), 0, "null values in values not supported in iteration" - ) + ); }) } } @@ -69,7 +69,7 @@ impl DictValue for Utf8ViewArray { arr.null_count(), 0, "null values in values not supported in iteration" - ) + ); }) } } diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index 3b3528310318..ce26c1a57c77 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -435,10 +435,10 @@ mod stats { // Default: read the file _ => Ok(true), }; - out.inspect(|read| { - if state.verbose() && *read { + out.inspect(|&read| { + if state.verbose() && read { eprintln!("parquet file must be read, statistics not sufficient for predicate.") - } else if state.verbose() && !*read { + } else if state.verbose() && !read { eprintln!("parquet file can be skipped, the statistics were sufficient to apply the predicate.") } }) diff --git a/crates/polars-io/src/parquet/read/mmap.rs b/crates/polars-io/src/parquet/read/mmap.rs index 5d7c67ff318c..c2b8da81a7b4 100644 --- a/crates/polars-io/src/parquet/read/mmap.rs +++ b/crates/polars-io/src/parquet/read/mmap.rs @@ -6,7 +6,7 @@ use polars_core::datatypes::PlHashMap; use polars_error::PolarsResult; use polars_parquet::read::{ column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData, - PageReader, + Filter, PageReader, }; use polars_utils::mmap::{MemReader, MemSlice}; @@ -87,5 +87,5 @@ pub(super) fn to_deserializer<'a>( }) .unzip(); - column_iter_to_arrays(columns, types, field, num_rows) + column_iter_to_arrays(columns, types, field, Some(Filter::new_limited(num_rows))) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs index abdd372088cc..4fa516aa763f 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use arrow::array::specification::try_check_utf8; use arrow::array::{Array, BinaryArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8Array}; -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::bitmap::MutableBitmap; use arrow::datatypes::{ArrowDataType, PhysicalType}; use arrow::offset::Offset; use polars_error::PolarsResult; @@ -33,14 +33,13 @@ impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder> for BinaryStateTransl page: &'a DataPage, dict: Option<&'a as utils::Decoder>::Dict>, page_validity: Option<&utils::PageValidity<'a>>, - filter: Option<&utils::filter::Filter<'a>>, ) -> PolarsResult { let is_string = matches!( page.descriptor.primitive_type.logical_type, Some(PrimitiveLogicalType::String) ); decoder.check_utf8.store(is_string, Ordering::Relaxed); - BinaryStateTranslation::new(page, dict, page_validity, filter, is_string) + BinaryStateTranslation::new(page, dict, page_validity, is_string) } fn len_when_not_nullable(&self) -> usize { @@ -154,6 +153,7 @@ impl utils::Decoder for BinaryDecoder { type Translation<'a> = BinaryStateTranslation<'a>; type Dict = BinaryDict; type DecodedState = (Binary, 
MutableBitmap); + type Output = Box; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( @@ -272,16 +272,19 @@ impl utils::Decoder for BinaryDecoder { fn finalize( &self, data_type: ArrowDataType, + _dict: Option, (values, validity): Self::DecodedState, ) -> ParquetResult> { super::finalize(data_type, values, validity) } +} +impl utils::DictDecodable for BinaryDecoder { fn finalize_dict_array( &self, data_type: ArrowDataType, dict: Self::Dict, - (values, validity): (Vec, Option), + keys: PrimitiveArray, ) -> ParquetResult> { let value_data_type = match data_type.clone() { ArrowDataType::Dictionary(_, values, _) => *values, @@ -299,10 +302,8 @@ impl utils::Decoder for BinaryDecoder { _ => unreachable!(), }; - let indices = PrimitiveArray::new(K::PRIMITIVE.into(), values.into(), validity); - // @TODO: Is this datatype correct? - Ok(DictionaryArray::try_new(data_type, indices, dict).unwrap()) + Ok(DictionaryArray::try_new(data_type, keys, dict).unwrap()) } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs index 073f73d354ca..bb1e77b870dd 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs @@ -7,7 +7,6 @@ use super::utils::*; use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding}; use crate::parquet::error::ParquetResult; use crate::parquet::page::{split_buffer, DataPage}; -use crate::read::deserialize::utils::filter::Filter; use crate::read::deserialize::utils::PageValidity; pub(crate) type BinaryDict = BinaryArray; @@ -150,7 +149,6 @@ impl<'a> BinaryStateTranslation<'a> { page: &'a DataPage, dict: Option<&'a BinaryDict>, _page_validity: Option<&PageValidity<'a>>, - _filter: Option<&Filter<'a>>, is_string: bool, ) -> PolarsResult { match (page.encoding(), dict) { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs index b45905f3cbab..26e5045d74a2 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs @@ -13,7 +13,6 @@ use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{DataPage, DictPage}; use crate::read::deserialize::binary::utils::BinaryIter; -use crate::read::deserialize::utils::filter::Filter; use crate::read::deserialize::utils::{ self, binary_views_dict, extend_from_decoder, Decoder, PageValidity, StateTranslation, TranslatedHybridRle, @@ -30,14 +29,13 @@ impl<'a> StateTranslation<'a, BinViewDecoder> for BinaryStateTranslation<'a> { page: &'a DataPage, dict: Option<&'a ::Dict>, page_validity: Option<&PageValidity<'a>>, - filter: Option<&Filter<'a>>, ) -> PolarsResult { let is_string = matches!( page.descriptor.primitive_type.logical_type, Some(PrimitiveLogicalType::String) ); decoder.check_utf8.store(is_string, Ordering::Relaxed); - Self::new(page, dict, page_validity, filter, is_string) + Self::new(page, dict, page_validity, is_string) } fn len_when_not_nullable(&self) -> usize { @@ -149,6 +147,7 @@ impl utils::Decoder for BinViewDecoder { type Translation<'a> = BinaryStateTranslation<'a>; type Dict = BinaryDict; type DecodedState = DecodedStateTuple; + type Output = Box; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( @@ -232,6 +231,7 @@ impl 
utils::Decoder for BinViewDecoder { fn finalize( &self, data_type: ArrowDataType, + _dict: Option, (values, validity): Self::DecodedState, ) -> ParquetResult> { let mut array: BinaryViewArray = values.freeze(); @@ -260,12 +260,14 @@ impl utils::Decoder for BinViewDecoder { _ => unreachable!(), } } +} +impl utils::DictDecodable for BinViewDecoder { fn finalize_dict_array( &self, data_type: ArrowDataType, dict: Self::Dict, - (values, validity): (Vec, Option), + keys: PrimitiveArray, ) -> ParquetResult> { let value_data_type = match &data_type { ArrowDataType::Dictionary(_, values, _) => values.as_ref().clone(), @@ -278,14 +280,13 @@ impl utils::Decoder for BinViewDecoder { } let view_dict = view_dict.freeze(); - let array = PrimitiveArray::::new(K::PRIMITIVE.into(), values.into(), validity); let dict = match value_data_type.to_physical_type() { PhysicalType::Utf8View => view_dict.to_utf8view().unwrap().boxed(), PhysicalType::BinaryView => view_dict.boxed(), _ => unreachable!(), }; - Ok(DictionaryArray::try_new(data_type, array, dict).unwrap()) + Ok(DictionaryArray::try_new(data_type, keys, dict).unwrap()) } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs index 0f5fe49be768..80fab4c372ee 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs @@ -11,7 +11,6 @@ use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::encoding::Encoding; use crate::parquet::error::ParquetResult; use crate::parquet::page::{split_buffer, DataPage, DictPage}; -use crate::read::deserialize::utils::filter::Filter; use crate::read::deserialize::utils::{BatchableCollector, PageValidity}; #[allow(clippy::large_enum_variant)] @@ -29,7 +28,6 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> { page: &'a DataPage, _dict: Option<&'a ::Dict>, page_validity: Option<&PageValidity<'a>>, - _filter: Option<&Filter<'a>>, ) -> PolarsResult { let values = split_buffer(page)?.values; @@ -189,6 +187,7 @@ impl Decoder for BooleanDecoder { type Translation<'a> = StateTranslation<'a>; type Dict = (); type DecodedState = (MutableBitmap, MutableBitmap); + type Output = BooleanArray; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( @@ -230,22 +229,10 @@ impl Decoder for BooleanDecoder { fn finalize( &self, data_type: ArrowDataType, + _dict: Option, (values, validity): Self::DecodedState, - ) -> ParquetResult> { - Ok(Box::new(BooleanArray::new( - data_type, - values.into(), - validity.into(), - ))) - } - - fn finalize_dict_array( - &self, - _data_type: ArrowDataType, - _dict: Self::Dict, - _decoded: (Vec, Option), - ) -> ParquetResult> { - unimplemented!() + ) -> ParquetResult { + Ok(BooleanArray::new(data_type, values.into(), validity.into())) } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs index 87e92e6c33ee..2fa74cb02976 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs @@ -1,12 +1,13 @@ -use arrow::array::{Array, DictionaryKey, PrimitiveArray}; +use std::sync::atomic::AtomicUsize; + +use arrow::array::{DictionaryArray, DictionaryKey, PrimitiveArray}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use polars_error::PolarsResult; -use super::utils::filter::Filter; use super::utils::{ - 
self, dict_indices_decoder, extend_from_decoder, BatchableCollector, Decoder, PageValidity, - StateTranslation, + self, dict_indices_decoder, extend_from_decoder, BatchableCollector, Decoder, DictDecodable, + ExactSize, PageValidity, StateTranslation, }; use super::ParquetError; use crate::parquet::encoding::hybrid_rle::{self, HybridRleDecoder, Translator}; @@ -14,15 +15,16 @@ use crate::parquet::encoding::Encoding; use crate::parquet::error::ParquetResult; use crate::parquet::page::{DataPage, DictPage}; -impl<'a, K: DictionaryKey> StateTranslation<'a, DictionaryDecoder> for HybridRleDecoder<'a> { +impl<'a, K: DictionaryKey, D: utils::DictDecodable> StateTranslation<'a, DictionaryDecoder> + for HybridRleDecoder<'a> +{ type PlainDecoder = HybridRleDecoder<'a>; fn new( - _decoder: &DictionaryDecoder, + _decoder: &DictionaryDecoder, page: &'a DataPage, - _dict: Option<&'a as Decoder>::Dict>, + _dict: Option<&'a as Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, - _filter: Option<&Filter<'a>>, ) -> PolarsResult { if !matches!( page.encoding(), @@ -44,16 +46,22 @@ impl<'a, K: DictionaryKey> StateTranslation<'a, DictionaryDecoder> for Hybrid fn extend_from_state( &mut self, - decoder: &mut DictionaryDecoder, - decoded: &mut as Decoder>::DecodedState, + decoder: &mut DictionaryDecoder, + decoded: &mut as Decoder>::DecodedState, page_validity: &mut Option>, additional: usize, ) -> ParquetResult<()> { let (values, validity) = decoded; + let dict_size = decoder.dict_size.load(std::sync::atomic::Ordering::Relaxed); + + if dict_size == usize::MAX { + panic!("Dictionary not set for dictionary array"); + } + let mut collector = DictArrayCollector { values: self, - dict_size: decoder.dict_size, + dict_size, }; match page_validity { @@ -68,24 +76,27 @@ impl<'a, K: DictionaryKey> StateTranslation<'a, DictionaryDecoder> for Hybrid } #[derive(Debug)] -pub struct DictionaryDecoder { - dict_size: usize, +pub struct DictionaryDecoder { + dict_size: AtomicUsize, + decoder: D, _pd: std::marker::PhantomData, } -impl DictionaryDecoder { - pub fn new(dict_size: usize) -> Self { +impl DictionaryDecoder { + pub fn new(decoder: D) -> Self { Self { - dict_size, + dict_size: AtomicUsize::new(usize::MAX), + decoder, _pd: std::marker::PhantomData, } } } -impl utils::Decoder for DictionaryDecoder { +impl utils::Decoder for DictionaryDecoder { type Translation<'a> = HybridRleDecoder<'a>; - type Dict = (); + type Dict = D::Dict; type DecodedState = (Vec, MutableBitmap); + type Output = DictionaryArray; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( @@ -94,18 +105,29 @@ impl utils::Decoder for DictionaryDecoder { ) } - fn deserialize_dict(&self, _: DictPage) -> Self::Dict {} + fn deserialize_dict(&self, page: DictPage) -> Self::Dict { + let dict = self.decoder.deserialize_dict(page); + self.dict_size + .store(dict.len(), std::sync::atomic::Ordering::Relaxed); + dict + } fn finalize( &self, - _data_type: ArrowDataType, + data_type: ArrowDataType, + dict: Option, (values, validity): Self::DecodedState, - ) -> ParquetResult> { - Ok(Box::new(PrimitiveArray::new( - K::PRIMITIVE.into(), - values.into(), - validity.into(), - ))) + ) -> ParquetResult> { + let validity = if validity.is_empty() || validity.unset_bits() == 0 { + None + } else { + Some(validity.freeze()) + }; + + let dict = dict.unwrap(); + let keys = PrimitiveArray::new(K::PRIMITIVE.into(), values.into(), validity); + + self.decoder.finalize_dict_array(data_type, dict, keys) } fn decode_plain_encoded<'a>( @@ -128,18 +150,9 @@ impl 
utils::Decoder for DictionaryDecoder { ) -> ParquetResult<()> { unreachable!() } - - fn finalize_dict_array( - &self, - _data_type: ArrowDataType, - _dict: Self::Dict, - _decoded: (Vec, Option), - ) -> ParquetResult> { - unimplemented!() - } } -impl utils::NestedDecoder for DictionaryDecoder { +impl utils::NestedDecoder for DictionaryDecoder { fn validity_extend( _: &mut utils::State<'_, Self>, (_, validity): &mut Self::DecodedState, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs index 2c1bbef0cb28..7936307f9c97 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs @@ -1,5 +1,5 @@ -use arrow::array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray}; -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::array::{DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray}; +use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use polars_error::PolarsResult; @@ -8,7 +8,6 @@ use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; use crate::parquet::encoding::{hybrid_rle, Encoding}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{split_buffer, DataPage, DictPage}; -use crate::read::deserialize::utils::filter::Filter; use crate::read::deserialize::utils::{self, BatchableCollector, GatheredHybridRle, PageValidity}; #[allow(clippy::large_enum_variant)] @@ -31,7 +30,6 @@ impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> { page: &'a DataPage, dict: Option<&'a ::Dict>, _page_validity: Option<&PageValidity<'a>>, - _filter: Option<&Filter<'a>>, ) -> PolarsResult { match (page.encoding(), dict) { (Encoding::Plain, _) => { @@ -122,6 +120,7 @@ impl Decoder for BinaryDecoder { type Translation<'a> = StateTranslation<'a>; type Dict = Vec; type DecodedState = (FixedSizeBinary, MutableBitmap); + type Output = FixedSizeBinaryArray; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { let size = self.size; @@ -272,25 +271,27 @@ impl Decoder for BinaryDecoder { fn finalize( &self, data_type: ArrowDataType, + _dict: Option, (values, validity): Self::DecodedState, - ) -> ParquetResult> { - Ok(Box::new(FixedSizeBinaryArray::new( + ) -> ParquetResult { + Ok(FixedSizeBinaryArray::new( data_type, values.values.into(), validity.into(), - ))) + )) } +} +impl utils::DictDecodable for BinaryDecoder { fn finalize_dict_array( &self, data_type: ArrowDataType, dict: Self::Dict, - (values, validity): (Vec, Option), + keys: PrimitiveArray, ) -> ParquetResult> { let dict = FixedSizeBinaryArray::new(ArrowDataType::FixedSizeBinary(self.size), dict.into(), None); - let array = PrimitiveArray::::new(K::PRIMITIVE.into(), values.into(), validity); - Ok(DictionaryArray::try_new(data_type, array, Box::new(dict)).unwrap()) + Ok(DictionaryArray::try_new(data_type, keys, Box::new(dict)).unwrap()) } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs index f80fdd94ca88..9dd573b643a0 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs @@ -18,7 +18,8 @@ use arrow::offset::Offsets; use polars_utils::mmap::MemReader; use simple::page_iter_to_array; -pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState}; 
+pub use self::nested_utils::{init_nested, InitNested, NestedState}; +pub use self::utils::filter::Filter; use super::*; use crate::parquet::read::get_page_iterator as _get_page_iterator; use crate::parquet::schema::types::PrimitiveType; @@ -127,28 +128,25 @@ fn is_primitive(data_type: &ArrowDataType) -> bool { ) } -fn columns_to_iter_recursive<'a, I>( - mut columns: Vec>, +fn columns_to_iter_recursive( + mut columns: Vec, mut types: Vec<&PrimitiveType>, field: Field, init: Vec, - num_rows: usize, -) -> PolarsResult<(NestedState, Box)> -where - I: 'a + CompressedPagesIter, -{ + filter: Option, +) -> PolarsResult<(NestedState, Box)> { if init.is_empty() && is_primitive(&field.data_type) { let array = page_iter_to_array( columns.pop().unwrap(), types.pop().unwrap(), field.data_type, - num_rows, + filter, )?; return Ok((NestedState::default(), array)); } - nested::columns_to_iter_recursive(columns, types, field, init, num_rows) + nested::columns_to_iter_recursive(columns, types, field, init, filter) } /// Returns the number of (parquet) columns that a [`ArrowDataType`] contains. @@ -194,16 +192,13 @@ pub fn n_columns(data_type: &ArrowDataType) -> usize { /// For nested types, `columns` must be composed by all parquet columns with associated types `types`. /// /// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`. -pub fn column_iter_to_arrays<'a, I>( - columns: Vec>, +pub fn column_iter_to_arrays<'a>( + columns: Vec, types: Vec<&PrimitiveType>, field: Field, - num_rows: usize, -) -> PolarsResult> -where - I: 'a + CompressedPagesIter, -{ - let (_, array) = columns_to_iter_recursive(columns, types, field, vec![], num_rows)?; + filter: Option, +) -> PolarsResult> { + let (_, array) = columns_to_iter_recursive(columns, types, field, vec![], filter)?; Ok(Box::new(std::iter::once(Ok(array)))) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs index 4a122580e53a..9a300cc7231d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs @@ -1,23 +1,19 @@ -use arrow::array::{DictionaryArray, FixedSizeBinaryArray, PrimitiveArray, StructArray}; +use arrow::array::{DictionaryArray, PrimitiveArray, StructArray}; use arrow::match_integer_type; use ethnum::I256; use polars_error::polars_bail; -use self::nested::deserialize::nested_utils::PageNestedDictArrayDecoder; use self::nested_utils::PageNestedDecoder; use self::primitive::{self}; use super::*; -pub fn columns_to_iter_recursive( - mut columns: Vec>, +pub fn columns_to_iter_recursive( + mut columns: Vec, mut types: Vec<&PrimitiveType>, field: Field, mut init: Vec, - num_rows: usize, -) -> PolarsResult<(NestedState, Box)> -where - I: CompressedPagesIter, -{ + filter: Option, +) -> PolarsResult<(NestedState, Box)> { use arrow::datatypes::PhysicalType::*; use arrow::datatypes::PrimitiveType::*; @@ -32,7 +28,8 @@ where null::NullDecoder, init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Boolean => { init.push(InitNested::Primitive(field.is_nullable)); @@ -43,7 +40,8 @@ where boolean::BooleanDecoder, init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(Int8) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -54,7 +52,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)? 
+ .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(Int16) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -65,7 +64,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(Int32) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -76,7 +76,8 @@ where primitive::PrimitiveDecoder::::unit(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(Int64) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -87,7 +88,8 @@ where primitive::PrimitiveDecoder::::unit(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(UInt8) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -98,7 +100,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(UInt16) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -109,7 +112,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(UInt32) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -121,7 +125,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)?, + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))?, // some implementations of parquet write arrow's u32 into i64. PhysicalType::Int64 => PageNestedDecoder::new( columns.pop().unwrap(), @@ -129,7 +134,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)?, + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))?, other => { polars_bail!(ComputeError: "deserializing UInt32 from {other:?}'s parquet" @@ -146,7 +152,8 @@ where primitive::PrimitiveDecoder::::cast_as(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(Float32) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -157,7 +164,8 @@ where primitive::PrimitiveDecoder::::unit(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, Primitive(Float64) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -168,7 +176,8 @@ where primitive::PrimitiveDecoder::::unit(), init, )? - .collect_n(num_rows)? + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))? }, BinaryView | Utf8View => { init.push(InitNested::Primitive(field.is_nullable)); @@ -179,7 +188,7 @@ where binview::BinViewDecoder::default(), init, )? - .collect_n(num_rows)? + .collect_n(filter)? }, LargeBinary | LargeUtf8 => { init.push(InitNested::Primitive(field.is_nullable)); @@ -190,7 +199,7 @@ where binary::BinaryDecoder::::default(), init, )? - .collect_n(num_rows)? + .collect_n(filter)? }, _ => match field.data_type().to_logical_type() { ArrowDataType::Dictionary(key_type, _, _) => { @@ -200,7 +209,7 @@ where let data_type = field.data_type().clone(); match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(iter, init, type_, data_type, num_rows).map(|(s, arr)| (s, Box::new(arr) as Box<_>)) + dict_read::<$K>(iter, init, type_, data_type, filter).map(|(s, arr)| (s, Box::new(arr) as Box<_>)) })? 
}, ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => { @@ -210,7 +219,7 @@ where types, inner.as_ref().clone(), init, - num_rows, + filter, )?; let array = create_list(field.data_type().clone(), &mut nested, array); (nested, array) @@ -222,7 +231,7 @@ where types, inner.as_ref().clone(), init, - num_rows, + filter, )?; let array = create_list(field.data_type().clone(), &mut nested, array); (nested, array) @@ -237,14 +246,16 @@ where primitive::PrimitiveDecoder::::cast_into(), init, )? - .collect_n(num_rows)?, + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))?, PhysicalType::Int64 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type.clone(), primitive::PrimitiveDecoder::::cast_into(), init, )? - .collect_n(num_rows)?, + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))?, PhysicalType::FixedLenByteArray(n) if n > 16 => { polars_bail!( ComputeError: "Can't decode Decimal128 type from `FixedLenByteArray` of len {n}" @@ -257,12 +268,7 @@ where fixed_size_binary::BinaryDecoder { size }, init, )? - .collect_n(num_rows)?; - - let array = array - .as_any() - .downcast_ref::() - .unwrap(); + .collect_n(filter)?; // Convert the fixed length byte array to Decimal. let values = array @@ -301,14 +307,16 @@ where primitive::PrimitiveDecoder::closure(|x: i32| i256(I256::new(x as i128))), init, )? - .collect_n(num_rows)?, + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))?, PhysicalType::Int64 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type.clone(), primitive::PrimitiveDecoder::closure(|x: i64| i256(I256::new(x as i128))), init, )? - .collect_n(num_rows)?, + .collect_n(filter) + .map(|(s, a)| (s, Box::new(a) as Box<_>))?, PhysicalType::FixedLenByteArray(size) if size <= 16 => { let (mut nested, array) = PageNestedDecoder::new( columns.pop().unwrap(), @@ -316,12 +324,7 @@ where fixed_size_binary::BinaryDecoder { size }, init, )? - .collect_n(num_rows)?; - - let array = array - .as_any() - .downcast_ref::() - .unwrap(); + .collect_n(filter)?; // Convert the fixed length byte array to Decimal. let values = array @@ -350,12 +353,7 @@ where fixed_size_binary::BinaryDecoder { size }, init, )? - .collect_n(num_rows)?; - - let array = array - .as_any() - .downcast_ref::() - .unwrap(); + .collect_n(filter)?; // Convert the fixed length byte array to Decimal. 
let values = array @@ -399,7 +397,7 @@ where let n = n_columns(&f.data_type); let columns = columns.drain(columns.len() - n..).collect(); let types = types.drain(types.len() - n..).collect(); - columns_to_iter_recursive(columns, types, f.clone(), init, num_rows) + columns_to_iter_recursive(columns, types, f.clone(), init, filter.clone()) }) .collect::)>>>()?; @@ -431,7 +429,7 @@ where types, inner.as_ref().clone(), init, - num_rows, + filter, )?; let array = create_map(field.data_type().clone(), &mut nested, array); (nested, array) @@ -445,12 +443,12 @@ where }) } -fn dict_read<'a, K: DictionaryKey, I: 'a + CompressedPagesIter>( - iter: BasicDecompressor, +fn dict_read( + iter: BasicDecompressor, init: Vec, _type_: &PrimitiveType, data_type: ArrowDataType, - num_rows: usize, + filter: Option, ) -> PolarsResult<(NestedState, DictionaryArray)> { use ArrowDataType::*; let values_data_type = if let Dictionary(_, v, _) = &data_type { @@ -460,94 +458,108 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + CompressedPagesIter>( }; Ok(match values_data_type.to_logical_type() { - UInt8 => PageNestedDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - init, - )? - .collect_n(num_rows)?, - UInt16 => PageNestedDictArrayDecoder::<_, K, _>::new( + UInt8 => { + PageNestedDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), + init, + )? + .collect_n(filter)? + }, + UInt16 => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::cast_as(), + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), init, )? - .collect_n(num_rows)?, - UInt32 => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + UInt32 => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::cast_as(), + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), init, )? - .collect_n(num_rows)?, - Int8 => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + Int8 => { + PageNestedDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), + init, + )? + .collect_n(filter)? + }, + Int16 => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::cast_as(), + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), init, )? - .collect_n(num_rows)?, - Int16 => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::cast_as(), + dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::::unit()), init, )? - .collect_n(num_rows)?, - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { - PageNestedDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::unit(), - init, - )? - .collect_n(num_rows)? - }, - Int64 | Date64 | Time64(_) | Duration(_) => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + Int64 | Date64 | Time64(_) | Duration(_) => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::cast_as(), + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), init, )? 
- .collect_n(num_rows)?, - Float32 => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + Float32 => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::unit(), + dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::::unit()), init, )? - .collect_n(num_rows)?, - Float64 => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + Float64 => PageNestedDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::unit(), + dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::::unit()), init, )? - .collect_n(num_rows)?, - LargeUtf8 | LargeBinary => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + LargeUtf8 | LargeBinary => PageNestedDecoder::new( iter, data_type, - binary::BinaryDecoder::::default(), + dictionary::DictionaryDecoder::new(binary::BinaryDecoder::::default()), init, )? - .collect_n(num_rows)?, - Utf8View | BinaryView => PageNestedDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + Utf8View | BinaryView => PageNestedDecoder::new( iter, data_type, - binview::BinViewDecoder::default(), + dictionary::DictionaryDecoder::new(binview::BinViewDecoder::default()), init, )? - .collect_n(num_rows)?, + .collect_n(filter)?, FixedSizeBinary(size) => { let size = *size; - PageNestedDictArrayDecoder::<_, K, _>::new( + PageNestedDecoder::new( iter, data_type, - fixed_size_binary::BinaryDecoder { size }, + dictionary::DictionaryDecoder::new(fixed_size_binary::BinaryDecoder { size }), init, )? - .collect_n(num_rows)? + .collect_n(filter)? }, /* diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index 515eb916b3d7..b7064e6a1bab 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -1,16 +1,16 @@ -use arrow::array::{Array, DictionaryArray, DictionaryKey}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use polars_error::PolarsResult; use super::utils::{self, BatchableCollector}; -use super::{BasicDecompressor, CompressedPagesIter, ParquetError}; -use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; +use super::{BasicDecompressor, Filter}; +use crate::parquet::encoding::hybrid_rle::gatherer::{ + HybridRleGatherer, ZeroCount, ZeroCountGatherer, +}; use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::error::ParquetResult; -use crate::parquet::page::{split_buffer, DataPage, Page}; +use crate::parquet::page::{split_buffer, DataPage}; use crate::parquet::read::levels::get_bit_width; -use crate::read::deserialize::dictionary::DictionaryDecoder; use crate::read::deserialize::utils::BatchedCollector; #[derive(Debug)] @@ -286,22 +286,7 @@ impl NestedState { } } -#[allow(clippy::too_many_arguments)] -fn extend_offsets2<'a, D: utils::NestedDecoder>( - def_iter: HybridRleDecoder<'a>, - rep_iter: HybridRleDecoder<'a>, - batched_collector: &mut BatchedCollector< - '_, - (), - D::DecodedState, - BatchedNestedDecoder<'a, '_, '_, D>, - >, - nested: &mut [Nested], - additional: usize, - // Amortized allocations - def_levels: &[u16], - rep_levels: &[u16], -) -> PolarsResult { +fn idx_to_limit(rep_iter: &HybridRleDecoder<'_>, idx: usize) -> ParquetResult { struct RowIdxOffsetGatherer; struct RowIdxOffsetState { num_elements_seen: usize, @@ -367,6 +352,131 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>( // @TODO: Add specialization for other methods } + let mut 
state = RowIdxOffsetState { + num_elements_seen: 0, + top_level_limit: idx, + found: None, + }; + + const ROW_IDX_BATCH_SIZE: usize = 1024; + + let mut row_idx_iter = rep_iter.clone(); + while row_idx_iter.len() > 0 && state.found.is_none() { + row_idx_iter.gather_n_into(&mut state, ROW_IDX_BATCH_SIZE, &RowIdxOffsetGatherer)?; + } + + Ok(state.found.unwrap_or(rep_iter.len())) +} + +#[allow(clippy::too_many_arguments)] +fn extend_offsets2<'a, D: utils::NestedDecoder>( + mut def_iter: HybridRleDecoder<'a>, + mut rep_iter: HybridRleDecoder<'a>, + batched_collector: &mut BatchedCollector< + '_, + (), + D::DecodedState, + BatchedNestedDecoder<'a, '_, '_, D>, + >, + nested: &mut [Nested], + filter: Option, + // Amortized allocations + def_levels: &[u16], + rep_levels: &[u16], +) -> PolarsResult<()> { + match filter { + None => { + extend_offsets_limited( + &mut def_iter, + &mut rep_iter, + batched_collector, + nested, + usize::MAX, + def_levels, + rep_levels, + )?; + + debug_assert_eq!(def_iter.len(), rep_iter.len()); + debug_assert_eq!(def_iter.len(), 0); + + Ok(()) + }, + Some(Filter::Range(range)) => { + let start = range.start; + let end = range.end; + + if start > 0 { + let start_cell = idx_to_limit(&rep_iter, start)?; + + rep_iter.skip_in_place(start_cell)?; + def_iter.skip_in_place(start_cell)?; + } + + if end - start > 0 { + let limit = idx_to_limit(&rep_iter, end - start)?; + + extend_offsets_limited( + &mut def_iter, + &mut rep_iter, + batched_collector, + nested, + limit, + def_levels, + rep_levels, + )?; + } + + // @NOTE: This is kind of unused + let last_skip = def_iter.len(); + rep_iter.skip_in_place(last_skip)?; + def_iter.skip_in_place(last_skip)?; + + Ok(()) + }, + Some(Filter::Mask(bitmap)) => { + let mut iter = bitmap.iter(); + while iter.num_remaining() > 0 { + let num_zeros = iter.take_leading_zeros(); + if num_zeros > 0 { + let offset = idx_to_limit(&rep_iter, num_zeros)?; + rep_iter.skip_in_place(offset)?; + def_iter.skip_in_place(offset)?; + } + + let num_ones = iter.take_leading_ones(); + if num_ones > 0 { + let limit = idx_to_limit(&rep_iter, num_ones)?; + extend_offsets_limited( + &mut def_iter, + &mut rep_iter, + batched_collector, + nested, + limit, + def_levels, + rep_levels, + )?; + } + } + Ok(()) + }, + } +} + +fn extend_offsets_limited<'a, D: utils::NestedDecoder>( + def_iter: &mut HybridRleDecoder<'a>, + rep_iter: &mut HybridRleDecoder<'a>, + batched_collector: &mut BatchedCollector< + '_, + (), + D::DecodedState, + BatchedNestedDecoder<'a, '_, '_, D>, + >, + nested: &mut [Nested], + mut limit: usize, + // Amortized allocations + def_levels: &[u16], + rep_levels: &[u16], +) -> PolarsResult<()> { #[derive(Default)] struct LevelGatherer<'a>(std::marker::PhantomData<&'a ()>); struct LevelGathererState<'a> { @@ -416,29 +526,12 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>( // @TODO: Add specialization for other methods } - const ROW_IDX_BATCH_SIZE: usize = 1024; - let mut state = RowIdxOffsetState { - num_elements_seen: 0, - top_level_limit: additional, - found: None, - }; - - let mut row_idx_iter = rep_iter.clone(); - while row_idx_iter.len() > 0 && state.found.is_none() { - row_idx_iter.gather_n_into(&mut state, ROW_IDX_BATCH_SIZE, &RowIdxOffsetGatherer)?; - } - - debug_assert_eq!(def_iter.len(), rep_iter.len()); - - let mut def_iter = def_iter; - let mut rep_iter = rep_iter; let mut def_values = [0u16; DECODE_BATCH_SIZE]; let mut rep_values = [0u16; DECODE_BATCH_SIZE]; let max_depth = nested.len(); const DECODE_BATCH_SIZE: usize = 1024; - let mut limit = 
state.found.unwrap_or(def_iter.len()); while def_iter.len() > 0 && limit > 0 { let additional = usize::min(limit, DECODE_BATCH_SIZE); @@ -543,30 +636,17 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>( limit -= additional; } - Ok(def_iter.len() != 0) + Ok(()) } -pub struct PageNestedDecoder { - pub iter: BasicDecompressor, +pub struct PageNestedDecoder { + pub iter: BasicDecompressor, pub data_type: ArrowDataType, pub dict: Option, pub decoder: D, pub init: Vec, } -pub struct PageNestedDictArrayDecoder< - I: CompressedPagesIter, - K: DictionaryKey, - D: utils::NestedDecoder, -> { - pub iter: BasicDecompressor, - pub data_type: ArrowDataType, - pub dict: D::Dict, - pub decoder: D, - pub init: Vec, - _pd: std::marker::PhantomData, -} - /// Return the definition and repetition level iterators for this page. fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder, HybridRleDecoder)> { let split = split_buffer(page)?; @@ -582,9 +662,9 @@ fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder, HybridRleDec Ok((def_iter, rep_iter)) } -impl PageNestedDecoder { +impl PageNestedDecoder { pub fn new( - mut iter: BasicDecompressor, + mut iter: BasicDecompressor, data_type: ArrowDataType, decoder: D, init: Vec, @@ -601,185 +681,115 @@ impl PageNestedDecoder { }) } - pub fn collect_n(mut self, limit: usize) -> ParquetResult<(NestedState, Box)> { - use streaming_decompression::FallibleStreamingIterator; - - let mut target = self.decoder.with_capacity(limit); - // @TODO: Self capacity - let mut nested_state = init_nested(&self.init, 0); - - if limit == 0 { - return Ok((nested_state, self.decoder.finalize(self.data_type, target)?)); - } - - let mut limit = limit; - - // Amortize the allocations. - let (def_levels, rep_levels) = nested_state.levels(); - - loop { - let Some(page) = self.iter.next()? else { - break; - }; - - let Page::Data(page) = page else { - // @TODO This should be removed - unreachable!(); - }; - - let mut values_page = - utils::State::new_nested(&self.decoder, page, self.dict.as_ref())?; - let (def_iter, rep_iter) = level_iters(page)?; - - let start_length = nested_state.len(); - - // @TODO: move this to outside the loop. - let mut batched_collector = BatchedCollector::new( - BatchedNestedDecoder { - state: &mut values_page, - decoder: &mut self.decoder, - }, - &mut target, - ); - - let is_fully_read = extend_offsets2( - def_iter, - rep_iter, - &mut batched_collector, - &mut nested_state.nested, - limit, - &def_levels, - &rep_levels, - )?; - - batched_collector.finalize()?; - - let num_done = nested_state.len() - start_length; - debug_assert!(num_done <= limit); - limit -= num_done; - - debug_assert!(values_page.len() == 0 || limit == 0); - - if is_fully_read { - break; - } - } - - // we pop the primitive off here. - debug_assert!(matches!( - nested_state.nested.last().unwrap().content, - NestedContent::Primitive - )); - _ = nested_state.pop().unwrap(); - - let array = self.decoder.finalize(self.data_type, target)?; - - Ok((nested_state, array)) - } -} - -impl - PageNestedDictArrayDecoder -{ - pub fn new( - mut iter: BasicDecompressor, - data_type: ArrowDataType, - decoder: D, - init: Vec, - ) -> ParquetResult { - let dict_page = iter - .read_dict_page()? 
- .ok_or(ParquetError::FeatureNotSupported( - "Dictionary array without a dictionary page".to_string(), - ))?; - let dict = decoder.deserialize_dict(dict_page); - - Ok(Self { - iter, - data_type, - dict, - decoder, - init, - _pd: std::marker::PhantomData, - }) - } - - pub fn collect_n(mut self, limit: usize) -> ParquetResult<(NestedState, DictionaryArray)> { - use streaming_decompression::FallibleStreamingIterator; - - let mut target = ( - Vec::with_capacity(limit), - MutableBitmap::with_capacity(limit), - ); + pub fn collect_n(mut self, filter: Option) -> ParquetResult<(NestedState, D::Output)> { + // @TODO: We should probably count the filter so that we don't overallocate + let mut target = self.decoder.with_capacity(self.iter.total_num_values()); // @TODO: Self capacity let mut nested_state = init_nested(&self.init, 0); - if limit == 0 { - let (values, validity) = target; - let validity = if !validity.is_empty() { - Some(validity.freeze()) - } else { - None - }; - return Ok(( - nested_state, - self.decoder - .finalize_dict_array(self.data_type, self.dict, (values, validity))?, - )); - } - - let mut limit = limit; - // Amortize the allocations. let (def_levels, rep_levels) = nested_state.levels(); - loop { - let Some(page) = self.iter.next()? else { - break; - }; - - let Page::Data(page) = page else { - // @TODO This should be removed - unreachable!(); - }; - - use utils::ExactSize; - let mut dictionary_decoder = DictionaryDecoder::new(self.dict.len()); - let mut values_page = utils::State::new_nested(&dictionary_decoder, page, Some(&()))?; - let (def_iter, rep_iter) = level_iters(page)?; - - let start_length = nested_state.len(); - - // @TODO: move this to outside the loop. - let mut batched_collector = BatchedCollector::new( - BatchedNestedDecoder { - state: &mut values_page, - decoder: &mut dictionary_decoder, - }, - &mut target, - ); - - let is_fully_read = extend_offsets2( - def_iter, - rep_iter, - &mut batched_collector, - &mut nested_state.nested, - limit, - &def_levels, - &rep_levels, - )?; - - batched_collector.finalize()?; - - let num_done = nested_state.len() - start_length; - debug_assert!(num_done <= limit); - limit -= num_done; - - debug_assert!(values_page.len() == 0 || limit == 0); - - if is_fully_read { - break; - } + match filter { + None => { + loop { + let Some(page) = self.iter.next() else { + break; + }; + let page = page?; + + let mut state = + utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?; + let (def_iter, rep_iter) = level_iters(&page)?; + + // @TODO: move this to outside the loop. 
+ let mut batched_collector = BatchedCollector::new( + BatchedNestedDecoder { + state: &mut state, + decoder: &mut self.decoder, + }, + &mut target, + ); + + extend_offsets2( + def_iter, + rep_iter, + &mut batched_collector, + &mut nested_state.nested, + None, + &def_levels, + &rep_levels, + )?; + + batched_collector.finalize()?; + + drop(state); + self.iter.reuse_page_buffer(page); + } + }, + Some(mut filter) => { + let mut num_rows_remaining = filter.num_rows(); + + loop { + let Some(page) = self.iter.next() else { + break; + }; + let page = page?; + + let mut state = + utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?; + let (def_iter, rep_iter) = level_iters(&page)?; + + let mut count = ZeroCount::default(); + rep_iter + .clone() + .gather_into(&mut count, &ZeroCountGatherer)?; + + let is_fully_read = count.num_zero > num_rows_remaining; + let state_filter; + (state_filter, filter) = Filter::split_at(&filter, count.num_zero); + let state_filter = if count.num_zero > 0 { + Some(state_filter) + } else { + None + }; + + let start_length = nested_state.len(); + + // @TODO: move this to outside the loop. + let mut batched_collector = BatchedCollector::new( + BatchedNestedDecoder { + state: &mut state, + decoder: &mut self.decoder, + }, + &mut target, + ); + + extend_offsets2( + def_iter, + rep_iter, + &mut batched_collector, + &mut nested_state.nested, + state_filter, + &def_levels, + &rep_levels, + )?; + + batched_collector.finalize()?; + + let num_done = nested_state.len() - start_length; + debug_assert!(num_done <= num_rows_remaining); + debug_assert!(num_done <= count.num_zero); + num_rows_remaining -= num_done; + + drop(state); + self.iter.reuse_page_buffer(page); + + if is_fully_read { + break; + } + } + }, } // we pop the primitive off here. 
@@ -789,20 +799,8 @@ impl )); _ = nested_state.pop().unwrap(); - let (values, validity) = target; - let validity = if !validity.is_empty() { - Some(validity.freeze()) - } else { - None - }; - let array = - self.decoder - .finalize_dict_array(self.data_type, self.dict, (values, validity))?; + let array = self.decoder.finalize(self.data_type, self.dict, target)?; Ok((nested_state, array)) } } - -/// Type def for a sharable, boxed dyn [`Iterator`] of NestedStates and arrays -pub type NestedArrayIter<'a> = - Box)>> + Send + Sync + 'a>; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null.rs b/crates/polars-parquet/src/arrow/read/deserialize/null.rs index 313993fd8b9c..4ca4a573c0b5 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/null.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/null.rs @@ -7,6 +7,7 @@ use arrow::datatypes::ArrowDataType; use polars_error::PolarsResult; use super::utils; +use super::utils::filter::Filter; use crate::parquet::encoding::hybrid_rle; use crate::parquet::error::ParquetResult; use crate::parquet::page::{DataPage, DictPage}; @@ -30,7 +31,6 @@ impl<'a> utils::StateTranslation<'a, NullDecoder> for () { _page: &'a DataPage, _dict: Option<&'a ::Dict>, _page_validity: Option<&utils::PageValidity<'a>>, - _filter: Option<&utils::filter::Filter<'a>>, ) -> PolarsResult { Ok(()) } @@ -59,6 +59,7 @@ impl utils::Decoder for NullDecoder { type Translation<'a> = (); type Dict = (); type DecodedState = NullArrayLength; + type Output = NullArray; /// Initializes a new state fn with_capacity(&self, _: usize) -> Self::DecodedState { @@ -91,18 +92,10 @@ impl utils::Decoder for NullDecoder { fn finalize( &self, data_type: ArrowDataType, + _dict: Option, decoded: Self::DecodedState, - ) -> ParquetResult> { - Ok(Box::new(NullArray::new(data_type, decoded.length))) - } - - fn finalize_dict_array( - &self, - _data_type: ArrowDataType, - _dict: Self::Dict, - _decoded: (Vec, Option), - ) -> ParquetResult> { - unimplemented!() + ) -> ParquetResult { + Ok(NullArray::new(data_type, decoded.length)) } } @@ -124,34 +117,37 @@ impl utils::NestedDecoder for NullDecoder { } } -use super::{BasicDecompressor, CompressedPagesIter}; -use crate::parquet::page::Page; +use super::BasicDecompressor; /// Converts [`PagesIter`] to an [`ArrayIter`] -pub fn iter_to_arrays( - mut iter: BasicDecompressor, +pub fn iter_to_arrays( + mut iter: BasicDecompressor, data_type: ArrowDataType, - num_rows: usize, -) -> Box -where - I: CompressedPagesIter, -{ - use streaming_decompression::FallibleStreamingIterator; + mut filter: Option, +) -> ParquetResult> { + let num_rows = Filter::opt_num_rows(&filter, iter.total_num_values()); let mut len = 0usize; - while let Ok(Some(page)) = iter.next() { - match page { - Page::Dict(_) => continue, - Page::Data(page) => { - let rows = page.num_values(); - len = (len + rows).min(num_rows); - if len == num_rows { - break; - } - }, - } + while len < num_rows { + let Some(page) = iter.next() else { + break; + }; + let page = page?; + + let rows = page.num_values(); + let page_filter; + (page_filter, filter) = Filter::opt_split_at(&filter, rows); + + let num_rows = match page_filter { + None => rows, + Some(filter) => filter.num_rows(), + }; + + len = (len + num_rows).min(num_rows); + + iter.reuse_page_buffer(page); } - Box::new(NullArray::new(data_type, len)) + Ok(Box::new(NullArray::new(data_type, len))) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs 
b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index 910c48e8528d..0fb230b8e1d7 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -1,5 +1,5 @@ -use arrow::array::{Array, DictionaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}; -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::array::{DictionaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}; +use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; use polars_error::PolarsResult; @@ -11,7 +11,6 @@ use crate::parquet::error::ParquetResult; use crate::parquet::page::{split_buffer, DataPage, DictPage}; use crate::parquet::types::{decode, NativeType as ParquetNativeType}; use crate::read::deserialize::utils::array_chunks::ArrayChunks; -use crate::read::deserialize::utils::filter::Filter; use crate::read::deserialize::utils::{ BatchableCollector, Decoder, PageValidity, TranslatedHybridRle, }; @@ -172,7 +171,6 @@ where page: &'a DataPage, dict: Option<&'a as utils::Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, - _filter: Option<&Filter<'a>>, ) -> PolarsResult { match (page.encoding(), dict) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => { @@ -347,6 +345,7 @@ where type Translation<'a> = StateTranslation<'a, P, T>; type Dict = Vec; type DecodedState = (Vec, MutableBitmap); + type Output = PrimitiveArray; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( @@ -428,36 +427,41 @@ where fn finalize( &self, data_type: ArrowDataType, + _dict: Option, (values, validity): Self::DecodedState, - ) -> ParquetResult> { + ) -> ParquetResult { let validity = if validity.is_empty() { None } else { Some(validity) }; - Ok(Box::new( - MutablePrimitiveArray::try_new(data_type, values, validity) - .unwrap() - .freeze(), - )) + Ok(MutablePrimitiveArray::try_new(data_type, values, validity) + .unwrap() + .freeze()) } +} +impl utils::DictDecodable for PrimitiveDecoder +where + T: NativeType, + P: ParquetNativeType, + D: DecoderFunction, +{ fn finalize_dict_array( &self, data_type: ArrowDataType, dict: Self::Dict, - (values, validity): (Vec, Option), + keys: PrimitiveArray, ) -> ParquetResult> { let value_type = match &data_type { ArrowDataType::Dictionary(_, value, _) => value.as_ref().clone(), _ => T::PRIMITIVE.into(), }; - let array = PrimitiveArray::::new(K::PRIMITIVE.into(), values.into(), validity); let dict = Box::new(PrimitiveArray::new(value_type, dict.into(), None)); - Ok(DictionaryArray::try_new(data_type, array, dict).unwrap()) + Ok(DictionaryArray::try_new(data_type, keys, dict).unwrap()) } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index 542cc84cd2ef..ba428d6bdbb4 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -1,5 +1,5 @@ -use arrow::array::{Array, DictionaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}; -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::array::{DictionaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}; +use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; use num_traits::AsPrimitive; @@ -16,7 +16,6 @@ use crate::parquet::error::ParquetResult; use 
crate::parquet::page::{split_buffer, DataPage, DictPage}; use crate::parquet::types::{decode, NativeType as ParquetNativeType}; use crate::read::deserialize::utils::array_chunks::ArrayChunks; -use crate::read::deserialize::utils::filter::Filter; use crate::read::deserialize::utils::{ BatchableCollector, Decoder, PageValidity, TranslatedHybridRle, }; @@ -44,7 +43,6 @@ where page: &'a DataPage, dict: Option<&'a as utils::Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, - _filter: Option<&Filter<'a>>, ) -> PolarsResult { match (page.encoding(), dict) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => { @@ -248,6 +246,7 @@ where type Translation<'a> = StateTranslation<'a, P, T>; type Dict = Vec; type DecodedState = (Vec, MutableBitmap); + type Output = PrimitiveArray; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { self.0.with_capacity(capacity) @@ -326,36 +325,42 @@ where fn finalize( &self, data_type: ArrowDataType, + _dict: Option, (values, validity): Self::DecodedState, - ) -> ParquetResult> { + ) -> ParquetResult { let validity = if validity.is_empty() { None } else { Some(validity) }; - Ok(Box::new( - MutablePrimitiveArray::try_new(data_type, values, validity) - .unwrap() - .freeze(), - )) + Ok(MutablePrimitiveArray::try_new(data_type, values, validity) + .unwrap() + .freeze()) } +} +impl utils::DictDecodable for IntDecoder +where + T: NativeType, + P: ParquetNativeType, + i64: num_traits::AsPrimitive
<P>
, + D: DecoderFunction, +{ fn finalize_dict_array( &self, data_type: ArrowDataType, dict: Self::Dict, - (values, validity): (Vec, Option), + keys: PrimitiveArray, ) -> ParquetResult> { let value_type = match &data_type { ArrowDataType::Dictionary(_, value, _) => value.as_ref().clone(), _ => T::PRIMITIVE.into(), }; - let array = PrimitiveArray::::new(K::PRIMITIVE.into(), values.into(), validity); let dict = Box::new(PrimitiveArray::new(value_type, dict.into(), None)); - Ok(DictionaryArray::try_new(data_type, array, dict).unwrap()) + Ok(DictionaryArray::try_new(data_type, keys, dict).unwrap()) } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/simple.rs b/crates/polars-parquet/src/arrow/read/deserialize/simple.rs index 0b3068e1a6c6..91a94d669c60 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/simple.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/simple.rs @@ -5,9 +5,9 @@ use arrow::types::{days_ms, i256}; use ethnum::I256; use polars_error::{polars_bail, PolarsResult}; -use super::utils::PageDictArrayDecoder; +use super::utils::filter::Filter; use super::{ - binary, boolean, fixed_size_binary, null, primitive, BasicDecompressor, CompressedPagesIter, + binary, boolean, dictionary, fixed_size_binary, null, primitive, BasicDecompressor, ParquetResult, }; use crate::parquet::error::ParquetError; @@ -20,11 +20,11 @@ use crate::read::deserialize::utils::PageDecoder; /// An iterator adapter that maps an iterator of Pages a boxed [`Array`] of [`ArrowDataType`] /// `data_type` with a maximum of `num_rows` elements. -pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( - pages: BasicDecompressor, +pub fn page_iter_to_array( + pages: BasicDecompressor, type_: &PrimitiveType, data_type: ArrowDataType, - num_rows: usize, + filter: Option, ) -> PolarsResult> { use ArrowDataType::*; @@ -32,52 +32,52 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( let logical_type = &type_.logical_type; Ok(match (physical_type, data_type.to_logical_type()) { - (_, Null) => null::iter_to_arrays(pages, data_type, num_rows), + (_, Null) => null::iter_to_arrays(pages, data_type, filter)?, (PhysicalType::Boolean, Boolean) => { - PageDecoder::new(pages, data_type, boolean::BooleanDecoder)?.collect_n(num_rows)? + Box::new(PageDecoder::new(pages, data_type, boolean::BooleanDecoder)?.collect_n(filter)?) }, - (PhysicalType::Int32, UInt8) => PageDecoder::new( + (PhysicalType::Int32, UInt8) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? - .collect_n(num_rows)?, - (PhysicalType::Int32, UInt16) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int32, UInt16) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? - .collect_n(num_rows)?, - (PhysicalType::Int32, UInt32) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int32, UInt32) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? - .collect_n(num_rows)?, - (PhysicalType::Int64, UInt32) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int64, UInt32) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? - .collect_n(num_rows)?, - (PhysicalType::Int32, Int8) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int32, Int8) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? 
- .collect_n(num_rows)?, - (PhysicalType::Int32, Int16) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int32, Int16) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? - .collect_n(num_rows)?, - (PhysicalType::Int32, Int32 | Date32 | Time32(_)) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int32, Int32 | Date32 | Time32(_)) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::unit(), )? - .collect_n(num_rows)?, + .collect_n(filter)?), (PhysicalType::Int64 | PhysicalType::Int96, Timestamp(time_unit, _)) => { let time_unit = *time_unit; return timestamp( @@ -85,15 +85,15 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( physical_type, logical_type, data_type, - num_rows, + filter, time_unit, ); }, (PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => { let size = FixedSizeBinaryArray::get_size(&data_type); - PageDecoder::new(pages, data_type, fixed_size_binary::BinaryDecoder { size })? - .collect_n(num_rows)? + Box::new(PageDecoder::new(pages, data_type, fixed_size_binary::BinaryDecoder { size })? + .collect_n(filter)?) }, (PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::YearMonth)) => { // @TODO: Make a separate decoder for this @@ -104,12 +104,9 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( ArrowDataType::FixedSizeBinary(n), fixed_size_binary::BinaryDecoder { size: n }, )? - .collect_n(num_rows)?; + .collect_n(filter)?; let values = array - .as_any() - .downcast_ref::() - .unwrap() .values() .chunks_exact(n) .map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap())) @@ -131,12 +128,9 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( ArrowDataType::FixedSizeBinary(n), fixed_size_binary::BinaryDecoder { size: n }, )? - .collect_n(num_rows)?; + .collect_n(filter)?; let values = array - .as_any() - .downcast_ref::() - .unwrap() .values() .chunks_exact(n) .map(super::super::convert_days_ms) @@ -149,18 +143,18 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( validity, )?) }, - (PhysicalType::Int32, Decimal(_, _)) => PageDecoder::new( + (PhysicalType::Int32, Decimal(_, _)) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_into(), )? - .collect_n(num_rows)?, - (PhysicalType::Int64, Decimal(_, _)) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int64, Decimal(_, _)) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_into(), )? - .collect_n(num_rows)?, + .collect_n(filter)?), (PhysicalType::FixedLenByteArray(n), Decimal(_, _)) if *n > 16 => { polars_bail!(ComputeError: "not implemented: can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}" @@ -176,12 +170,9 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( ArrowDataType::FixedSizeBinary(n), fixed_size_binary::BinaryDecoder { size: n }, )? - .collect_n(num_rows)?; + .collect_n(filter)?; let values = array - .as_any() - .downcast_ref::() - .unwrap() .values() .chunks_exact(n) .map(|value: &[u8]| super::super::convert_i128(value, n)) @@ -194,18 +185,18 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( validity, )?) }, - (PhysicalType::Int32, Decimal256(_, _)) => PageDecoder::new( + (PhysicalType::Int32, Decimal256(_, _)) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::closure(|x: i32| i256(I256::new(x as i128))), )? 
- .collect_n(num_rows)?, - (PhysicalType::Int64, Decimal256(_, _)) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int64, Decimal256(_, _)) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::closure(|x: i64| i256(I256::new(x as i128))), )? - .collect_n(num_rows)?, + .collect_n(filter)?), (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => { // @TODO: Make a separate decoder for this @@ -216,12 +207,9 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( ArrowDataType::FixedSizeBinary(n), fixed_size_binary::BinaryDecoder { size: n }, )? - .collect_n(num_rows)?; + .collect_n(filter)?; let values = array - .as_any() - .downcast_ref::() - .unwrap() .values() .chunks_exact(n) .map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n)))) @@ -244,12 +232,9 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( ArrowDataType::FixedSizeBinary(n), fixed_size_binary::BinaryDecoder { size: n }, )? - .collect_n(num_rows)?; + .collect_n(filter)?; let values = array - .as_any() - .downcast_ref::() - .unwrap() .values() .chunks_exact(n) .map(super::super::convert_i256) @@ -267,54 +252,54 @@ pub fn page_iter_to_array<'a, I: CompressedPagesIter + 'a>( "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}" ) }, - (PhysicalType::Int32, Date64) => PageDecoder::new( + (PhysicalType::Int32, Date64) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::closure(|x: i32| i64::from(x) * 86400000), )? - .collect_n(num_rows)?, - (PhysicalType::Int64, Date64) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int64, Date64) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::unit(), )? - .collect_n(num_rows)?, - (PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::unit(), )? - .collect_n(num_rows)?, - (PhysicalType::Int64, UInt64) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Int64, UInt64) => Box::new(PageDecoder::new( pages, data_type, primitive::IntDecoder::::cast_as(), )? - .collect_n(num_rows)?, - (PhysicalType::Float, Float32) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Float, Float32) => Box::new(PageDecoder::new( pages, data_type, primitive::PrimitiveDecoder::::unit(), )? - .collect_n(num_rows)?, - (PhysicalType::Double, Float64) => PageDecoder::new( + .collect_n(filter)?), + (PhysicalType::Double, Float64) => Box::new(PageDecoder::new( pages, data_type, primitive::PrimitiveDecoder::::unit(), )? - .collect_n(num_rows)?, + .collect_n(filter)?), // Don't compile this code with `i32` as we don't use this in polars (PhysicalType::ByteArray, LargeBinary | LargeUtf8) => { PageDecoder::new(pages, data_type, binary::BinaryDecoder::::default())? - .collect_n(num_rows)? + .collect_n(filter)? }, (PhysicalType::ByteArray, BinaryView | Utf8View) => { PageDecoder::new(pages, data_type, binview::BinViewDecoder::default())? - .collect_n(num_rows)? + .collect_n(filter)? 
}, (_, Dictionary(key_type, _, _)) => { return match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(pages, physical_type, logical_type, data_type, num_rows).map(|v| Box::new(v) as Box<_>) + dict_read::<$K>(pages, physical_type, logical_type, data_type, filter).map(|v| Box::new(v) as Box<_>) }).map_err(Into::into) }, (from, to) => { @@ -394,40 +379,48 @@ pub fn int96_to_i64_s(value: [u32; 3]) -> i64 { day_seconds + seconds } -fn timestamp( - pages: BasicDecompressor, +fn timestamp( + pages: BasicDecompressor, physical_type: &PhysicalType, logical_type: &Option, data_type: ArrowDataType, - num_rows: usize, + filter: Option, time_unit: TimeUnit, ) -> PolarsResult> { if physical_type == &PhysicalType::Int96 { return match time_unit { - TimeUnit::Nanosecond => Ok(PageDecoder::new( - pages, - data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)), - )? - .collect_n(num_rows)?), - TimeUnit::Microsecond => Ok(PageDecoder::new( - pages, - data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)), - )? - .collect_n(num_rows)?), - TimeUnit::Millisecond => Ok(PageDecoder::new( - pages, - data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)), - )? - .collect_n(num_rows)?), - TimeUnit::Second => Ok(PageDecoder::new( - pages, - data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)), - )? - .collect_n(num_rows)?), + TimeUnit::Nanosecond => Ok(Box::new( + PageDecoder::new( + pages, + data_type, + primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)), + )? + .collect_n(filter)?, + )), + TimeUnit::Microsecond => Ok(Box::new( + PageDecoder::new( + pages, + data_type, + primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)), + )? + .collect_n(filter)?, + )), + TimeUnit::Millisecond => Ok(Box::new( + PageDecoder::new( + pages, + data_type, + primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)), + )? + .collect_n(filter)?, + )), + TimeUnit::Second => Ok(Box::new( + PageDecoder::new( + pages, + data_type, + primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)), + )? + .collect_n(filter)?, + )), }; }; @@ -439,29 +432,35 @@ fn timestamp( let (factor, is_multiplier) = unify_timestamp_unit(logical_type, time_unit); Ok(match (factor, is_multiplier) { - (1, _) => PageDecoder::new(pages, data_type, primitive::IntDecoder::::unit())? - .collect_n(num_rows)?, - (a, true) => PageDecoder::new( - pages, - data_type, - primitive::IntDecoder::closure(|x: i64| x * a), - )? - .collect_n(num_rows)?, - (a, false) => PageDecoder::new( - pages, - data_type, - primitive::IntDecoder::closure(|x: i64| x / a), - )? - .collect_n(num_rows)?, + (1, _) => Box::new( + PageDecoder::new(pages, data_type, primitive::IntDecoder::::unit())? + .collect_n(filter)?, + ), + (a, true) => Box::new( + PageDecoder::new( + pages, + data_type, + primitive::IntDecoder::closure(|x: i64| x * a), + )? + .collect_n(filter)?, + ), + (a, false) => Box::new( + PageDecoder::new( + pages, + data_type, + primitive::IntDecoder::closure(|x: i64| x / a), + )? 
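The entry point no longer takes a `num_rows: usize` cap: every selection reaches `page_iter_to_array` as an `Option<Filter>`, and a plain row limit becomes `Filter::new_limited(n)`. A minimal sketch of a crate-internal caller; the module paths in the imports are assumptions for illustration, not part of this patch:

```rust
use arrow::array::Array;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

use crate::parquet::schema::types::PrimitiveType; // assumed path
use crate::read::deserialize::simple::page_iter_to_array; // assumed path
use crate::read::deserialize::Filter;
use crate::read::BasicDecompressor;

fn read_at_most_100_rows(
    pages: BasicDecompressor,
    type_: &PrimitiveType,
    data_type: ArrowDataType,
) -> PolarsResult<Box<dyn Array>> {
    // `None` decodes the whole column chunk; `Filter::new_ranged(a, b)` and
    // `Filter::new_masked(bitmap)` select arbitrary row subsets instead.
    page_iter_to_array(pages, type_, data_type, Some(Filter::new_limited(100)))
}
```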
+ .collect_n(filter)?, + ), }) } -fn timestamp_dict( - pages: BasicDecompressor, +fn timestamp_dict( + pages: BasicDecompressor, physical_type: &PhysicalType, logical_type: &Option, data_type: ArrowDataType, - num_rows: usize, + filter: Option, time_unit: TimeUnit, ) -> ParquetResult> { if physical_type == &PhysicalType::Int96 { @@ -471,44 +470,52 @@ fn timestamp_dict( }; let (factor, is_multiplier) = unify_timestamp_unit(&Some(logical_type), time_unit); return match (factor, is_multiplier) { - (a, true) => PageDictArrayDecoder::<_, K, _>::new( + (a, true) => PageDecoder::new( pages, ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x) * a), + dictionary::DictionaryDecoder::::new(primitive::PrimitiveDecoder::closure( + |x: [u32; 3]| int96_to_i64_ns(x) * a, + )), )? - .collect_n(num_rows), - (a, false) => PageDictArrayDecoder::<_, K, _>::new( + .collect_n(filter), + (a, false) => PageDecoder::new( pages, ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x) / a), + dictionary::DictionaryDecoder::::new(primitive::PrimitiveDecoder::closure( + |x: [u32; 3]| int96_to_i64_ns(x) / a, + )), )? - .collect_n(num_rows), + .collect_n(filter), }; }; let (factor, is_multiplier) = unify_timestamp_unit(logical_type, time_unit); match (factor, is_multiplier) { - (a, true) => PageDictArrayDecoder::<_, K, _>::new( + (a, true) => PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::closure(|x: i64| x * a), + dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::closure(|x: i64| { + x * a + })), )? - .collect_n(num_rows), - (a, false) => PageDictArrayDecoder::<_, K, _>::new( + .collect_n(filter), + (a, false) => PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::closure(|x: i64| x / a), + dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::closure(|x: i64| { + x / a + })), )? - .collect_n(num_rows), + .collect_n(filter), } } -fn dict_read( - iter: BasicDecompressor, +fn dict_read( + iter: BasicDecompressor, physical_type: &PhysicalType, logical_type: &Option, data_type: ArrowDataType, - num_rows: usize, + filter: Option, ) -> ParquetResult> { use ArrowDataType::*; let values_data_type = if let Dictionary(_, v, _) = &data_type { @@ -517,106 +524,137 @@ fn dict_read( panic!() }; - Ok(match (physical_type, values_data_type.to_logical_type()) { - (PhysicalType::Int32, UInt8) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - )? - .collect_n(num_rows)?, - (PhysicalType::Int32, UInt16) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - )? - .collect_n(num_rows)?, - (PhysicalType::Int32, UInt32) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - )? - .collect_n(num_rows)?, - (PhysicalType::Int64, UInt64) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - )? - .collect_n(num_rows)?, - (PhysicalType::Int32, Int8) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - )? - .collect_n(num_rows)?, - (PhysicalType::Int32, Int16) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::cast_as(), - )? 
- .collect_n(num_rows)?, - (PhysicalType::Int32, Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth)) => { - PageDictArrayDecoder::<_, K, _>::new( + Ok( + match (physical_type, values_data_type.to_logical_type()) { + (PhysicalType::Int32, UInt8) => PageDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::unit(), + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), )? - .collect_n(num_rows)? - }, - - (PhysicalType::Int64, Timestamp(time_unit, _)) => { - let time_unit = *time_unit; - return timestamp_dict::( + .collect_n(filter)?, + (PhysicalType::Int32, UInt16) => PageDecoder::new( iter, - physical_type, - logical_type, data_type, - num_rows, - time_unit, - ); - }, - - (PhysicalType::Int64, Int64 | Date64 | Time64(_) | Duration(_)) => { - PageDictArrayDecoder::<_, K, _>::new( + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), + )? + .collect_n(filter)?, + (PhysicalType::Int32, UInt32) => PageDecoder::new( iter, data_type, - primitive::PrimitiveDecoder::::unit(), + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), )? - .collect_n(num_rows)? - }, - (PhysicalType::Float, Float32) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::unit(), - )? - .collect_n(num_rows)?, - (PhysicalType::Double, Float64) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - primitive::PrimitiveDecoder::::unit(), - )? - .collect_n(num_rows)?, - (PhysicalType::ByteArray, LargeUtf8 | LargeBinary) => PageDictArrayDecoder::<_, K, _>::new( - iter, - data_type, - binary::BinaryDecoder::::default(), - )? - .collect_n(num_rows)?, - (PhysicalType::ByteArray, Utf8View | BinaryView) => { - PageDictArrayDecoder::<_, K, _>::new(iter, data_type, BinViewDecoder::default())? - .collect_n(num_rows)? - }, - (PhysicalType::FixedLenByteArray(size), FixedSizeBinary(_)) => { - PageDictArrayDecoder::<_, K, _>::new( + .collect_n(filter)?, + (PhysicalType::Int64, UInt64) => PageDecoder::new( iter, data_type, - fixed_size_binary::BinaryDecoder { size: *size }, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), )? - .collect_n(num_rows)? - }, - other => { - return Err(ParquetError::FeatureNotSupported(format!( - "Reading dictionaries of type {other:?}" - ))); + .collect_n(filter)?, + (PhysicalType::Int32, Int8) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), + )? + .collect_n(filter)?, + (PhysicalType::Int32, Int16) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::cast_as(), + ), + )? + .collect_n(filter)?, + ( + PhysicalType::Int32, + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth), + ) => { + PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::unit(), + ), + )? + .collect_n(filter)? + }, + + (PhysicalType::Int64, Timestamp(time_unit, _)) => { + let time_unit = *time_unit; + return timestamp_dict::( + iter, + physical_type, + logical_type, + data_type, + filter, + time_unit, + ); + }, + + (PhysicalType::Int64, Int64 | Date64 | Time64(_) | Duration(_)) => { + PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::unit(), + ), + )? + .collect_n(filter)? 
+ }, + (PhysicalType::Float, Float32) => { + PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::unit(), + ), + )? + .collect_n(filter)? + }, + (PhysicalType::Double, Float64) => { + PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new( + primitive::PrimitiveDecoder::::unit(), + ), + )? + .collect_n(filter)? + }, + (PhysicalType::ByteArray, LargeUtf8 | LargeBinary) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(binary::BinaryDecoder::::default()), + )? + .collect_n(filter)?, + (PhysicalType::ByteArray, Utf8View | BinaryView) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(BinViewDecoder::default()), + )? + .collect_n(filter)?, + (PhysicalType::FixedLenByteArray(size), FixedSizeBinary(_)) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(fixed_size_binary::BinaryDecoder { + size: *size, + }), + )? + .collect_n(filter)?, + other => { + return Err(ParquetError::FeatureNotSupported(format!( + "Reading dictionaries of type {other:?}" + ))); + }, }, - }) + ) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs index 395ece91df57..b7a9c6645701 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs @@ -1,24 +1,72 @@ -use crate::parquet::indexes::Interval; -use crate::parquet::page::DataPage; +use std::ops::Range; -#[derive(Debug, Clone, Copy)] -pub(crate) struct Filter<'a> { - pub(super) selected_rows: &'a [Interval], +use arrow::array::Splitable; +use arrow::bitmap::Bitmap; - /// Index of the [`Interval`] that is >= `current_index` - pub(super) current_interval: usize, - /// Global offset - pub(super) current_index: usize, +#[derive(Debug, Clone)] +pub enum Filter { + Range(Range), + Mask(Bitmap), } -impl<'a> Filter<'a> { - pub fn new(page: &'a DataPage) -> Option { - let selected_rows = page.selected_rows()?; +impl Filter { + pub fn new_limited(x: usize) -> Self { + Filter::Range(0..x) + } + + pub fn new_ranged(start: usize, end: usize) -> Self { + Filter::Range(start..end) + } + + pub fn new_masked(mask: Bitmap) -> Self { + Filter::Mask(mask) + } + + pub(crate) fn num_rows(&self) -> usize { + match self { + Filter::Range(range) => range.len(), + Filter::Mask(bitmap) => bitmap.set_bits(), + } + } + + pub(crate) fn split_at(&self, at: usize) -> (Filter, Filter) { + use Filter as F; + match self { + F::Range(range) => { + let start = range.start; + let end = range.end; + + if at <= start { + (F::Range(0..0), F::Range(start - at..end - at)) + } else if at > end { + (F::Range(start..end), F::Range(0..0)) + } else { + (F::Range(start..at), F::Range(0..end - at)) + } + }, + F::Mask(bitmap) => { + let (lhs, rhs) = bitmap.split_at(at); + (F::Mask(lhs), F::Mask(rhs)) + }, + } + } + + pub(crate) fn opt_split_at( + filter: &Option, + at: usize, + ) -> (Option, Option) { + let Some(filter) = filter else { + return (None, None); + }; + + let (lhs, rhs) = filter.split_at(at); + (Some(lhs), Some(rhs)) + } - Some(Self { - selected_rows, - current_interval: 0, - current_index: 0, - }) + pub(crate) fn opt_num_rows(filter: &Option, total_num_rows: usize) -> usize { + match filter { + Some(filter) => usize::min(filter.num_rows(), total_num_rows), + None => total_num_rows, + } } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs 
b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs index 883db729404c..6f8e77473d58 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs @@ -2,9 +2,9 @@ pub(crate) mod array_chunks; pub(crate) mod filter; use arrow::array::{ - Array, BinaryArray, DictionaryArray, DictionaryKey, MutableBinaryViewArray, View, + BinaryArray, DictionaryArray, DictionaryKey, MutableBinaryViewArray, PrimitiveArray, View, }; -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use arrow::pushable::Pushable; use arrow::types::Offset; @@ -12,21 +12,19 @@ use polars_error::{polars_err, PolarsError, PolarsResult}; use self::filter::Filter; use super::binary::utils::Binary; -use super::{BasicDecompressor, CompressedPagesIter, ParquetError}; +use super::BasicDecompressor; use crate::parquet::encoding::hybrid_rle::gatherer::{ HybridRleGatherer, ZeroCount, ZeroCountGatherer, }; use crate::parquet::encoding::hybrid_rle::{self, HybridRleDecoder, Translator}; use crate::parquet::error::ParquetResult; -use crate::parquet::page::{split_buffer, DataPage, DictPage, Page}; +use crate::parquet::page::{split_buffer, DataPage, DictPage}; use crate::parquet::schema::Repetition; -use crate::read::deserialize::dictionary::DictionaryDecoder; #[derive(Debug)] pub(crate) struct State<'a, D: Decoder> { pub(crate) page_validity: Option>, pub(crate) translation: D::Translation<'a>, - pub(crate) filter: Option>, } pub(crate) trait StateTranslation<'a, D: Decoder>: Sized { @@ -37,7 +35,6 @@ pub(crate) trait StateTranslation<'a, D: Decoder>: Sized { page: &'a DataPage, dict: Option<&'a D::Dict>, page_validity: Option<&PageValidity<'a>>, - filter: Option<&Filter<'a>>, ) -> PolarsResult; fn len_when_not_nullable(&self) -> usize; fn skip_in_place(&mut self, n: usize) -> ParquetResult<()>; @@ -57,20 +54,16 @@ impl<'a, D: Decoder> State<'a, D> { pub fn new(decoder: &D, page: &'a DataPage, dict: Option<&'a D::Dict>) -> PolarsResult { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - let is_filtered = page.selected_rows().is_some(); let page_validity = is_optional .then(|| page_validity_decoder(page)) .transpose()?; - let filter = is_filtered.then(|| Filter::new(page)).flatten(); - let translation = - D::Translation::new(decoder, page, dict, page_validity.as_ref(), filter.as_ref())?; + let translation = D::Translation::new(decoder, page, dict, page_validity.as_ref())?; Ok(Self { page_validity, translation, - filter, }) } @@ -79,12 +72,11 @@ impl<'a, D: Decoder> State<'a, D> { page: &'a DataPage, dict: Option<&'a D::Dict>, ) -> PolarsResult { - let translation = D::Translation::new(decoder, page, dict, None, None)?; + let translation = D::Translation::new(decoder, page, dict, None)?; Ok(Self { translation, page_validity: None, - filter: None, }) } @@ -116,73 +108,63 @@ impl<'a, D: Decoder> State<'a, D> { &mut self, decoder: &mut D, decoded: &mut D::DecodedState, - additional: usize, + filter: Option, ) -> ParquetResult<()> { - // @TODO: Taking the filter here is a bit unfortunate. Since each error leaves the filter - // empty. 
- let filter = self.filter.take(); - match filter { - None => self.translation.extend_from_state( - decoder, - decoded, - &mut self.page_validity, - additional, - ), - Some(mut filter) => { - let mut n = additional; - while n > 0 && self.len() > 0 { - let prev_n = n; - let prev_state_len = self.len(); - - // Skip over all intervals that we have already passed or that are length == 0. - while filter - .selected_rows - .get(filter.current_interval) - .is_some_and(|iv| { - iv.length == 0 || iv.start + iv.length <= filter.current_index - }) - { - filter.current_interval += 1; - } - - let Some(iv) = filter.selected_rows.get(filter.current_interval) else { - self.skip_in_place(self.len())?; - self.filter = Some(filter); - return Ok(()); - }; - - // Move to at least the start of the interval - if filter.current_index < iv.start { - self.skip_in_place(iv.start - filter.current_index)?; - filter.current_index = iv.start; - } - - let n_this_round = usize::min(iv.start + iv.length - filter.current_index, n); + None => { + let num_rows = self.len(); + self.translation.extend_from_state( + decoder, + decoded, + &mut self.page_validity, + num_rows, + ) + }, + Some(filter) => match filter { + Filter::Range(range) => { + let start = range.start; + let end = range.end; + self.skip_in_place(start)?; + debug_assert!(end - start <= self.len()); self.translation.extend_from_state( decoder, decoded, &mut self.page_validity, - n_this_round, + end - start, )?; - - let iv = &filter.selected_rows[filter.current_interval]; - filter.current_index += n_this_round; - if filter.current_index >= iv.start + iv.length { - filter.current_interval += 1; + Ok(()) + }, + Filter::Mask(bitmap) => { + debug_assert!(bitmap.len() == self.len()); + + let mut iter = bitmap.iter(); + while iter.num_remaining() > 0 && self.len() > 0 { + let prev_state_len = self.len(); + + let num_ones = iter.take_leading_ones(); + self.translation.extend_from_state( + decoder, + decoded, + &mut self.page_validity, + num_ones, + )?; + + if self.len() == 0 { + break; + } + + let num_zeros = iter.take_leading_zeros(); + self.skip_in_place(num_zeros)?; + + assert!( + prev_state_len != self.len(), + "No forward progress was booked in a filtered parquet file." + ); } - n -= n_this_round; - - assert!( - prev_n != n || prev_state_len != self.len(), - "No forward progress was booked in a filtered parquet file." - ); - } - - self.filter = Some(filter); - Ok(()) + Ok(()) + }, }, } } @@ -570,6 +552,8 @@ pub(super) trait Decoder: Sized { /// The target state that this Decoder decodes into. type DecodedState: ExactSize; + type Output; + /// Initializes a new [`Self::DecodedState`]. fn with_capacity(&self, capacity: usize) -> Self::DecodedState; @@ -595,16 +579,9 @@ pub(super) trait Decoder: Sized { fn finalize( &self, data_type: ArrowDataType, + dict: Option, decoded: Self::DecodedState, - ) -> ParquetResult>; - - /// Turn the collected arrays into the final dictionary array. 
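The old page-local interval bookkeeping is gone: a `Filter` is now either a contiguous row `Range` or a per-row boolean `Mask`, and `extend_from_state` consumes a range by skipping `start` values before decoding, or walks a mask in alternating `take_leading_ones` (decode) and `take_leading_zeros` (skip) runs. A small sketch of the public constructors, with an invented bitmap and the workspace's `arrow` crate:

```rust
use arrow::bitmap::Bitmap;
use polars_parquet::read::Filter;

fn example_filters() -> (Filter, Filter, Filter) {
    // Keep rows 10..20 of the column chunk: the decoder skips the first 10
    // values and then decodes exactly 10.
    let by_range = Filter::new_ranged(10, 20);

    // Keep exactly the rows whose bit is set; runs of ones are decoded, runs
    // of zeros are skipped in place.
    let by_mask = Filter::new_masked(Bitmap::from_iter([true, false, true, true]));

    // `Filter::new_limited(n)` is shorthand for `Filter::new_ranged(0, n)`.
    let first_n = Filter::new_limited(100);

    (by_range, by_mask, first_n)
}
```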
- fn finalize_dict_array( - &self, - data_type: ArrowDataType, - dict: Self::Dict, - decoded: (Vec, Option), - ) -> ParquetResult>; + ) -> ParquetResult; } pub(crate) trait NestedDecoder: Decoder { @@ -622,7 +599,7 @@ pub(crate) trait NestedDecoder: Decoder { decoded: &mut Self::DecodedState, n: usize, ) -> ParquetResult<()> { - state.extend_from_state(self, decoded, n)?; + state.extend_from_state(self, decoded, Some(Filter::new_limited(n)))?; Self::validity_extend(state, decoded, true, n); Ok(()) @@ -639,16 +616,25 @@ pub(crate) trait NestedDecoder: Decoder { } } -pub struct PageDecoder { - pub iter: BasicDecompressor, +pub trait DictDecodable: Decoder { + fn finalize_dict_array( + &self, + data_type: ArrowDataType, + dict: Self::Dict, + keys: PrimitiveArray, + ) -> ParquetResult>; +} + +pub struct PageDecoder { + pub iter: BasicDecompressor, pub data_type: ArrowDataType, pub dict: Option, pub decoder: D, } -impl PageDecoder { +impl PageDecoder { pub fn new( - mut iter: BasicDecompressor, + mut iter: BasicDecompressor, data_type: ArrowDataType, decoder: D, ) -> ParquetResult { @@ -663,130 +649,36 @@ impl PageDecoder { }) } - pub fn collect_n(mut self, limit: usize) -> ParquetResult> { - let mut target = self.decoder.with_capacity(limit); - self.collect_n_into(&mut target, limit)?; - self.decoder.finalize(self.data_type, target) - } - - pub fn collect_n_into( - &mut self, - target: &mut D::DecodedState, - mut limit: usize, - ) -> ParquetResult { - use streaming_decompression::FallibleStreamingIterator; + pub fn collect_n(mut self, mut filter: Option) -> ParquetResult { + let mut num_rows_remaining = Filter::opt_num_rows(&filter, self.iter.total_num_values()); - if limit == 0 { - return Ok(0); - } + let mut target = self.decoder.with_capacity(num_rows_remaining); - let start_limit = limit; - - while limit > 0 { - let Some(page) = self.iter.next()? else { - return Ok(start_limit - limit); + while num_rows_remaining > 0 { + let Some(page) = self.iter.next() else { + return self.decoder.finalize(self.data_type, self.dict, target); }; + let page = page?; - let Page::Data(page) = page else { - // @TODO This should be removed - unreachable!(); - }; + let mut state = State::new(&self.decoder, &page, self.dict.as_ref())?; + let state_len = state.len(); - let mut state = State::new(&self.decoder, page, self.dict.as_ref())?; - let start_length = target.len(); - state.extend_from_state(&mut self.decoder, target, limit)?; - let end_length = target.len(); + let state_filter; + (state_filter, filter) = Filter::opt_split_at(&filter, state_len); - limit -= end_length - start_length; - - debug_assert!(state.len() == 0 || limit == 0); - } - - Ok(start_limit - limit) - } -} - -pub struct PageDictArrayDecoder { - pub iter: BasicDecompressor, - pub data_type: ArrowDataType, - pub dict: D::Dict, - pub decoder: D, - _pd: std::marker::PhantomData, -} - -impl PageDictArrayDecoder { - pub fn new( - mut iter: BasicDecompressor, - data_type: ArrowDataType, - decoder: D, - ) -> ParquetResult { - let dict_page = iter - .read_dict_page()? 
- .ok_or(ParquetError::FeatureNotSupported( - "Dictionary array without a dictionary page".to_string(), - ))?; - let dict = decoder.deserialize_dict(dict_page); - - Ok(Self { - iter, - data_type, - dict, - decoder, - _pd: std::marker::PhantomData, - }) - } - - pub fn collect_n(mut self, limit: usize) -> ParquetResult> { - let mut target = ( - Vec::with_capacity(limit), - MutableBitmap::with_capacity(limit), - ); - self.collect_n_into(&mut target, limit)?; - let (values, validity) = target; - let validity = if !validity.is_empty() { - Some(validity.freeze()) - } else { - None - }; - self.decoder - .finalize_dict_array(self.data_type, self.dict, (values, validity)) - } - - pub fn collect_n_into( - &mut self, - target: &mut (Vec, MutableBitmap), - mut limit: usize, - ) -> ParquetResult { - use streaming_decompression::FallibleStreamingIterator; - - if limit == 0 { - return Ok(0); - } - - let start_limit = limit; - - while limit > 0 { - let Some(page) = self.iter.next()? else { - return Ok(start_limit - limit); - }; - - let Page::Data(page) = page else { - // @TODO This should be removed - unreachable!(); - }; - - let mut dictionary_decoder = DictionaryDecoder::new(self.dict.len()); - let mut state = State::new(&dictionary_decoder, page, Some(&()))?; let start_length = target.len(); - state.extend_from_state(&mut dictionary_decoder, target, limit)?; + state.extend_from_state(&mut self.decoder, &mut target, state_filter)?; let end_length = target.len(); - limit -= end_length - start_length; + num_rows_remaining -= end_length - start_length; + + debug_assert!(state.len() == 0 || num_rows_remaining == 0); - debug_assert!(state.len() == 0 || limit == 0); + drop(state); + self.iter.reuse_page_buffer(page); } - Ok(start_limit - limit) + self.decoder.finalize(self.data_type, self.dict, target) } } diff --git a/crates/polars-parquet/src/arrow/read/file.rs b/crates/polars-parquet/src/arrow/read/file.rs index 49cdba63d3dc..a390022331be 100644 --- a/crates/polars-parquet/src/arrow/read/file.rs +++ b/crates/polars-parquet/src/arrow/read/file.rs @@ -5,9 +5,9 @@ use arrow::datatypes::ArrowSchema; use arrow::record_batch::RecordBatchT; use polars_error::PolarsResult; +use super::deserialize::Filter; use super::{RowGroupDeserializer, RowGroupMetaData}; use crate::arrow::read::read_columns_many; -use crate::parquet::indexes::FilteredPage; /// An iterator of [`RecordBatchT`]s coming from row groups of a parquet file. /// @@ -29,9 +29,8 @@ impl FileReader { row_groups: Vec, schema: ArrowSchema, limit: Option, - page_indexes: Option>>>>, ) -> Self { - let row_groups = RowGroupReader::new(reader, schema, row_groups, limit, page_indexes); + let row_groups = RowGroupReader::new(reader, schema, row_groups, limit); Self { row_groups, @@ -113,7 +112,6 @@ pub struct RowGroupReader { schema: ArrowSchema, row_groups: std::vec::IntoIter, remaining_rows: usize, - page_indexes: Option>>>>, } impl RowGroupReader { @@ -123,17 +121,12 @@ impl RowGroupReader { schema: ArrowSchema, row_groups: Vec, limit: Option, - page_indexes: Option>>>>, ) -> Self { - if let Some(pages) = &page_indexes { - assert_eq!(pages.len(), row_groups.len()) - } Self { reader, schema, row_groups: row_groups.into_iter(), remaining_rows: limit.unwrap_or(usize::MAX), - page_indexes: page_indexes.map(|pages| pages.into_iter()), } } @@ -153,31 +146,13 @@ impl RowGroupReader { return Ok(None); }; - let pages = self.page_indexes.as_mut().and_then(|iter| iter.next()); - - // the number of rows depends on whether indexes are selected or not. 
- let num_rows = pages - .as_ref() - .map(|x| { - // first field, first column within that field - x[0][0] - .iter() - .map(|page| { - page.selected_rows - .iter() - .map(|interval| interval.length) - .sum::() - }) - .sum() - }) - .unwrap_or_else(|| row_group.num_rows()); + let num_rows = row_group.num_rows(); let column_chunks = read_columns_many( &mut self.reader, &row_group, self.schema.fields.clone(), - Some(self.remaining_rows), - pages, + Some(Filter::new_limited(self.remaining_rows)), )?; let result = RowGroupDeserializer::new(column_chunks, num_rows, Some(self.remaining_rows)); diff --git a/crates/polars-parquet/src/arrow/read/mod.rs b/crates/polars-parquet/src/arrow/read/mod.rs index 31240af2332d..fff6987f4f1a 100644 --- a/crates/polars-parquet/src/arrow/read/mod.rs +++ b/crates/polars-parquet/src/arrow/read/mod.rs @@ -14,7 +14,7 @@ use arrow::array::Array; use arrow::types::{i256, NativeType}; pub use deserialize::{ column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns, - InitNested, NestedArrayIter, NestedState, + Filter, InitNested, NestedState, }; pub use file::{FileReader, RowGroupReader}; #[cfg(feature = "async")] @@ -23,7 +23,6 @@ use polars_error::PolarsResult; pub use row_group::*; pub use schema::{infer_schema, FileMetaData}; -use super::write::CompressedPage; use crate::parquet::error::ParquetResult; #[cfg(feature = "async")] pub use crate::parquet::read::{get_page_stream, read_metadata_async as _read_metadata_async}; @@ -46,13 +45,6 @@ pub use crate::parquet::{ FallibleStreamingIterator, }; -pub trait CompressedPagesIter: - Iterator> + Send + Sync -{ -} - -impl> + Send + Sync> CompressedPagesIter for I {} - /// Type def for a sharable, boxed dyn [`Iterator`] of arrays pub type ArrayIter<'a> = Box>> + Send + Sync + 'a>; diff --git a/crates/polars-parquet/src/arrow/read/row_group.rs b/crates/polars-parquet/src/arrow/read/row_group.rs index f2104403baae..0156569b4cd9 100644 --- a/crates/polars-parquet/src/arrow/read/row_group.rs +++ b/crates/polars-parquet/src/arrow/read/row_group.rs @@ -8,9 +8,9 @@ use polars_utils::mmap::MemReader; use super::{ArrayIter, RowGroupMetaData}; use crate::arrow::read::column_iter_to_arrays; -use crate::parquet::indexes::FilteredPage; +use crate::arrow::read::deserialize::Filter; use crate::parquet::metadata::ColumnChunkMetaData; -use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader}; +use crate::parquet::read::{BasicDecompressor, PageReader}; /// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into /// an iterator of [`RecordBatchT`]. @@ -132,75 +132,32 @@ where Ok((meta, chunk)) } -type Pages = Box< - dyn Iterator< - Item = std::result::Result< - crate::parquet::page::CompressedPage, - crate::parquet::error::ParquetError, - >, - > + Sync - + Send, ->; - /// Converts a vector of columns associated with the parquet field whose name is [`Field`] /// to an iterator of [`Array`], [`ArrayIter`] of chunk size `chunk_size`. 
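On the consumer side the page-index plumbing disappears as well: `FileReader`/`RowGroupReader` forward their remaining row budget as `Filter::new_limited`, and `read_columns_many` takes an `Option<Filter>` that it clones for every field. A hedged sketch of reading the first 1 000 rows of one row group; the file name is invented, and the exact `polars_parquet::read` re-exports (`read_metadata`, `infer_schema`) are assumed from the imports used elsewhere in this patch:

```rust
use std::fs::File;

use polars_error::PolarsResult;
use polars_parquet::read::{infer_schema, read_columns_many, read_metadata, Filter};

fn read_first_rows() -> PolarsResult<()> {
    let mut reader = File::open("example.parquet")?; // hypothetical file
    let metadata = read_metadata(&mut reader)?;
    let schema = infer_schema(&metadata)?;

    // One call per row group; the same filter is cloned for every field.
    let field_iters = read_columns_many(
        &mut reader,
        &metadata.row_groups[0],
        schema.fields.clone(),
        Some(Filter::new_limited(1_000)),
    )?;

    // `field_iters` holds one ArrayIter per field, ready to be zipped into
    // record batches by RowGroupDeserializer.
    drop(field_iters);
    Ok(())
}
```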
pub fn to_deserializer<'a>( columns: Vec<(&ColumnChunkMetaData, Vec)>, field: Field, - num_rows: usize, - pages: Option>>, + filter: Option, ) -> PolarsResult> { - let (columns, types) = if let Some(pages) = pages { - let (columns, types): (Vec<_>, Vec<_>) = columns - .into_iter() - .zip(pages) - .map(|((column_meta, chunk), mut pages)| { - // de-offset the start, since we read in chunks (and offset is from start of file) - let mut meta: PageMetaData = column_meta.into(); - pages - .iter_mut() - .for_each(|page| page.start -= meta.column_start); - meta.column_start = 0; - let pages = IndexedPageReader::new_with_page_meta( - MemReader::from_vec(chunk), - meta, - pages, - vec![], - vec![], - ); - let pages = Box::new(pages) as Pages; - ( - BasicDecompressor::new(pages, vec![]), - &column_meta.descriptor().descriptor.primitive_type, - ) - }) - .unzip(); - - (columns, types) - } else { - let (columns, types): (Vec<_>, Vec<_>) = columns - .into_iter() - .map(|(column_meta, chunk)| { - let len = chunk.len(); - let pages = PageReader::new( - MemReader::from_vec(chunk), - column_meta, - std::sync::Arc::new(|_, _| true), - vec![], - len * 2 + 1024, - ); - let pages = Box::new(pages) as Pages; - ( - BasicDecompressor::new(pages, vec![]), - &column_meta.descriptor().descriptor.primitive_type, - ) - }) - .unzip(); - - (columns, types) - }; - - column_iter_to_arrays(columns, types, field, num_rows) + let (columns, types): (Vec<_>, Vec<_>) = columns + .into_iter() + .map(|(column_meta, chunk)| { + let len = chunk.len(); + let pages = PageReader::new( + MemReader::from_vec(chunk), + column_meta, + std::sync::Arc::new(|_, _| true), + vec![], + len * 2 + 1024, + ); + ( + BasicDecompressor::new(pages, vec![]), + &column_meta.descriptor().descriptor.primitive_type, + ) + }) + .unzip(); + + column_iter_to_arrays(columns, types, field, filter) } /// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top @@ -217,12 +174,8 @@ pub fn read_columns_many<'a, R: Read + Seek>( reader: &mut R, row_group: &RowGroupMetaData, fields: Vec, - limit: Option, - pages: Option>>>, + filter: Option, ) -> PolarsResult>> { - let num_rows = row_group.num_rows(); - let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows); - // reads all the necessary columns for all fields from the row group // This operation is IO-bounded `O(C)` where C is the number of columns in the row group let field_columns = fields @@ -230,18 +183,9 @@ pub fn read_columns_many<'a, R: Read + Seek>( .map(|field| read_columns(reader, row_group.columns(), &field.name)) .collect::>>()?; - if let Some(pages) = pages { - field_columns - .into_iter() - .zip(fields) - .zip(pages) - .map(|((columns, field), pages)| to_deserializer(columns, field, num_rows, Some(pages))) - .collect() - } else { - field_columns - .into_iter() - .zip(fields) - .map(|(columns, field)| to_deserializer(columns, field, num_rows, None)) - .collect() - } + field_columns + .into_iter() + .zip(fields) + .map(|(columns, field)| to_deserializer(columns, field, filter.clone())) + .collect() } diff --git a/crates/polars-parquet/src/parquet/read/column/mod.rs b/crates/polars-parquet/src/parquet/read/column/mod.rs index 5ad518a3d6e5..2cd15c4f61e6 100644 --- a/crates/polars-parquet/src/parquet/read/column/mod.rs +++ b/crates/polars-parquet/src/parquet/read/column/mod.rs @@ -2,7 +2,7 @@ use std::io::{Read, Seek}; use std::vec::IntoIter; use super::{get_field_columns, get_page_iterator, MemReader, PageFilter, PageReader}; -use 
crate::parquet::error::ParquetError; +use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use crate::parquet::page::CompressedPage; use crate::parquet::schema::types::ParquetType; @@ -23,14 +23,13 @@ pub fn get_column_iterator( row_group: &RowGroupMetaData, field_name: &str, page_filter: Option, - scratch: Vec, max_page_size: usize, ) -> ColumnIterator { let columns = get_field_columns(row_group.columns(), field_name) .cloned() .collect::>(); - ColumnIterator::new(reader, columns, page_filter, scratch, max_page_size) + ColumnIterator::new(reader, columns, page_filter, max_page_size) } /// State of [`MutStreamingIterator`]. @@ -54,11 +53,9 @@ pub trait MutStreamingIterator: Sized { /// A [`MutStreamingIterator`] that reads column chunks one by one, /// returning a [`PageReader`] per column. pub struct ColumnIterator { - reader: Option, + reader: MemReader, columns: Vec, page_filter: Option, - current: Option<(PageReader, ColumnChunkMetaData)>, - scratch: Vec, max_page_size: usize, } @@ -69,56 +66,38 @@ impl ColumnIterator { reader: MemReader, mut columns: Vec, page_filter: Option, - scratch: Vec, max_page_size: usize, ) -> Self { columns.reverse(); Self { - reader: Some(reader), - scratch, + reader, columns, page_filter, - current: None, max_page_size, } } } -impl MutStreamingIterator for ColumnIterator { - type Item = (PageReader, ColumnChunkMetaData); - type Error = ParquetError; +impl Iterator for ColumnIterator { + type Item = ParquetResult<(PageReader, ColumnChunkMetaData)>; - fn advance(mut self) -> Result, ParquetError> { - let (reader, scratch) = if let Some((iter, _)) = self.current { - iter.into_inner() - } else { - (self.reader.unwrap(), self.scratch) - }; + fn next(&mut self) -> Option { if self.columns.is_empty() { - return Ok(State::Finished(scratch)); + return None; }; let column = self.columns.pop().unwrap(); - let iter = get_page_iterator( + let iter = match get_page_iterator( &column, - reader, + self.reader.clone(), self.page_filter.clone(), - scratch, + Vec::new(), self.max_page_size, - )?; - let current = Some((iter, column)); - Ok(State::Some(Self { - reader: None, - columns: self.columns, - page_filter: self.page_filter, - current, - scratch: vec![], - max_page_size: self.max_page_size, - })) - } - - fn get(&mut self) -> Option<&mut Self::Item> { - self.current.as_mut() + ) { + Err(e) => return Some(Err(e)), + Ok(v) => v, + }; + Some(Ok((iter, column))) } } diff --git a/crates/polars-parquet/src/parquet/read/compression.rs b/crates/polars-parquet/src/parquet/read/compression.rs index 7bb9bba4045a..91ff7a519c61 100644 --- a/crates/polars-parquet/src/parquet/read/compression.rs +++ b/crates/polars-parquet/src/parquet/read/compression.rs @@ -1,9 +1,10 @@ use parquet_format_safe::DataPageHeaderV2; +use super::PageReader; use crate::parquet::compression::{self, Compression}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page}; -use crate::parquet::{CowBuffer, FallibleStreamingIterator}; +use crate::parquet::CowBuffer; fn decompress_v1( compressed: &[u8], @@ -150,60 +151,80 @@ impl streaming_decompression::Decompressed for Page { /// This decompressor uses an internal [`Vec`] to perform decompressions which /// is reused across pages, so that a single allocation is required. /// If the pages are not compressed, the internal buffer is not used. 
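`ColumnIterator` is now an ordinary `Iterator` yielding `ParquetResult<(PageReader, ColumnChunkMetaData)>`, so the column chunks of one field can be walked with a plain loop instead of the old `advance`/`get` streaming protocol, and each `PageReader` feeds straight into the reworked `BasicDecompressor` below. A hedged sketch (the field name is invented) that counts the data pages of one field, reading the dictionary page first as the new contract requires:

```rust
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::read::{get_column_iterator, read_metadata, BasicDecompressor};
use polars_utils::mmap::MemReader;

fn count_data_pages(mut reader: MemReader, field_name: &str) -> ParquetResult<usize> {
    let metadata = read_metadata(&mut reader)?;
    let row_group = &metadata.row_groups[0];

    let mut n_pages = 0;
    for column in get_column_iterator(reader, row_group, field_name, None, usize::MAX) {
        let (pages, _column_meta) = column?;
        let mut decompressor = BasicDecompressor::new(pages, vec![]);

        // The dictionary page (if any) must be taken before the data pages.
        let _dict = decompressor.read_dict_page()?;
        for page in decompressor {
            let _page = page?;
            n_pages += 1;
        }
    }
    Ok(n_pages)
}
```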
-pub struct BasicDecompressor>> { - iter: _Decompressor, - peeked: Option, +pub struct BasicDecompressor { + reader: PageReader, + buffer: Vec, } -impl BasicDecompressor -where - I: Iterator>, -{ - /// Returns a new [`BasicDecompressor`]. - pub fn new(iter: I, buffer: Vec) -> Self { - Self { - iter: _Decompressor::new(iter, buffer, decompress), - peeked: None, - } +impl BasicDecompressor { + /// Create a new [`BasicDecompressor`] + pub fn new(reader: PageReader, buffer: Vec) -> Self { + Self { reader, buffer } + } + + /// The total number of values is given from the `ColumnChunk` metadata. + /// + /// - Nested column: equal to the number of non-null values at the lowest nesting level. + /// - Unnested column: equal to the number of non-null rows. + pub fn total_num_values(&self) -> usize { + self.reader.total_num_values() } /// Returns its internal buffer, consuming itself. pub fn into_inner(self) -> Vec { - self.iter.into_inner() + self.buffer } pub fn read_dict_page(&mut self) -> ParquetResult> { - match self.iter.next()? { - Some(Page::Data(page)) => { - self.peeked = Some(Page::Data(page.clone())); - Ok(None) - }, - Some(Page::Dict(page)) => Ok(Some(page.clone())), + match self.reader.read_dict()? { None => Ok(None), + Some(p) => { + let num_values = p.num_values; + let page = + decompress(CompressedPage::Dict(p), &mut Vec::with_capacity(num_values))?; + + match page { + Page::Dict(d) => Ok(Some(d)), + Page::Data(_) => unreachable!(), + } + }, } } -} -impl FallibleStreamingIterator for BasicDecompressor -where - I: Iterator>, -{ - type Item = Page; - type Error = ParquetError; + pub fn reuse_page_buffer(&mut self, page: DataPage) { + let buffer = match page.buffer { + CowBuffer::Borrowed(_) => return, + CowBuffer::Owned(vec) => vec, + }; - fn advance(&mut self) -> ParquetResult<()> { - if self.peeked.take().is_some() { - return Ok(()); - } + if self.buffer.capacity() > buffer.capacity() { + return; + }; - self.iter.advance() + self.buffer = buffer; } +} - fn get(&self) -> Option<&Self::Item> { - if let Some(peeked) = self.peeked.as_ref() { - return Some(peeked); - } +impl Iterator for BasicDecompressor { + type Item = ParquetResult; + + fn next(&mut self) -> Option { + let page = match self.reader.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(p)) => p, + }; + + Some(decompress(page, &mut self.buffer).map(|p| { + if let Page::Data(p) = p { + p + } else { + panic!("Found compressed page in the middle of the pages") + } + })) + } - self.iter.get() + fn size_hint(&self) -> (usize, Option) { + self.reader.size_hint() } } diff --git a/crates/polars-parquet/src/parquet/read/mod.rs b/crates/polars-parquet/src/parquet/read/mod.rs index 9f7d3aa2af53..cea8561193ef 100644 --- a/crates/polars-parquet/src/parquet/read/mod.rs +++ b/crates/polars-parquet/src/parquet/read/mod.rs @@ -16,7 +16,7 @@ pub use indexes::{read_columns_indexes, read_pages_locations}; pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size}; #[cfg(feature = "async")] pub use page::{get_page_stream, get_page_stream_from_column_start}; -pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader}; +pub use page::{PageFilter, PageIterator, PageMetaData, PageReader}; use polars_utils::mmap::MemReader; #[cfg(feature = "async")] pub use stream::read_metadata as read_metadata_async; diff --git a/crates/polars-parquet/src/parquet/read/page/mod.rs b/crates/polars-parquet/src/parquet/read/page/mod.rs index a8f1396d37d6..98d76493ba50 100644 --- 
a/crates/polars-parquet/src/parquet/read/page/mod.rs +++ b/crates/polars-parquet/src/parquet/read/page/mod.rs @@ -1,9 +1,7 @@ -mod indexed_reader; mod reader; #[cfg(feature = "async")] mod stream; -pub use indexed_reader::IndexedPageReader; pub use reader::{PageFilter, PageMetaData, PageReader}; use crate::parquet::error::ParquetError; diff --git a/crates/polars-parquet/src/parquet/read/page/reader.rs b/crates/polars-parquet/src/parquet/read/page/reader.rs index 7c8e32f40faf..80b51603f6fd 100644 --- a/crates/polars-parquet/src/parquet/read/page/reader.rs +++ b/crates/polars-parquet/src/parquet/read/page/reader.rs @@ -1,3 +1,4 @@ +use std::io::Seek; use std::sync::{Arc, OnceLock}; use parquet_format_safe::thrift::protocol::TCompactInputProtocol; @@ -128,6 +129,54 @@ impl PageReader { pub fn into_inner(self) -> (MemReader, Vec) { (self.reader, self.scratch) } + + pub fn total_num_values(&self) -> usize { + debug_assert!(self.total_num_values >= 0); + self.total_num_values as usize + } + + pub fn read_dict(&mut self) -> ParquetResult> { + // a dictionary page exists iff the first data page is not at the start of + // the column + let seek_offset = self.reader.position(); + let page_header = read_page_header(&mut self.reader, self.max_page_size)?; + let page_type = page_header.type_.try_into()?; + + if !matches!(page_type, PageType::DictionaryPage) { + self.reader + .seek(std::io::SeekFrom::Start(seek_offset as u64))?; + return Ok(None); + } + + let read_size: usize = page_header.compressed_page_size.try_into()?; + + if read_size > self.max_page_size { + return Err(ParquetError::WouldOverAllocate); + } + + let buffer = self.reader.read_slice(read_size); + + if buffer.len() != read_size { + return Err(ParquetError::oos( + "The page header reported the wrong page size", + )); + } + + finish_page( + page_header, + buffer, + self.compression, + &self.descriptor, + None, + ) + .map(|p| { + if let CompressedPage::Dict(d) = p { + Some(d) + } else { + unreachable!() + } + }) + } } impl PageIterator for PageReader { diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs index 06cd8194a818..477b9a1321df 100644 --- a/crates/polars/tests/it/io/parquet/arrow/mod.rs +++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs @@ -1,5 +1,4 @@ mod read; -mod read_indexes; mod write; use std::io::{Cursor, Read, Seek}; @@ -53,7 +52,7 @@ pub fn read_column(mut reader: R, column: &str) -> PolarsResult< let statistics = deserialize(field, row_group)?; - let mut reader = p_read::FileReader::new(reader, metadata.row_groups, schema, None, None); + let mut reader = p_read::FileReader::new(reader, metadata.row_groups, schema, None); let array = reader.next().unwrap()?.into_arrays().pop().unwrap(); @@ -1308,7 +1307,6 @@ fn integration_read(data: &[u8], limit: Option) -> PolarsResult>>()?; @@ -1646,7 +1644,7 @@ fn filter_chunk() -> PolarsResult<()> { .map(|(_, row_group)| row_group) .collect(); - let reader = p_read::FileReader::new(reader, row_groups, schema, None, None); + let reader = p_read::FileReader::new(reader, row_groups, schema, None); let new_chunks = reader.collect::>>()?; diff --git a/crates/polars/tests/it/io/parquet/arrow/read.rs b/crates/polars/tests/it/io/parquet/arrow/read.rs index 3520634ba24c..6aaeb8c297cb 100644 --- a/crates/polars/tests/it/io/parquet/arrow/read.rs +++ b/crates/polars/tests/it/io/parquet/arrow/read.rs @@ -13,7 +13,7 @@ fn all_types() -> PolarsResult<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - 
let reader = FileReader::new(reader, metadata.row_groups, schema, None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); @@ -56,7 +56,7 @@ fn all_types_chunked() -> PolarsResult<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; // chunk it in 5 (so, (5,3)) - let reader = FileReader::new(reader, metadata.row_groups, schema, None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); @@ -128,7 +128,7 @@ fn read_int96_timestamps() -> PolarsResult<()> { )], metadata: BTreeMap::new(), }; - let reader = FileReader::new(reader, metadata.row_groups, schema, None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, None); reader.collect::>>() }; diff --git a/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs b/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs deleted file mode 100644 index 4cc187b927d0..000000000000 --- a/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs +++ /dev/null @@ -1,250 +0,0 @@ -use std::io::Cursor; - -use arrow::array::*; -use arrow::datatypes::*; -use arrow::record_batch::RecordBatchT; -use polars_error::{PolarsError, PolarsResult}; -use polars_parquet::read::*; -use polars_parquet::write::*; - -/// Returns 2 sets of pages with different the same number of rows distributed un-evenly -fn pages( - arrays: &[&dyn Array], - encoding: Encoding, -) -> PolarsResult<(Vec, Vec, ArrowSchema)> { - // create pages with different number of rows - let array11 = PrimitiveArray::::from_slice([1, 2, 3, 4]); - let array12 = PrimitiveArray::::from_slice([5]); - let array13 = PrimitiveArray::::from_slice([6]); - - let schema = ArrowSchema::from(vec![ - Field::new("a1", ArrowDataType::Int64, false), - Field::new( - "a2", - arrays[0].data_type().clone(), - arrays.iter().map(|x| x.null_count()).sum::() != 0usize, - ), - ]); - - let parquet_schema = to_parquet_schema(&schema)?; - - let options = WriteOptions { - statistics: StatisticsOptions::full(), - compression: CompressionOptions::Uncompressed, - version: Version::V1, - data_page_size: None, - }; - - let pages1 = [array11, array12, array13] - .into_iter() - .map(|array| { - array_to_page( - &array, - parquet_schema.columns()[0] - .descriptor - .primitive_type - .clone(), - &[Nested::primitive(None, true, array.len())], - options, - Encoding::Plain, - ) - }) - .collect::>>()?; - - let pages2 = arrays - .iter() - .flat_map(|array| { - array_to_pages( - *array, - parquet_schema.columns()[1] - .descriptor - .primitive_type - .clone(), - &[Nested::primitive(None, true, array.len())], - options, - encoding, - ) - .unwrap() - .collect::>>() - .unwrap() - }) - .collect::>(); - - Ok((pages1, pages2, schema)) -} - -/// Tests reading pages while skipping indexes -fn read_with_indexes( - (pages1, pages2, schema): (Vec, Vec, ArrowSchema), - expected: Box, -) -> PolarsResult<()> { - let options = WriteOptions { - statistics: StatisticsOptions::full(), - compression: CompressionOptions::Uncompressed, - version: Version::V1, - data_page_size: None, - }; - - let to_compressed = |pages: Vec| { - let encoded_pages = DynIter::new(pages.into_iter().map(Ok)); - let compressed_pages = - Compressor::new(encoded_pages, options.compression, vec![]).map_err(PolarsError::from); - PolarsResult::Ok(DynStreamingIterator::new(compressed_pages)) - }; - - let row_group = 
DynIter::new(vec![to_compressed(pages1), to_compressed(pages2)].into_iter()); - - let writer = vec![]; - let mut writer = FileWriter::try_new(writer, schema, options)?; - - writer.write(row_group)?; - writer.end(None)?; - let data = writer.into_inner(); - - let mut reader = Cursor::new(data); - - let metadata = read_metadata(&mut reader)?; - - let schema = infer_schema(&metadata)?; - - // row group-based filtering can be done here - let row_groups = metadata.row_groups; - - // one per row group - let pages = row_groups - .iter() - .map(|row_group| { - assert!(indexes::has_indexes(row_group)); - - indexes::read_filtered_pages(&mut reader, row_group, &schema.fields, |_, intervals| { - let first_field = &intervals[0]; - let first_field_column = &first_field[0]; - assert_eq!(first_field_column.len(), 3); - let selection = [false, true, false]; - - first_field_column - .iter() - .zip(selection) - .filter(|(_i, is_selected)| *is_selected) - .map(|(i, _is_selected)| *i) - .collect() - }) - }) - .collect::>>()?; - - // apply projection pushdown - let schema = schema.filter(|index, _| index == 1); - let pages = pages - .into_iter() - .map(|pages| { - pages - .into_iter() - .enumerate() - .filter(|(index, _)| *index == 1) - .map(|(_, pages)| pages) - .collect::>() - }) - .collect::>(); - - let expected = RecordBatchT::new(vec![expected]); - - let chunks = FileReader::new(reader, row_groups, schema, None, Some(pages)); - - let arrays = chunks.collect::>>()?; - - assert_eq!(arrays, vec![expected]); - Ok(()) -} - -#[test] -fn indexed_required_i64() -> PolarsResult<()> { - let array21 = Int32Array::from_slice([1, 2, 3]); - let array22 = Int32Array::from_slice([4, 5, 6]); - let expected = Int32Array::from_slice([5]).boxed(); - - read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) -} - -#[test] -fn indexed_optional_i64() -> PolarsResult<()> { - let array21 = Int32Array::from([Some(1), Some(2), None]); - let array22 = Int32Array::from([None, Some(5), Some(6)]); - let expected = Int32Array::from_slice([5]).boxed(); - - read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) -} - -#[test] -fn indexed_optional_i64_delta() -> PolarsResult<()> { - let array21 = Int32Array::from([Some(1), Some(2), None]); - let array22 = Int32Array::from([None, Some(5), Some(6)]); - let expected = Int32Array::from_slice([5]).boxed(); - - read_with_indexes( - pages(&[&array21, &array22], Encoding::DeltaBinaryPacked)?, - expected, - ) -} - -#[test] -fn indexed_required_i64_delta() -> PolarsResult<()> { - let array21 = Int32Array::from_slice([1, 2, 3]); - let array22 = Int32Array::from_slice([4, 5, 6]); - let expected = Int32Array::from_slice([5]).boxed(); - - read_with_indexes( - pages(&[&array21, &array22], Encoding::DeltaBinaryPacked)?, - expected, - ) -} - -#[test] -fn indexed_required_fixed_len() -> PolarsResult<()> { - let array21 = FixedSizeBinaryArray::from_slice([[127], [128], [129]]); - let array22 = FixedSizeBinaryArray::from_slice([[130], [131], [132]]); - let expected = FixedSizeBinaryArray::from_slice([[131]]).boxed(); - - read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) -} - -#[test] -fn indexed_optional_fixed_len() -> PolarsResult<()> { - let array21 = FixedSizeBinaryArray::from([Some([127]), Some([128]), None]); - let array22 = FixedSizeBinaryArray::from([None, Some([131]), Some([132])]); - let expected = FixedSizeBinaryArray::from_slice([[131]]).boxed(); - - read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) -} - 
-#[test]
-fn indexed_required_boolean() -> PolarsResult<()> {
-    let array21 = BooleanArray::from_slice([true, false, true]);
-    let array22 = BooleanArray::from_slice([false, false, true]);
-    let expected = BooleanArray::from_slice([false]).boxed();
-
-    read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected)
-}
-
-#[test]
-fn indexed_optional_boolean() -> PolarsResult<()> {
-    let array21 = BooleanArray::from([Some(true), Some(false), None]);
-    let array22 = BooleanArray::from([None, Some(false), Some(true)]);
-    let expected = BooleanArray::from_slice([false]).boxed();
-
-    read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected)
-}
-
-#[test]
-fn indexed_dict() -> PolarsResult<()> {
-    let indices = PrimitiveArray::from_values((0..6u64).map(|x| x % 2));
-    let values = PrimitiveArray::from_slice([4i64, 6i64]).boxed();
-    let array = DictionaryArray::try_from_keys(indices, values).unwrap();
-
-    let indices = PrimitiveArray::from_slice([0u64]);
-    let values = PrimitiveArray::from_slice([4i64, 6i64]).boxed();
-    let expected = DictionaryArray::try_from_keys(indices, values).unwrap();
-
-    let expected = expected.boxed();
-
-    read_with_indexes(pages(&[&array], Encoding::RleDictionary)?, expected)
-}
diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs
index 95ab13936d19..99b1c1b7c9dd 100644
--- a/crates/polars/tests/it/io/parquet/read/mod.rs
+++ b/crates/polars/tests/it/io/parquet/read/mod.rs
@@ -13,25 +13,18 @@ mod utils;

 use std::fs::File;

-use dictionary::{deserialize as deserialize_dict, DecodedDictPage};
-#[cfg(feature = "async")]
-use futures::StreamExt;
+use dictionary::DecodedDictPage;
 use polars_parquet::parquet::encoding::hybrid_rle::HybridRleDecoder;
 use polars_parquet::parquet::error::{ParquetError, ParquetResult};
 use polars_parquet::parquet::metadata::ColumnChunkMetaData;
-use polars_parquet::parquet::page::{CompressedPage, DataPage, Page};
-#[cfg(feature = "async")]
-use polars_parquet::parquet::read::get_page_stream;
-#[cfg(feature = "async")]
-use polars_parquet::parquet::read::read_metadata_async;
+use polars_parquet::parquet::page::DataPage;
 use polars_parquet::parquet::read::{
-    get_column_iterator, get_field_columns, read_metadata, BasicDecompressor, MutStreamingIterator,
-    State,
+    get_column_iterator, get_field_columns, read_metadata, BasicDecompressor,
 };
 use polars_parquet::parquet::schema::types::{GroupConvertedType, ParquetType};
 use polars_parquet::parquet::schema::Repetition;
 use polars_parquet::parquet::types::int96_to_i64_ns;
-use polars_parquet::parquet::FallibleStreamingIterator;
+use polars_parquet::read::PageReader;
 use polars_utils::mmap::MemReader;

 use super::*;
@@ -147,54 +140,29 @@ pub fn page_to_array(page: &DataPage, dict: Option<&DecodedDictPage>) -> Parquet
     }
 }

-pub fn collect>(
-    mut iterator: I,
-    type_: PhysicalType,
-) -> ParquetResult> {
-    let mut arrays = vec![];
-    let mut dict = None;
-    while let Some(page) = iterator.next()? {
-        match page {
-            Page::Data(page) => arrays.push(page_to_array(page, dict.as_ref())?),
-            Page::Dict(page) => {
-                dict = Some(deserialize_dict(page, type_)?);
-            },
-        }
-    }
-    Ok(arrays)
-}
-
 /// Reads columns into an [`Array`].
 /// This is CPU-intensive: decompress, decode and de-serialize.
-pub fn columns_to_array(mut columns: I, field: &ParquetType) -> ParquetResult
+pub fn columns_to_array(mut columns: I, field: &ParquetType) -> ParquetResult
 where
-    II: Iterator>,
-    I: MutStreamingIterator,
+    I: Iterator>,
 {
     let mut validity = vec![];
     let mut has_filled = false;
     let mut arrays = vec![];
-    while let State::Some(mut new_iter) = columns.advance()? {
-        if let Some((pages, column)) = new_iter.get() {
-            let mut iterator = BasicDecompressor::new(pages, vec![]);
-
-            let mut dict = None;
-            while let Some(page) = iterator.next()? {
-                match page {
-                    polars_parquet::parquet::page::Page::Data(page) => {
-                        if !has_filled {
-                            struct_::extend_validity(&mut validity, page)?;
-                        }
-                        arrays.push(page_to_array(page, dict.as_ref())?)
-                    },
-                    polars_parquet::parquet::page::Page::Dict(page) => {
-                        dict = Some(deserialize_dict(page, column.physical_type())?);
-                    },
-                }
+    while let Some((pages, column)) = columns.next().transpose()? {
+        let mut iterator = BasicDecompressor::new(pages, vec![]);
+
+        let dict = iterator
+            .read_dict_page()?
+            .map(|dict| dictionary::deserialize(&dict, column.physical_type()))
+            .transpose()?;
+        while let Some(page) = iterator.next().transpose()? {
+            if !has_filled {
+                struct_::extend_validity(&mut validity, &page)?;
             }
+            arrays.push(page_to_array(&page, dict.as_ref())?)
         }
         has_filled = true;
-        columns = new_iter;
     }

     match field {
@@ -233,7 +201,6 @@ pub fn read_column(
         &metadata.row_groups[row_group],
         field.name(),
         None,
-        vec![],
         usize::MAX,
     );

@@ -246,42 +213,6 @@ pub fn read_column(
     Ok((array, statistics.pop().unwrap()))
 }

-#[cfg(feature = "async")]
-pub async fn read_column_async<
-    R: futures::AsyncRead + futures::AsyncSeek + Send + std::marker::Unpin,
->(
-    reader: &mut R,
-    row_group: usize,
-    field_name: &str,
-) -> ParquetResult<(Array, Option)> {
-    let metadata = read_metadata_async(reader).await?;
-
-    let field = metadata
-        .schema()
-        .fields()
-        .iter()
-        .find(|field| field.name() == field_name)
-        .ok_or_else(|| ParquetError::OutOfSpec("column does not exist".to_string()))?;
-
-    let column = get_field_columns(metadata.row_groups[row_group].columns(), field.name())
-        .next()
-        .unwrap();
-
-    let pages = get_page_stream(column, reader, vec![], Arc::new(|_, _| true), usize::MAX).await?;
-
-    let mut statistics = get_field_columns(metadata.row_groups[row_group].columns(), field.name())
-        .map(|column_meta| column_meta.statistics().transpose())
-        .collect::>>()?;
-
-    let pages = pages.collect::>().await;
-
-    let iterator = BasicDecompressor::new(pages.into_iter(), vec![]);
-
-    let mut arrays = collect(iterator, column.physical_type())?;
-
-    Ok((arrays.pop().unwrap(), statistics.pop().unwrap()))
-}
-
 fn get_column(path: &str, column: &str) -> ParquetResult<(Array, Option)> {
     let file = File::open(path).unwrap();
     let memreader = MemReader::from_reader(file).unwrap();
diff --git a/crates/polars/tests/it/io/parquet/roundtrip.rs b/crates/polars/tests/it/io/parquet/roundtrip.rs
index 6f6a26255e60..aa4eacb0e04d 100644
--- a/crates/polars/tests/it/io/parquet/roundtrip.rs
+++ b/crates/polars/tests/it/io/parquet/roundtrip.rs
@@ -53,7 +53,7 @@ fn round_trip(
         .collect();

     // we can then read the row groups into chunks
-    let chunks = polars_parquet::read::FileReader::new(reader, row_groups, schema, None, None);
+    let chunks = polars_parquet::read::FileReader::new(reader, row_groups, schema, None);

     let mut arrays = vec![];
     for chunk in chunks {
diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs
index c0e37be12a68..7f066fe726e4 100644
--- a/crates/polars/tests/it/io/parquet/write/mod.rs
+++ b/crates/polars/tests/it/io/parquet/write/mod.rs
@@ -16,8 +16,6 @@ use polars_parquet::parquet::metadata::{Descriptor, SchemaDescriptor};
 use polars_parquet::parquet::page::Page;
 use polars_parquet::parquet::schema::types::{ParquetType, PhysicalType};
 use polars_parquet::parquet::statistics::Statistics;
-#[cfg(feature = "async")]
-use polars_parquet::parquet::write::FileStreamer;
 use polars_parquet::parquet::write::{
     Compressor, DynIter, DynStreamingIterator, FileWriter, Version, WriteOptions,
 };
@@ -50,17 +48,6 @@ fn read_column(reader: &mut R) -> ParquetResult<(Array, Option(
-    reader: &mut R,
-) -> ParquetResult<(Array, Option)> {
-    let (a, statistics) = super::read::read_column_async(reader, 0, "col").await?;
-    Ok((a, statistics))
-}
-
 fn test_column(column: &str, compression: CompressionOptions) -> ParquetResult<()> {
     let array = alltypes_plain(column);

@@ -234,60 +221,6 @@ fn basic() -> ParquetResult<()> {
     Ok(())
 }

-#[cfg(feature = "async")]
-#[allow(dead_code)]
-async fn test_column_async(column: &str, compression: CompressionOptions) -> ParquetResult<()> {
-    let array = alltypes_plain(column);
-
-    let options = WriteOptions {
-        write_statistics: true,
-        version: Version::V1,
-    };
-
-    // prepare schema
-    let type_ = match array {
-        Array::Int32(_) => PhysicalType::Int32,
-        Array::Int64(_) => PhysicalType::Int64,
-        Array::Int96(_) => PhysicalType::Int96,
-        Array::Float(_) => PhysicalType::Float,
-        Array::Double(_) => PhysicalType::Double,
-        Array::Binary(_) => PhysicalType::ByteArray,
-        _ => todo!(),
-    };
-
-    let schema = SchemaDescriptor::new(
-        "schema".to_string(),
-        vec![ParquetType::from_physical("col".to_string(), type_)],
-    );
-
-    let a = schema.columns();
-
-    let pages = DynStreamingIterator::new(Compressor::new_from_vec(
-        DynIter::new(std::iter::once(array_to_page(
-            &array,
-            &options,
-            &a[0].descriptor,
-        ))),
-        compression,
-        vec![],
-    ));
-    let columns = std::iter::once(Ok(pages));
-
-    let writer = futures::io::Cursor::new(vec![]);
-    let mut writer = FileStreamer::new(writer, schema, options, None);
-
-    writer.write(DynIter::new(columns)).await?;
-    writer.end(None).await?;
-
-    let data = writer.into_inner().into_inner();
-
-    let (result, statistics) = read_column_async(&mut futures::io::Cursor::new(data)).await?;
-    assert_eq!(array, result);
-    let stats = alltypes_statistics(column);
-    assert_eq!(statistics.as_ref(), Some(stats).as_ref());
-    Ok(())
-}
-
 #[test]
 fn test_parquet() {
     // In CI: This test will be skipped because the file does not exist.