diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 3eb2f8882708..45bca7eb5e82 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1015,12 +1015,9 @@ impl<'a, W: Write> ParquetMetadataWriter<'a, W> {
 mod tests {
     use super::*;
 
-    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
-    use arrow_schema::{DataType as ArrowDataType, Field, Schema};
-    use bytes::{BufMut, Bytes, BytesMut};
+    use bytes::Bytes;
     use std::fs::File;
 
-    use crate::arrow::ArrowWriter;
     use crate::basic::{
         ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
     };
@@ -1028,7 +1025,6 @@ mod tests {
     use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::{BoolType, ByteArrayType, Int32Type};
-    use crate::file::footer::parse_metadata;
     use crate::file::page_index::index::Index;
     use crate::file::properties::EnabledStatistics;
     use crate::file::serialized_reader::ReadOptionsBuilder;
@@ -2054,222 +2050,6 @@ mod tests {
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
 
-    struct TestMetadata {
-        #[allow(dead_code)]
-        file_size: usize,
-        metadata: ParquetMetaData,
-    }
-
-    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
-        let mut buf = BytesMut::new().writer();
-        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
-            "a",
-            ArrowDataType::Int32,
-            true,
-        )]));
-
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
-
-        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
-
-        let writer_props = match write_page_index {
-            true => WriterProperties::builder()
-                .set_statistics_enabled(EnabledStatistics::Page)
-                .build(),
-            false => WriterProperties::builder()
-                .set_statistics_enabled(EnabledStatistics::Chunk)
-                .build(),
-        };
-
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        let data = buf.into_inner().freeze();
-
-        let reader_opts = match read_page_index {
-            true => ReadOptionsBuilder::new().with_page_index().build(),
-            false => ReadOptionsBuilder::new().build(),
-        };
-        let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
-        let metadata = reader.metadata().clone();
-        TestMetadata {
-            file_size: data.len(),
-            metadata,
-        }
-    }
-
-    fn has_page_index(metadata: &ParquetMetaData) -> bool {
-        match metadata.column_index() {
-            Some(column_index) => column_index
-                .iter()
-                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
-            None => false,
-        }
-    }
-
-    #[test]
-    fn test_roundtrip_parquet_metadata_without_page_index() {
-        // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
-        // we at least test round trip without them
-        let metadata = get_test_metadata(false, false);
-        assert!(!has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = parse_metadata(&data).unwrap();
-        assert!(!has_page_index(&metadata.metadata));
-
-        assert_eq!(metadata.metadata, decoded_metadata);
-    }
-
-    /// Temporary function so we can test loading metadata with page indexes
-    /// while we haven't fully figured out how to load it cleanly
-    #[cfg(feature = "async")]
-    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
-        use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
-        use crate::errors::Result as ParquetResult;
-        use bytes::Bytes;
-        use futures::future::BoxFuture;
-        use futures::FutureExt;
-        use std::ops::Range;
-
-        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
-        struct AsyncBytes {
-            data: Bytes,
-        }
-
-        impl AsyncBytes {
-            fn new(data: Bytes) -> Self {
-                Self { data }
-            }
-        }
-
-        impl MetadataFetch for AsyncBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
-            }
-        }
-
-        /// A `MetadataFetch` implementation that reads from a subset of the full data
-        /// while accepting ranges that address the full data.
-        struct MaskedBytes {
-            inner: Box<dyn MetadataFetch>,
-            inner_range: Range<usize>,
-        }
-
-        impl MaskedBytes {
-            fn new(inner: Box<dyn MetadataFetch>, inner_range: Range<usize>) -> Self {
-                Self { inner, inner_range }
-            }
-        }
-
-        impl MetadataFetch for &mut MaskedBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                let inner_range = self.inner_range.clone();
-                println!("inner_range: {:?}", inner_range);
-                println!("range: {:?}", range);
-                assert!(inner_range.start <= range.start && inner_range.end >= range.end);
-                let range =
-                    range.start - self.inner_range.start..range.end - self.inner_range.start;
-                self.inner.fetch(range)
-            }
-        }
-
-        let metadata_length = data.len();
-        let mut reader = MaskedBytes::new(
-            Box::new(AsyncBytes::new(data)),
-            file_size - metadata_length..file_size,
-        );
-        let metadata = MetadataLoader::load(&mut reader, file_size, None)
-            .await
-            .unwrap();
-        let loaded_metadata = metadata.finish();
-        let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
-        metadata.load_page_index(true, true).await.unwrap();
-        metadata.finish()
-    }
-
-    fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
-        assert_eq!(left.column_descr(), right.column_descr());
-        assert_eq!(left.encodings(), right.encodings());
-        assert_eq!(left.num_values(), right.num_values());
-        assert_eq!(left.compressed_size(), right.compressed_size());
-        assert_eq!(left.data_page_offset(), right.data_page_offset());
-        assert_eq!(left.statistics(), right.statistics());
-        assert_eq!(left.offset_index_length(), right.offset_index_length());
-        assert_eq!(left.column_index_length(), right.column_index_length());
-        assert_eq!(
-            left.unencoded_byte_array_data_bytes(),
-            right.unencoded_byte_array_data_bytes()
-        );
-    }
-
-    fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
-        assert_eq!(left.num_rows(), right.num_rows());
-        assert_eq!(left.file_offset(), right.file_offset());
-        assert_eq!(left.total_byte_size(), right.total_byte_size());
-        assert_eq!(left.schema_descr(), right.schema_descr());
-        assert_eq!(left.num_columns(), right.num_columns());
-        left.columns()
-            .iter()
-            .zip(right.columns().iter())
-            .for_each(|(lc, rc)| {
-                check_columns_are_equivalent(lc, rc);
-            });
-    }
-
-    #[tokio::test]
-    #[cfg(feature = "async")]
-    async fn test_encode_parquet_metadata_with_page_index() {
-        // Create a ParquetMetadata with page index information
-        let metadata = get_test_metadata(true, true);
-        assert!(has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
-
-        // Because the page index offsets will differ, compare invariant parts of the metadata
-        assert_eq!(
-            metadata.metadata.file_metadata(),
-            decoded_metadata.file_metadata()
-        );
-        assert_eq!(
-            metadata.metadata.column_index(),
-            decoded_metadata.column_index()
-        );
-        assert_eq!(
-            metadata.metadata.offset_index(),
-            decoded_metadata.offset_index()
-        );
-        assert_eq!(
-            metadata.metadata.num_row_groups(),
-            decoded_metadata.num_row_groups()
-        );
-
-        metadata
-            .metadata
-            .row_groups()
-            .iter()
-            .zip(decoded_metadata.row_groups().iter())
-            .for_each(|(left, right)| {
-                check_row_groups_are_equivalent(left, right);
-            });
-    }
-
     #[test]
     fn test_byte_array_size_statistics() {
         let message_type = "
@@ -2348,4 +2128,237 @@ mod tests {
         assert_eq!(page_sizes.len(), 1);
         assert_eq!(page_sizes[0], unenc_size);
     }
+
+    #[cfg(feature = "async")]
+    mod async_tests {
+        use std::sync::Arc;
+
+        use crate::file::footer::parse_metadata;
+        use crate::file::properties::{EnabledStatistics, WriterProperties};
+        use crate::file::reader::{FileReader, SerializedFileReader};
+        use crate::file::writer::ParquetMetadataWriter;
+        use crate::{
+            arrow::ArrowWriter,
+            file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
+        };
+        use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+        use arrow_schema::{DataType as ArrowDataType, Field, Schema};
+        use bytes::{BufMut, Bytes, BytesMut};
+
+        use super::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
+
+        struct TestMetadata {
+            #[allow(dead_code)]
+            file_size: usize,
+            metadata: ParquetMetaData,
+        }
+
+        fn has_page_index(metadata: &ParquetMetaData) -> bool {
+            match metadata.column_index() {
+                Some(column_index) => column_index
+                    .iter()
+                    .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
+                None => false,
+            }
+        }
+
+        #[test]
+        fn test_roundtrip_parquet_metadata_without_page_index() {
+            // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
+            // we at least test round trip without them
+            let metadata = get_test_metadata(false, false);
+            assert!(!has_page_index(&metadata.metadata));
+
+            let mut buf = BytesMut::new().writer();
+            {
+                let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+                writer.finish().unwrap();
+            }
+
+            let data = buf.into_inner().freeze();
+
+            let decoded_metadata = parse_metadata(&data).unwrap();
+            assert!(!has_page_index(&metadata.metadata));
+
+            assert_eq!(metadata.metadata, decoded_metadata);
+        }
+
+        fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
+            let mut buf = BytesMut::new().writer();
+            let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
+                "a",
+                ArrowDataType::Int32,
+                true,
+            )]));
+
+            let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+
+            let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+            let writer_props = match write_page_index {
+                true => WriterProperties::builder()
+                    .set_statistics_enabled(EnabledStatistics::Page)
+                    .build(),
+                false => WriterProperties::builder()
+                    .set_statistics_enabled(EnabledStatistics::Chunk)
+                    .build(),
+            };
+
+            let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
+            writer.write(&batch).unwrap();
+            writer.close().unwrap();
+
+            let data = buf.into_inner().freeze();
+
+            let reader_opts = match read_page_index {
+                true => ReadOptionsBuilder::new().with_page_index().build(),
+                false => ReadOptionsBuilder::new().build(),
+            };
+            let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
+            let metadata = reader.metadata().clone();
+            TestMetadata {
+                file_size: data.len(),
+                metadata,
+            }
+        }
+
+        /// Temporary function so we can test loading metadata with page indexes
+        /// while we haven't fully figured out how to load it cleanly
+        async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
+            use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+            use crate::errors::Result as ParquetResult;
+            use bytes::Bytes;
+            use futures::future::BoxFuture;
+            use futures::FutureExt;
+            use std::ops::Range;
+
+            /// Adapt a `Bytes` to a `MetadataFetch` implementation.
+            struct AsyncBytes {
+                data: Bytes,
+            }
+
+            impl AsyncBytes {
+                fn new(data: Bytes) -> Self {
+                    Self { data }
+                }
+            }
+
+            impl MetadataFetch for AsyncBytes {
+                fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+                    async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
+                }
+            }
+
+            /// A `MetadataFetch` implementation that reads from a subset of the full data
+            /// while accepting ranges that address the full data.
+            struct MaskedBytes {
+                inner: Box<dyn MetadataFetch>,
+                inner_range: Range<usize>,
+            }
+
+            impl MaskedBytes {
+                fn new(inner: Box<dyn MetadataFetch>, inner_range: Range<usize>) -> Self {
+                    Self { inner, inner_range }
+                }
+            }
+
+            impl MetadataFetch for &mut MaskedBytes {
+                fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+                    let inner_range = self.inner_range.clone();
+                    println!("inner_range: {:?}", inner_range);
+                    println!("range: {:?}", range);
+                    assert!(inner_range.start <= range.start && inner_range.end >= range.end);
+                    let range =
+                        range.start - self.inner_range.start..range.end - self.inner_range.start;
+                    self.inner.fetch(range)
+                }
+            }
+
+            let metadata_length = data.len();
+            let mut reader = MaskedBytes::new(
+                Box::new(AsyncBytes::new(data)),
+                file_size - metadata_length..file_size,
+            );
+            let metadata = MetadataLoader::load(&mut reader, file_size, None)
+                .await
+                .unwrap();
+            let loaded_metadata = metadata.finish();
+            let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
+            metadata.load_page_index(true, true).await.unwrap();
+            metadata.finish()
+        }
+
+        fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
+            assert_eq!(left.column_descr(), right.column_descr());
+            assert_eq!(left.encodings(), right.encodings());
+            assert_eq!(left.num_values(), right.num_values());
+            assert_eq!(left.compressed_size(), right.compressed_size());
+            assert_eq!(left.data_page_offset(), right.data_page_offset());
+            assert_eq!(left.statistics(), right.statistics());
+            assert_eq!(left.offset_index_length(), right.offset_index_length());
+            assert_eq!(left.column_index_length(), right.column_index_length());
+            assert_eq!(
+                left.unencoded_byte_array_data_bytes(),
+                right.unencoded_byte_array_data_bytes()
+            );
+        }
+
+        fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
+            assert_eq!(left.num_rows(), right.num_rows());
+            assert_eq!(left.file_offset(), right.file_offset());
+            assert_eq!(left.total_byte_size(), right.total_byte_size());
+            assert_eq!(left.schema_descr(), right.schema_descr());
+            assert_eq!(left.num_columns(), right.num_columns());
+            left.columns()
+                .iter()
+                .zip(right.columns().iter())
+                .for_each(|(lc, rc)| {
+                    check_columns_are_equivalent(lc, rc);
+                });
+        }
+
+        #[tokio::test]
+        async fn test_encode_parquet_metadata_with_page_index() {
+            // Create a ParquetMetadata with page index information
+            let metadata = get_test_metadata(true, true);
+            assert!(has_page_index(&metadata.metadata));
+
+            let mut buf = BytesMut::new().writer();
+            {
+                let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+                writer.finish().unwrap();
+            }
+
+            let data = buf.into_inner().freeze();
+
+            let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
+
+            // Because the page index offsets will differ, compare invariant parts of the metadata
+            assert_eq!(
+                metadata.metadata.file_metadata(),
+                decoded_metadata.file_metadata()
+            );
+            assert_eq!(
+                metadata.metadata.column_index(),
+                decoded_metadata.column_index()
+            );
+            assert_eq!(
+                metadata.metadata.offset_index(),
+                decoded_metadata.offset_index()
+            );
+            assert_eq!(
+                metadata.metadata.num_row_groups(),
+                decoded_metadata.num_row_groups()
+            );
+
+            metadata
+                .metadata
+                .row_groups()
+                .iter()
+                .zip(decoded_metadata.row_groups().iter())
+                .for_each(|(left, right)| {
+                    check_row_groups_are_equivalent(left, right);
+                });
+        }
+    }
 }
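
For orientation, the write/read round trip these relocated tests exercise is sketched below. It is a minimal sketch using only calls that appear in this diff (`ParquetMetadataWriter::new`, `finish`, `parse_metadata`, and the `bytes` writer adapter); the helper name `roundtrip_metadata` is illustrative, not an API of the crate.

    // Minimal sketch, assuming the crate-internal paths used above:
    // `crate::file::writer::ParquetMetadataWriter` and
    // `crate::file::footer::parse_metadata`.
    fn roundtrip_metadata(metadata: &ParquetMetaData) -> ParquetMetaData {
        use bytes::BufMut; // brings `BytesMut::writer` into scope

        // Serialize the in-memory metadata into a standalone footer buffer.
        let mut buf = bytes::BytesMut::new().writer();
        {
            let mut writer = ParquetMetadataWriter::new(&mut buf, metadata);
            writer.finish().unwrap();
        }

        // Decode the footer back into `ParquetMetaData`; without page indexes
        // the result compares equal to the input, which is what
        // `test_roundtrip_parquet_metadata_without_page_index` asserts.
        parse_metadata(&buf.into_inner().freeze()).unwrap()
    }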