diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index 7eba949042f1..69ccfe71f5a0 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -176,6 +176,38 @@ impl NativeIndex { boundary_order: index.boundary_order, }) } + + pub(crate) fn to_thrift(&self) -> ColumnIndex { + let min_values = self + .indexes + .iter() + .map(|x| x.min_bytes().map(|x| x.to_vec())) + .collect::>>() + .unwrap_or_else(|| vec![vec![]; self.indexes.len()]); + + let max_values = self + .indexes + .iter() + .map(|x| x.max_bytes().map(|x| x.to_vec())) + .collect::>>() + .unwrap_or_else(|| vec![vec![]; self.indexes.len()]); + + let null_counts = self + .indexes + .iter() + .map(|x| x.null_count()) + .collect::>>(); + + ColumnIndex::new( + self.indexes.iter().map(|x| x.min().is_none()).collect(), + min_values, + max_values, + self.boundary_order, + null_counts, + None, + None, + ) + } } #[cfg(test)] diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index c44a7e6697f0..3eb2f8882708 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -19,6 +19,7 @@ //! using row group writers and column writers respectively. use crate::bloom_filter::Sbbf; +use crate::file::page_index::index::Index; use crate::format as parquet; use crate::format::{ColumnIndex, OffsetIndex, RowGroup}; use crate::thrift::TSerializable; @@ -261,122 +262,41 @@ impl SerializedFileWriter { Ok(()) } - /// Serialize all the offset index to the file - fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { - // iter row group - // iter each column - // write offset index to the file - for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { - for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() { - match &self.offset_indexes[row_group_idx][column_idx] { - Some(offset_index) => { - let start_offset = self.buf.bytes_written(); - let mut protocol = TCompactOutputProtocol::new(&mut self.buf); - offset_index.write_to_out_protocol(&mut protocol)?; - let end_offset = self.buf.bytes_written(); - // set offset and index for offset index - column_metadata.offset_index_offset = Some(start_offset as i64); - column_metadata.offset_index_length = - Some((end_offset - start_offset) as i32); - } - None => {} - } - } - } - Ok(()) - } - - /// Serialize all the column index to the file - fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { - // iter row group - // iter each column - // write column index to the file - for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { - for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() { - match &self.column_indexes[row_group_idx][column_idx] { - Some(column_index) => { - let start_offset = self.buf.bytes_written(); - let mut protocol = TCompactOutputProtocol::new(&mut self.buf); - column_index.write_to_out_protocol(&mut protocol)?; - let end_offset = self.buf.bytes_written(); - // set offset and index for offset index - column_metadata.column_index_offset = Some(start_offset as i64); - column_metadata.column_index_length = - Some((end_offset - start_offset) as i32); - } - None => {} - } - } - } - Ok(()) - } - /// Assembles and writes metadata at the end of the file. fn write_metadata(&mut self) -> Result { self.finished = true; - let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); // write out any remaining bloom filters after all row groups for row_group in &mut self.row_groups { write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; } - let mut row_groups = self - .row_groups - .as_slice() - .iter() - .map(|v| v.to_thrift()) - .collect::>(); - - // Write column indexes and offset indexes - self.write_column_indexes(&mut row_groups)?; - self.write_offset_indexes(&mut row_groups)?; - let key_value_metadata = match self.props.key_value_metadata() { Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()), None if self.kv_metadatas.is_empty() => None, None => Some(self.kv_metadatas.clone()), }; - // We only include ColumnOrder for leaf nodes. - // Currently only supported ColumnOrder is TypeDefinedOrder so we set this - // for all leaf nodes. - // Even if the column has an undefined sort order, such as INTERVAL, this - // is still technically the defined TYPEORDER so it should still be set. - let column_orders = (0..self.schema_descr().num_columns()) - .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {})) - .collect(); - // This field is optional, perhaps in cases where no min/max fields are set - // in any Statistics or ColumnIndex object in the whole file. - // But for simplicity we always set this field. - let column_orders = Some(column_orders); + let row_groups = self + .row_groups + .iter() + .map(|v| v.to_thrift()) + .collect::>(); - let file_metadata = parquet::FileMetaData { - num_rows, + let mut encoder = ThriftMetadataWriter::new( + &mut self.buf, + &self.schema, + &self.descr, row_groups, - key_value_metadata, - version: self.props.writer_version().as_num(), - schema: types::to_thrift(self.schema.as_ref())?, - created_by: Some(self.props.created_by().to_owned()), - column_orders, - encryption_algorithm: None, - footer_signing_key_metadata: None, - }; - - // Write file metadata - let start_pos = self.buf.bytes_written(); - { - let mut protocol = TCompactOutputProtocol::new(&mut self.buf); - file_metadata.write_to_out_protocol(&mut protocol)?; + Some(self.props.created_by().to_string()), + self.props.writer_version().as_num(), + ); + if let Some(key_value_metadata) = key_value_metadata { + encoder = encoder.with_key_value_metadata(key_value_metadata) } - let end_pos = self.buf.bytes_written(); - - // Write footer - let metadata_len = (end_pos - start_pos) as u32; - - self.buf.write_all(&metadata_len.to_le_bytes())?; - self.buf.write_all(&PARQUET_MAGIC)?; - Ok(file_metadata) + encoder = encoder.with_column_indexes(&self.column_indexes); + encoder = encoder.with_offset_indexes(&self.offset_indexes); + encoder.finish() } #[inline] @@ -816,13 +736,291 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> { } } +/// Writes `crate::file::metadata` structures to a thrift encdoded byte streams +/// +/// This structure handles the details of writing the various parts of parquet +/// metadata into a byte stream. It is used to write the metadata into a +/// parquet file and can also write metadata into other locations (such as a +/// store of bytes). +/// +/// This is somewhat trickey because the metadata is not store as a single inline +/// thrift struture. It can have several "out of band" structures such as the OffsetIndex +/// and BloomFilters which are stored separately whose locations are stored as offsets +struct ThriftMetadataWriter<'a, W: Write> { + buf: &'a mut TrackedWrite, + schema: &'a TypePtr, + schema_descr: &'a SchemaDescPtr, + row_groups: Vec, + column_indexes: Option<&'a [Vec>]>, + offset_indexes: Option<&'a [Vec>]>, + key_value_metadata: Option>, + created_by: Option, + writer_version: i32, +} + +impl<'a, W: Write> ThriftMetadataWriter<'a, W> { + /// Serialize all the offset index to the file + fn write_offset_indexes(&mut self, offset_indexes: &[Vec>]) -> Result<()> { + // iter row group + // iter each column + // write offset index to the file + for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() { + for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() { + match &offset_indexes[row_group_idx][column_idx] { + Some(offset_index) => { + let start_offset = self.buf.bytes_written(); + let mut protocol = TCompactOutputProtocol::new(&mut self.buf); + offset_index.write_to_out_protocol(&mut protocol)?; + let end_offset = self.buf.bytes_written(); + // set offset and index for offset index + column_metadata.offset_index_offset = Some(start_offset as i64); + column_metadata.offset_index_length = + Some((end_offset - start_offset) as i32); + } + None => {} + } + } + } + Ok(()) + } + + /// Serialize all the column index to the file + fn write_column_indexes(&mut self, column_indexes: &[Vec>]) -> Result<()> { + // iter row group + // iter each column + // write column index to the file + for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() { + for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() { + match &column_indexes[row_group_idx][column_idx] { + Some(column_index) => { + let start_offset = self.buf.bytes_written(); + let mut protocol = TCompactOutputProtocol::new(&mut self.buf); + column_index.write_to_out_protocol(&mut protocol)?; + let end_offset = self.buf.bytes_written(); + // set offset and index for offset index + column_metadata.column_index_offset = Some(start_offset as i64); + column_metadata.column_index_length = + Some((end_offset - start_offset) as i32); + } + None => {} + } + } + } + Ok(()) + } + + /// Assembles and writes the final metadata to self.buf + pub fn finish(mut self) -> Result { + let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum(); + + // Write column indexes and offset indexes + if let Some(column_indexes) = self.column_indexes { + self.write_column_indexes(column_indexes)?; + } + if let Some(offset_indexes) = self.offset_indexes { + self.write_offset_indexes(offset_indexes)?; + } + + // We only include ColumnOrder for leaf nodes. + // Currently only supported ColumnOrder is TypeDefinedOrder so we set this + // for all leaf nodes. + // Even if the column has an undefined sort order, such as INTERVAL, this + // is still technically the defined TYPEORDER so it should still be set. + let column_orders = (0..self.schema_descr.num_columns()) + .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {})) + .collect(); + // This field is optional, perhaps in cases where no min/max fields are set + // in any Statistics or ColumnIndex object in the whole file. + // But for simplicity we always set this field. + let column_orders = Some(column_orders); + + let file_metadata = parquet::FileMetaData { + num_rows, + row_groups: self.row_groups, + key_value_metadata: self.key_value_metadata.clone(), + version: self.writer_version, + schema: types::to_thrift(self.schema.as_ref())?, + created_by: self.created_by.clone(), + column_orders, + encryption_algorithm: None, + footer_signing_key_metadata: None, + }; + + // Write file metadata + let start_pos = self.buf.bytes_written(); + { + let mut protocol = TCompactOutputProtocol::new(&mut self.buf); + file_metadata.write_to_out_protocol(&mut protocol)?; + } + let end_pos = self.buf.bytes_written(); + + // Write footer + let metadata_len = (end_pos - start_pos) as u32; + + self.buf.write_all(&metadata_len.to_le_bytes())?; + self.buf.write_all(&PARQUET_MAGIC)?; + Ok(file_metadata) + } + + pub(self) fn new( + buf: &'a mut TrackedWrite, + schema: &'a TypePtr, + schema_descr: &'a SchemaDescPtr, + row_groups: Vec, + created_by: Option, + writer_version: i32, + ) -> Self { + Self { + buf, + schema, + schema_descr, + row_groups, + column_indexes: None, + offset_indexes: None, + key_value_metadata: None, + created_by, + writer_version, + } + } + + pub fn with_column_indexes(mut self, column_indexes: &'a [Vec>]) -> Self { + self.column_indexes = Some(column_indexes); + self + } + + pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec>]) -> Self { + self.offset_indexes = Some(offset_indexes); + self + } + + pub fn with_key_value_metadata(mut self, key_value_metadata: Vec) -> Self { + self.key_value_metadata = Some(key_value_metadata); + self + } +} + +pub struct ParquetMetadataWriter<'a, W: Write> { + buf: TrackedWrite, + write_page_index: bool, + metadata: &'a ParquetMetaData, +} + +impl<'a, W: Write> ParquetMetadataWriter<'a, W> { + pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self { + Self { + buf: TrackedWrite::new(buf), + write_page_index: true, + metadata, + } + } + + pub fn write_page_index(&mut self, write_page_index: bool) -> &mut Self { + self.write_page_index = write_page_index; + self + } + + pub fn finish(&mut self) -> Result<()> { + let file_metadata = self.metadata.file_metadata(); + + let schema = Arc::new(file_metadata.schema().clone()); + let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone())); + let created_by = file_metadata.created_by().map(str::to_string); + + let row_groups = self + .metadata + .row_groups() + .iter() + .map(|rg| rg.to_thrift()) + .collect::>(); + + let key_value_metadata = file_metadata.key_value_metadata().cloned(); + + let column_indexes = self.convert_column_indexes(); + let offset_indexes = self.convert_offset_index(); + + let mut encoder = ThriftMetadataWriter::new( + &mut self.buf, + &schema, + &schema_descr, + row_groups, + created_by, + file_metadata.version(), + ); + encoder = encoder.with_column_indexes(&column_indexes); + encoder = encoder.with_offset_indexes(&offset_indexes); + if let Some(key_value_metadata) = key_value_metadata { + encoder = encoder.with_key_value_metadata(key_value_metadata); + } + encoder.finish()?; + + Ok(()) + } + + fn convert_column_indexes(&self) -> Vec>> { + if let Some(row_group_column_indexes) = self.metadata.column_index() { + (0..self.metadata.row_groups().len()) + .map(|rg_idx| { + let column_indexes = &row_group_column_indexes[rg_idx]; + column_indexes + .iter() + .map(|column_index| match column_index { + Index::NONE => None, + Index::BOOLEAN(column_index) => Some(column_index.to_thrift()), + Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()), + Index::DOUBLE(column_index) => Some(column_index.to_thrift()), + Index::FIXED_LEN_BYTE_ARRAY(column_index) => { + Some(column_index.to_thrift()) + } + Index::FLOAT(column_index) => Some(column_index.to_thrift()), + Index::INT32(column_index) => Some(column_index.to_thrift()), + Index::INT64(column_index) => Some(column_index.to_thrift()), + Index::INT96(column_index) => Some(column_index.to_thrift()), + }) + .collect() + }) + .collect() + } else { + // make a None for each row group, for each column + self.metadata + .row_groups() + .iter() + .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect()) + .collect() + } + } + + fn convert_offset_index(&self) -> Vec>> { + if let Some(row_group_offset_indexes) = self.metadata.offset_index() { + (0..self.metadata.row_groups().len()) + .map(|rg_idx| { + let offset_indexes = &row_group_offset_indexes[rg_idx]; + offset_indexes + .iter() + .map(|offset_index| Some(offset_index.to_thrift())) + .collect() + }) + .collect() + } else { + // make a None for each row group, for each column + self.metadata + .row_groups() + .iter() + .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect()) + .collect() + } + } +} + #[cfg(test)] mod tests { use super::*; - use bytes::Bytes; + use arrow_array::{ArrayRef, Int32Array, RecordBatch}; + use arrow_schema::{DataType as ArrowDataType, Field, Schema}; + use bytes::{BufMut, Bytes, BytesMut}; use std::fs::File; + use crate::arrow::ArrowWriter; use crate::basic::{ ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type, }; @@ -830,6 +1028,7 @@ mod tests { use crate::column::reader::get_typed_column_reader; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; use crate::data_type::{BoolType, ByteArrayType, Int32Type}; + use crate::file::footer::parse_metadata; use crate::file::page_index::index::Index; use crate::file::properties::EnabledStatistics; use crate::file::serialized_reader::ReadOptionsBuilder; @@ -1855,6 +2054,222 @@ mod tests { assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); } + struct TestMetadata { + #[allow(dead_code)] + file_size: usize, + metadata: ParquetMetaData, + } + + fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata { + let mut buf = BytesMut::new().writer(); + let schema: Arc = Arc::new(Schema::new(vec![Field::new( + "a", + ArrowDataType::Int32, + true, + )])); + + let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)])); + + let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + + let writer_props = match write_page_index { + true => WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(), + false => WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Chunk) + .build(), + }; + + let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let data = buf.into_inner().freeze(); + + let reader_opts = match read_page_index { + true => ReadOptionsBuilder::new().with_page_index().build(), + false => ReadOptionsBuilder::new().build(), + }; + let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap(); + let metadata = reader.metadata().clone(); + TestMetadata { + file_size: data.len(), + metadata, + } + } + + fn has_page_index(metadata: &ParquetMetaData) -> bool { + match metadata.column_index() { + Some(column_index) => column_index + .iter() + .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))), + None => false, + } + } + + #[test] + fn test_roundtrip_parquet_metadata_without_page_index() { + // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so + // we at least test round trip without them + let metadata = get_test_metadata(false, false); + assert!(!has_page_index(&metadata.metadata)); + + let mut buf = BytesMut::new().writer(); + { + let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata); + writer.finish().unwrap(); + } + + let data = buf.into_inner().freeze(); + + let decoded_metadata = parse_metadata(&data).unwrap(); + assert!(!has_page_index(&metadata.metadata)); + + assert_eq!(metadata.metadata, decoded_metadata); + } + + /// Temporary function so we can test loading metadata with page indexes + /// while we haven't fully figured out how to load it cleanly + #[cfg(feature = "async")] + async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData { + use crate::arrow::async_reader::{MetadataFetch, MetadataLoader}; + use crate::errors::Result as ParquetResult; + use bytes::Bytes; + use futures::future::BoxFuture; + use futures::FutureExt; + use std::ops::Range; + + /// Adapt a `Bytes` to a `MetadataFetch` implementation. + struct AsyncBytes { + data: Bytes, + } + + impl AsyncBytes { + fn new(data: Bytes) -> Self { + Self { data } + } + } + + impl MetadataFetch for AsyncBytes { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, ParquetResult> { + async move { Ok(self.data.slice(range.start..range.end)) }.boxed() + } + } + + /// A `MetadataFetch` implementation that reads from a subset of the full data + /// while accepting ranges that address the full data. + struct MaskedBytes { + inner: Box, + inner_range: Range, + } + + impl MaskedBytes { + fn new(inner: Box, inner_range: Range) -> Self { + Self { inner, inner_range } + } + } + + impl MetadataFetch for &mut MaskedBytes { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, ParquetResult> { + let inner_range = self.inner_range.clone(); + println!("inner_range: {:?}", inner_range); + println!("range: {:?}", range); + assert!(inner_range.start <= range.start && inner_range.end >= range.end); + let range = + range.start - self.inner_range.start..range.end - self.inner_range.start; + self.inner.fetch(range) + } + } + + let metadata_length = data.len(); + let mut reader = MaskedBytes::new( + Box::new(AsyncBytes::new(data)), + file_size - metadata_length..file_size, + ); + let metadata = MetadataLoader::load(&mut reader, file_size, None) + .await + .unwrap(); + let loaded_metadata = metadata.finish(); + let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata); + metadata.load_page_index(true, true).await.unwrap(); + metadata.finish() + } + + fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) { + assert_eq!(left.column_descr(), right.column_descr()); + assert_eq!(left.encodings(), right.encodings()); + assert_eq!(left.num_values(), right.num_values()); + assert_eq!(left.compressed_size(), right.compressed_size()); + assert_eq!(left.data_page_offset(), right.data_page_offset()); + assert_eq!(left.statistics(), right.statistics()); + assert_eq!(left.offset_index_length(), right.offset_index_length()); + assert_eq!(left.column_index_length(), right.column_index_length()); + assert_eq!( + left.unencoded_byte_array_data_bytes(), + right.unencoded_byte_array_data_bytes() + ); + } + + fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) { + assert_eq!(left.num_rows(), right.num_rows()); + assert_eq!(left.file_offset(), right.file_offset()); + assert_eq!(left.total_byte_size(), right.total_byte_size()); + assert_eq!(left.schema_descr(), right.schema_descr()); + assert_eq!(left.num_columns(), right.num_columns()); + left.columns() + .iter() + .zip(right.columns().iter()) + .for_each(|(lc, rc)| { + check_columns_are_equivalent(lc, rc); + }); + } + + #[tokio::test] + #[cfg(feature = "async")] + async fn test_encode_parquet_metadata_with_page_index() { + // Create a ParquetMetadata with page index information + let metadata = get_test_metadata(true, true); + assert!(has_page_index(&metadata.metadata)); + + let mut buf = BytesMut::new().writer(); + { + let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata); + writer.finish().unwrap(); + } + + let data = buf.into_inner().freeze(); + + let decoded_metadata = load_metadata_from_bytes(data.len(), data).await; + + // Because the page index offsets will differ, compare invariant parts of the metadata + assert_eq!( + metadata.metadata.file_metadata(), + decoded_metadata.file_metadata() + ); + assert_eq!( + metadata.metadata.column_index(), + decoded_metadata.column_index() + ); + assert_eq!( + metadata.metadata.offset_index(), + decoded_metadata.offset_index() + ); + assert_eq!( + metadata.metadata.num_row_groups(), + decoded_metadata.num_row_groups() + ); + + metadata + .metadata + .row_groups() + .iter() + .zip(decoded_metadata.row_groups().iter()) + .for_each(|(left, right)| { + check_row_groups_are_equivalent(left, right); + }); + } + #[test] fn test_byte_array_size_statistics() { let message_type = " diff --git a/parquet/src/thrift.rs b/parquet/src/thrift.rs index ad6c3f688002..abb2ac13c4ed 100644 --- a/parquet/src/thrift.rs +++ b/parquet/src/thrift.rs @@ -17,6 +17,7 @@ //! Custom thrift definitions +pub use thrift::protocol::TCompactOutputProtocol; use thrift::protocol::{ TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType,