diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index b190ffa21daf..f193ef104486 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -635,8 +635,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { self.column_index_builder.append( null_page, - self.truncate_min_value(stat.min_bytes()), - self.truncate_max_value(stat.max_bytes()), + self.truncate_min_value( + self.props.column_index_truncate_length(), + stat.min_bytes(), + ) + .0, + self.truncate_max_value( + self.props.column_index_truncate_length(), + stat.max_bytes(), + ) + .0, self.page_metrics.num_page_nulls as i64, ); } @@ -657,26 +665,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .append_row_count(self.page_metrics.num_buffered_rows as i64); } - fn truncate_min_value(&self, data: &[u8]) -> Vec { - self.props - .column_index_truncate_length() + fn truncate_min_value(&self, truncation_length: Option, data: &[u8]) -> (Vec, bool) { + truncation_length .filter(|l| data.len() > *l) .and_then(|l| match str::from_utf8(data) { Ok(str_data) => truncate_utf8(str_data, l), Err(_) => Some(data[..l].to_vec()), }) - .unwrap_or_else(|| data.to_vec()) + .map(|truncated| (truncated, true)) + .unwrap_or_else(|| (data.to_vec(), false)) } - fn truncate_max_value(&self, data: &[u8]) -> Vec { - self.props - .column_index_truncate_length() + fn truncate_max_value(&self, truncation_length: Option, data: &[u8]) -> (Vec, bool) { + truncation_length .filter(|l| data.len() > *l) .and_then(|l| match str::from_utf8(data) { Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8), Err(_) => increment(data[..l].to_vec()), }) - .unwrap_or_else(|| data.to_vec()) + .map(|truncated| (truncated, true)) + .unwrap_or_else(|| (data.to_vec(), false)) } /// Adds data page. @@ -857,6 +865,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .set_dictionary_page_offset(dict_page_offset); if self.statistics_enabled != EnabledStatistics::None { + let backwards_compatible_min_max = self.descr.sort_order().is_signed(); + let statistics = ValueStatistics::::new( self.column_metrics.min_column_value.clone(), self.column_metrics.max_column_value.clone(), @@ -865,14 +875,52 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { false, true, true, - ); + ) + .with_backwards_compatible_min_max(backwards_compatible_min_max) + .into(); + + let statistics = match statistics { + Statistics::ByteArray(stats) if stats.has_min_max_set() => { + let (min, did_truncate_min) = self.truncate_min_value( + self.props.statistics_truncate_length(), + stats.min_bytes().clone(), + ); + let (max, did_truncate_max) = self.truncate_max_value( + self.props.statistics_truncate_length(), + stats.max_bytes(), + ); + Statistics::byte_array( + Some(min.into()), + Some(max.into()), + stats.distinct_count(), + stats.null_count(), + backwards_compatible_min_max, + !did_truncate_max, + !did_truncate_min, + ) + } + Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => { + let (min, did_truncate_min) = self.truncate_min_value( + self.props.statistics_truncate_length(), + stats.min_bytes(), + ); + let (max, did_truncate_max) = self.truncate_max_value( + self.props.statistics_truncate_length(), + stats.max_bytes(), + ); + Statistics::fixed_len_byte_array( + Some(min.into()), + Some(max.into()), + stats.distinct_count(), + stats.null_count(), + backwards_compatible_min_max, + !did_truncate_max, + !did_truncate_min, + ) + } + stats => stats, + }; - // Some common readers only support the deprecated statistics - // format so we also write them out if possible - // See https://github.com/apache/arrow-rs/issues/799 - let statistics = statistics - .with_backwards_compatible_min_max(self.descr.sort_order().is_signed()) - .into(); builder = builder.set_statistics(statistics); } @@ -2468,6 +2516,148 @@ mod tests { } } + #[test] + fn test_statistics_truncating_byte_array() { + let page_writer = get_test_page_writer(); + + const TEST_TRUNCATE_LENGTH: usize = 1; + + // Truncate values at 1 byte + let builder = + WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH)); + let props = Arc::new(builder.build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + let mut data = vec![ByteArray::default(); 1]; + // This is the expected min value + data[0].set_data(Bytes::from(String::from("Blart Versenwald III"))); + + writer.write_batch(&data, None, None).unwrap(); + + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + assert_eq!(1, r.rows_written); + + let stats = r.metadata.statistics().expect("statistics"); + assert!(stats.has_min_max_set()); + assert_eq!(stats.null_count(), 0); + assert_eq!(stats.distinct_count(), None); + if let Statistics::ByteArray(_stats) = stats { + let min_value = _stats.min(); + let max_value = _stats.max(); + + assert!(!_stats.min_is_exact()); + assert!(!_stats.max_is_exact()); + + assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH); + assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH); + + assert_eq!("B".as_bytes(), min_value.as_bytes()); + assert_eq!("C".as_bytes(), max_value.as_bytes()); + } else { + panic!("expecting Statistics::ByteArray"); + } + } + + #[test] + fn test_statistics_truncating_fixed_len_byte_array() { + let page_writer = get_test_page_writer(); + + const TEST_TRUNCATE_LENGTH: usize = 1; + + // Truncate values at 1 byte + let builder = + WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH)); + let props = Arc::new(builder.build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + let mut data = vec![FixedLenByteArray::default(); 1]; + + const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654; + const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes(); + + const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals + const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] = + [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0]; + + // This is the expected min value + data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice())); + + writer.write_batch(&data, None, None).unwrap(); + + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + assert_eq!(1, r.rows_written); + + let stats = r.metadata.statistics().expect("statistics"); + assert!(stats.has_min_max_set()); + assert_eq!(stats.null_count(), 0); + assert_eq!(stats.distinct_count(), None); + if let Statistics::FixedLenByteArray(_stats) = stats { + let min_value = _stats.min(); + let max_value = _stats.max(); + + assert!(!_stats.min_is_exact()); + assert!(!_stats.max_is_exact()); + + assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH); + assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH); + + assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes()); + assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes()); + + let reconstructed_min = i128::from_be_bytes([ + min_value.as_bytes()[0], + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ]); + + let reconstructed_max = i128::from_be_bytes([ + max_value.as_bytes()[0], + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ]); + + // check that the inner value is correctly bounded by the min/max + println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"); + assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE); + println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"); + assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE); + } else { + panic!("expecting Statistics::FixedLenByteArray"); + } + } + #[test] fn test_send() { fn test() {} diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index ea71763a0101..287e73c9906a 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -51,6 +51,8 @@ pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option = Some(64); pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05; /// Default value for [`BloomFilterProperties::ndv`] pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64; +/// Default values for [`WriterProperties::statistics_truncate_length`] +pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option = None; /// Parquet writer version. /// @@ -136,6 +138,7 @@ pub struct WriterProperties { column_properties: HashMap, sorting_columns: Option>, column_index_truncate_length: Option, + statistics_truncate_length: Option, } impl Default for WriterProperties { @@ -241,6 +244,13 @@ impl WriterProperties { self.column_index_truncate_length } + /// Returns the maximum length of truncated min/max values in statistics. + /// + /// `None` if truncation is disabled, must be greater than 0 otherwise. + pub fn statistics_truncate_length(&self) -> Option { + self.statistics_truncate_length + } + /// Returns encoding for a data page, when dictionary encoding is enabled. /// This is not configurable. #[inline] @@ -334,6 +344,7 @@ pub struct WriterPropertiesBuilder { column_properties: HashMap, sorting_columns: Option>, column_index_truncate_length: Option, + statistics_truncate_length: Option, } impl WriterPropertiesBuilder { @@ -352,6 +363,7 @@ impl WriterPropertiesBuilder { column_properties: HashMap::new(), sorting_columns: None, column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, + statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH, } } @@ -370,6 +382,7 @@ impl WriterPropertiesBuilder { column_properties: self.column_properties, sorting_columns: self.sorting_columns, column_index_truncate_length: self.column_index_truncate_length, + statistics_truncate_length: self.statistics_truncate_length, } } @@ -643,6 +656,17 @@ impl WriterPropertiesBuilder { self.column_index_truncate_length = max_length; self } + + /// Sets the max length of min/max value fields in statistics. Must be greater than 0. + /// If set to `None` - there's no effective limit. + pub fn set_statistics_truncate_length(mut self, max_length: Option) -> Self { + if let Some(value) = max_length { + assert!(value > 0, "Cannot have a 0 statistics truncate length. If you wish to disable min/max value truncation, set it to `None`."); + } + + self.statistics_truncate_length = max_length; + self + } } /// Controls the level of statistics to be computed by the writer diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 26729afc03db..9e914009bcd7 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -578,12 +578,12 @@ impl ValueStatistics { } /// Returns optional value of number of distinct values occurring. - fn distinct_count(&self) -> Option { + pub fn distinct_count(&self) -> Option { self.distinct_count } /// Returns null count. - fn null_count(&self) -> u64 { + pub fn null_count(&self) -> u64 { self.null_count }