diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index b9e9d2898459..77de83994078 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader {
 }
 
 impl RecordBatchReader for ParquetRecordBatchReader {
+    /// Returns the projected [`SchemaRef`] for reading the parquet file.
+    ///
+    /// Note that the schema metadata will be stripped here. See
+    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 04383bb51bda..80a554026d9a 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@ use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
@@ -385,13 +385,24 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
         };
 
+        // Ensure schema of ParquetRecordBatchStream respects projection, and does
+        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
+        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
+            Some(DataType::Struct(fields)) => {
+                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
+            }
+            None => Fields::empty(),
+            _ => unreachable!("Must be Struct for root type"),
+        };
+        let schema = Arc::new(Schema::new(projected_fields));
+
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
-            schema: self.schema,
+            schema,
             reader: Some(reader),
             state: StreamState::Init,
         })
@@ -572,7 +583,10 @@ impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
 }
 
 impl<T> ParquetRecordBatchStream<T> {
-    /// Returns the [`SchemaRef`] for this parquet file
+    /// Returns the projected [`SchemaRef`] for reading the parquet file.
+    ///
+    /// Note that the schema metadata will be stripped here. See
+    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
@@ -855,11 +869,15 @@ mod tests {
     use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array};
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
+        StructArray, UInt64Array,
+    };
     use arrow_schema::{DataType, Field, Schema};
     use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
-    use std::sync::Mutex;
+    use std::collections::HashMap;
+    use std::sync::{Arc, Mutex};
     use tempfile::tempfile;
 
     #[derive(Clone)]
@@ -1584,6 +1602,114 @@ mod tests {
         test_get_row_group_column_bloom_filter(data, false).await;
     }
 
+    #[tokio::test]
+    async fn test_parquet_record_batch_stream_schema() {
+        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
+            schema.all_fields().iter().map(|f| f.name()).collect()
+        }
+
+        // ParquetRecordBatchReaderBuilder::schema differs from
+        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
+        // schema contents (in terms of custom metadata attached to schema, and fields
+        // returned). Test to ensure this remains consistent behaviour.
+        //
+        // Ensure same for asynchronous versions of the above.
+
+        // Prep data, for a schema with nested fields, with custom metadata
+        let mut metadata = HashMap::with_capacity(1);
+        metadata.insert("key".to_string(), "value".to_string());
+
+        let nested_struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("d", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("e", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
+            ),
+        ]);
+        let struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("a", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("b", DataType::UInt64, true)),
+                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new(
+                    "c",
+                    nested_struct_array.data_type().clone(),
+                    true,
+                )),
+                Arc::new(nested_struct_array) as ArrayRef,
+            ),
+        ]);
+
+        let schema =
+            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
+        let record_batch = RecordBatch::from(struct_array)
+            .with_schema(schema.clone())
+            .unwrap();
+
+        // Write parquet with custom metadata in schema
+        let mut file = tempfile().unwrap();
+        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let all_fields = ["a", "b", "c", "d", "e"];
+        // (leaf indices in mask, expected names in output schema all fields)
+        let projections = [
+            (vec![], vec![]),
+            (vec![0], vec!["a"]),
+            (vec![0, 1], vec!["a", "b"]),
+            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
+            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
+        ];
+
+        // Ensure we're consistent for each of these projections
+        for (indices, expected_projected_names) in projections {
+            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
+                // Builder schema should preserve all fields and metadata
+                assert_eq!(get_all_field_names(&builder), all_fields);
+                assert_eq!(builder.metadata, metadata);
+                // Reader & batch schema should show only projected fields, and no metadata
+                assert_eq!(get_all_field_names(&reader), expected_projected_names);
+                assert_eq!(reader.metadata, HashMap::default());
+                assert_eq!(get_all_field_names(&batch), expected_projected_names);
+                assert_eq!(batch.metadata, HashMap::default());
+            };
+
+            let builder =
+                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
+            let sync_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
+            let mut reader = builder.with_projection(mask).build().unwrap();
+            let sync_reader_schema = reader.schema();
+            let batch = reader.next().unwrap().unwrap();
+            let sync_batch_schema = batch.schema();
+            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
+
+            // asynchronous should be same
+            let file = tokio::fs::File::from(file.try_clone().unwrap());
+            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+            let async_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
+            let mut reader = builder.with_projection(mask).build().unwrap();
+            let async_reader_schema = reader.schema().clone();
+            let batch = reader.next().await.unwrap().unwrap();
+            let async_batch_schema = batch.schema();
+            assert_schemas(
+                async_builder_schema,
+                async_reader_schema,
+                async_batch_schema,
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length
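For review context, a minimal sketch of the behaviour this patch pins down (not part of the diff). It assumes a file `data.parquet` written like the test file above: custom key/value metadata on the Arrow schema, with its first leaf column being a top-level field:

```rust
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed input: a parquet file whose Arrow schema carries custom
    // metadata, as written by the test above.
    let file = tokio::fs::File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // The builder's schema still exposes every field plus the metadata.
    assert!(!builder.schema().metadata.is_empty());

    // Project down to the first leaf column only.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let stream = builder.with_projection(mask).build()?;

    // With this change the stream's schema matches the emitted batches:
    // only the projected field, with the schema metadata stripped.
    assert_eq!(stream.schema().fields().len(), 1);
    assert!(stream.schema().metadata.is_empty());

    Ok(())
}
```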