Parquet: Ensure page statistics are written only when configured from the Arrow Writer (#5181)

* Issue fix and tests

* Cleanup tests
AdamGS authored Dec 7, 2023
1 parent 1534cc1 commit 490c080
Showing 2 changed files with 158 additions and 14 deletions.
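Note: the change below makes the Arrow writer honor per-column statistics settings when deciding whether to emit page-level statistics (and thus a column index). A minimal sketch of the configuration the new tests exercise — assuming the `parquet` crate's `arrow` feature plus the `arrow_array` and `arrow_schema` crates; the schema and values are illustrative only:

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec!["a", "b"])) as _],
    )?;

    // Disable statistics globally, then opt column "a" back in at page level.
    // With this fix, only "a" gets page statistics and a column index.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .build();

    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```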
143 changes: 142 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -916,8 +916,9 @@ mod tests {
use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::file::metadata::ParquetMetaData;
+    use crate::file::page_index::index::Index;
     use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{ReaderProperties, WriterVersion};
+    use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
@@ -2738,4 +2739,144 @@ mod tests {
assert_eq!(index[0][0].len(), 1); // 1 page
assert_eq!(index[0][1].len(), 1); // 1 page
}

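    // Statistics disabled globally, but `EnabledStatistics::Page` for column "a":
    // "a" should get chunk statistics and a column index, "b" neither; the
    // offset index is written for both columns.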
#[test]
fn test_disabled_statistics_with_page() {
let file_schema = Schema::new(vec![
Field::new("a", DataType::Utf8, true),
Field::new("b", DataType::Utf8, true),
]);
let file_schema = Arc::new(file_schema);

let batch = RecordBatch::try_new(
file_schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
],
)
.unwrap();

let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
.build();

let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
writer.write(&batch).unwrap();

let metadata = writer.close().unwrap();
assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
assert_eq!(row_group.columns.len(), 2);
// Column "a" has both offset and column index, as requested
assert!(row_group.columns[0].offset_index_offset.is_some());
assert!(row_group.columns[0].column_index_offset.is_some());
// Column "b" should only have offset index
assert!(row_group.columns[1].offset_index_offset.is_some());
assert!(row_group.columns[1].column_index_offset.is_none());

let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

let row_group = reader.get_row_group(0).unwrap();
let a_col = row_group.metadata().column(0);
let b_col = row_group.metadata().column(1);

// Column chunk of column "a" should have chunk level statistics
if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
let min = byte_array_stats.min();
let max = byte_array_stats.max();

assert_eq!(min.as_bytes(), &[b'a']);
assert_eq!(max.as_bytes(), &[b'd']);
} else {
panic!("expecting Statistics::ByteArray");
}

// The column chunk for column "b" shouldn't have statistics
assert!(b_col.statistics().is_none());

let offset_index = reader.metadata().offset_index().unwrap();
assert_eq!(offset_index.len(), 1); // 1 row group
assert_eq!(offset_index[0].len(), 2); // 2 columns

let column_index = reader.metadata().column_index().unwrap();
assert_eq!(column_index.len(), 1); // 1 row group
assert_eq!(column_index[0].len(), 2); // 2 columns

let a_idx = &column_index[0][0];
assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}

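    // Same data, but column "a" is set to `EnabledStatistics::Chunk`: chunk
    // statistics are written for "a" only, and no column gets a column index.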
#[test]
fn test_disabled_statistics_with_chunk() {
let file_schema = Schema::new(vec![
Field::new("a", DataType::Utf8, true),
Field::new("b", DataType::Utf8, true),
]);
let file_schema = Arc::new(file_schema);

let batch = RecordBatch::try_new(
file_schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
],
)
.unwrap();

let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
.build();

let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
writer.write(&batch).unwrap();

let metadata = writer.close().unwrap();
assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
assert_eq!(row_group.columns.len(), 2);
// Column "a" should only have offset index
assert!(row_group.columns[0].offset_index_offset.is_some());
assert!(row_group.columns[0].column_index_offset.is_none());
// Column "b" should only have offset index
assert!(row_group.columns[1].offset_index_offset.is_some());
assert!(row_group.columns[1].column_index_offset.is_none());

let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

let row_group = reader.get_row_group(0).unwrap();
let a_col = row_group.metadata().column(0);
let b_col = row_group.metadata().column(1);

// Column chunk of column "a" should have chunk level statistics
if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
let min = byte_array_stats.min();
let max = byte_array_stats.max();

assert_eq!(min.as_bytes(), &[b'a']);
assert_eq!(max.as_bytes(), &[b'd']);
} else {
panic!("expecting Statistics::ByteArray");
}

// The column chunk for column "b" shouldn't have statistics
assert!(b_col.statistics().is_none());

let column_index = reader.metadata().column_index().unwrap();
assert_eq!(column_index.len(), 1); // 1 row group
assert_eq!(column_index[0].len(), 2); // 2 columns

let a_idx = &column_index[0][0];
assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}
}
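For reference, the contract the two tests above pin down — a hypothetical helper, not part of this patch, mapping each `EnabledStatistics` level to the metadata the writer should produce (the offset index is written regardless of the level):

```rust
use parquet::file::properties::EnabledStatistics;

/// Returns (chunk-level statistics written, column index written), per the
/// assertions in `test_disabled_statistics_with_page` and
/// `test_disabled_statistics_with_chunk`.
fn expected_metadata(level: EnabledStatistics) -> (bool, bool) {
    match level {
        EnabledStatistics::None => (false, false),
        EnabledStatistics::Chunk => (true, false),
        EnabledStatistics::Page => (true, true),
    }
}
```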
29 changes: 16 additions & 13 deletions parquet/src/column/writer/mod.rs
@@ -764,19 +764,22 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

-        let page_statistics = match (values_data.min_value, values_data.max_value) {
-            (Some(min), Some(max)) => {
-                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-                Some(ValueStatistics::new(
-                    Some(min),
-                    Some(max),
-                    None,
-                    self.page_metrics.num_page_nulls,
-                    false,
-                ))
-            }
-            _ => None,
-        };
+        let page_statistics = if let (Some(min), Some(max)) =
+            (values_data.min_value, values_data.max_value)
+        {
+            // Update chunk level statistics
+            update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+            update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+            (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
+                Some(min),
+                Some(max),
+                None,
+                self.page_metrics.num_page_nulls,
+                false,
+            ))
+        } else {
+            None
+        };

// update column and offset index
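The writer-side fix keeps updating the chunk-level min/max unconditionally but constructs per-page `ValueStatistics` only when the column is configured with `EnabledStatistics::Page`. A standalone illustration of the `bool::then_some` gating pattern used above (values are illustrative):

```rust
fn main() {
    // `then_some(value)` yields Some(value) only when the bool is true,
    // mirroring how page statistics are gated on the configured level.
    let page_level_enabled = false;
    assert_eq!(page_level_enabled.then_some("per-page min/max"), None);

    let page_level_enabled = true;
    assert_eq!(
        page_level_enabled.then_some("per-page min/max"),
        Some("per-page min/max")
    );
}
```

Note that `then_some` evaluates its argument eagerly; the closure-taking `bool::then` would defer building the statistics until the condition holds.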
