diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index f86791c5c432..80a554026d9a 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@ use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Schema, SchemaRef};
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
@@ -388,14 +388,10 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
         // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
         let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
-            Some(DataType::Struct(fields)) => fields
-                .iter()
-                .enumerate()
-                .filter_map(|(idx, field)| {
-                    self.projection.leaf_included(idx).then_some(field.clone())
-                })
-                .collect::<Vec<_>>(),
-            None => vec![],
+            Some(DataType::Struct(fields)) => {
+                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
+            }
+            None => Fields::empty(),
             _ => unreachable!("Must be Struct for root type"),
         };
         let schema = Arc::new(Schema::new(projected_fields));
@@ -874,14 +870,14 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
     use arrow_array::{
-        Array, ArrayRef, Float32Array, Int32Array, Int8Array, RecordBatchReader, Scalar,
-        StringArray, StructArray, UInt64Array,
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
+        StructArray, UInt64Array,
     };
-    use arrow_schema::{DataType, Field, Fields, Schema};
+    use arrow_schema::{DataType, Field, Schema};
     use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
     use std::collections::HashMap;
-    use std::sync::Mutex;
+    use std::sync::{Arc, Mutex};
     use tempfile::tempfile;
 
     #[derive(Clone)]
@@ -1608,31 +1604,52 @@ mod tests {
 
     #[tokio::test]
     async fn test_parquet_record_batch_stream_schema() {
+        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
+            schema.all_fields().iter().map(|f| f.name()).collect()
+        }
+
+        // ParquetRecordBatchReaderBuilder::schema differs from
+        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
+        // schema contents (in terms of custom metadata attached to schema, and fields
+        // returned). Test to ensure this remains consistent behaviour.
+        //
+        // Ensure same for asynchronous versions of the above.
+
+        // Prep data, for a schema with nested fields, with custom metadata
         let mut metadata = HashMap::with_capacity(1);
         metadata.insert("key".to_string(), "value".to_string());
 
-        let schema = Arc::new(
-            Schema::new(Fields::from(vec![
-                Field::new("a", DataType::Int32, true),
-                Field::new("c", DataType::UInt64, true),
-                Field::new("d", DataType::Float32, true),
-            ]))
-            .with_metadata(metadata.clone()),
-        );
+        let nested_struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("d", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("e", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
+            ),
+        ]);
         let struct_array = StructArray::from(vec![
             (
                 Arc::new(Field::new("a", DataType::Int32, true)),
                 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
             ),
             (
-                Arc::new(Field::new("c", DataType::UInt64, true)),
+                Arc::new(Field::new("b", DataType::UInt64, true)),
                 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
             ),
             (
-                Arc::new(Field::new("d", DataType::Float32, true)),
-                Arc::new(Float32Array::from(vec![1.0, 2.0])) as ArrayRef,
+                Arc::new(Field::new(
+                    "c",
+                    nested_struct_array.data_type().clone(),
+                    true,
+                )),
+                Arc::new(nested_struct_array) as ArrayRef,
             ),
         ]);
+
+        let schema =
+            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
         let record_batch = RecordBatch::from(struct_array)
             .with_schema(schema.clone())
             .unwrap();
@@ -1643,46 +1660,53 @@ mod tests {
         writer.write(&record_batch).unwrap();
         writer.close().unwrap();
 
-        // Test projecting for [], [0], [0, 1], [0, 1, 2]
-        for num_projected in 0..schema.fields().len() {
-            let mask_indices = 0..num_projected;
+        let all_fields = ["a", "b", "c", "d", "e"];
+        // (leaf indices in mask, expected names in output schema all fields)
+        let projections = [
+            (vec![], vec![]),
+            (vec![0], vec!["a"]),
+            (vec![0, 1], vec!["a", "b"]),
+            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
+            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
+        ];
+
+        // Ensure we're consistent for each of these projections
+        for (indices, expected_projected_names) in projections {
+            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
+                // Builder schema should preserve all fields and metadata
+                assert_eq!(get_all_field_names(&builder), all_fields);
+                assert_eq!(builder.metadata, metadata);
+                // Reader & batch schema should show only projected fields, and no metadata
+                assert_eq!(get_all_field_names(&reader), expected_projected_names);
+                assert_eq!(reader.metadata, HashMap::default());
+                assert_eq!(get_all_field_names(&batch), expected_projected_names);
+                assert_eq!(batch.metadata, HashMap::default());
+            };
 
             let builder =
                 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
             let sync_builder_schema = builder.schema().clone();
-            let mask = ProjectionMask::leaves(builder.parquet_schema(), mask_indices.clone());
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
             let mut reader = builder.with_projection(mask).build().unwrap();
             let sync_reader_schema = reader.schema();
             let batch = reader.next().unwrap().unwrap();
             let sync_batch_schema = batch.schema();
+            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
 
-            // Builder schema should preserve all fields and metadata
-            assert_eq!(sync_builder_schema.fields.len(), schema.fields().len());
-            assert_eq!(sync_builder_schema.metadata, metadata);
-            // Reader & batch schema should show only projected fields, and no metadata
-            assert_eq!(sync_reader_schema.fields.len(), num_projected);
-            assert_eq!(sync_reader_schema.metadata, HashMap::default());
-            assert_eq!(sync_batch_schema.fields.len(), num_projected);
-            assert_eq!(sync_batch_schema.metadata, HashMap::default());
-
-            // Ensure parity with async implementation
+            // asynchronous should be same
            let file = tokio::fs::File::from(file.try_clone().unwrap());
             let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
             let async_builder_schema = builder.schema().clone();
-            let mask = ProjectionMask::leaves(builder.parquet_schema(), mask_indices);
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
             let mut reader = builder.with_projection(mask).build().unwrap();
             let async_reader_schema = reader.schema().clone();
             let batch = reader.next().await.unwrap().unwrap();
             let async_batch_schema = batch.schema();
-
-            // Builder schema should preserve all fields and metadata
-            assert_eq!(async_builder_schema.fields.len(), schema.fields().len());
-            assert_eq!(async_builder_schema.metadata, metadata);
-            // Reader & batch schema should show only projected fields, and no metadata
-            assert_eq!(async_reader_schema.fields.len(), num_projected);
-            assert_eq!(async_reader_schema.metadata, HashMap::default());
-            assert_eq!(async_batch_schema.fields.len(), num_projected);
-            assert_eq!(async_batch_schema.metadata, HashMap::default());
+            assert_schemas(
+                async_builder_schema,
+                async_reader_schema,
+                async_batch_schema,
+            );
         }
     }
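
For reference, the builder change above hinges on arrow_schema::Fields::filter_leaves, which prunes the root struct's fields by depth-first leaf index. The standalone sketch below is illustrative only and not part of the patch; the field names simply mirror the test above, and it shows the leaf ordering that the ProjectionMask::leaves indices are matched against.

// Illustrative sketch only: demonstrates the Fields::filter_leaves behaviour the
// new projection code relies on. Leaf indices are assigned depth-first over the
// primitive columns: a = 0, b = 1, c.d = 2, c.e = 3 (same ordering as the test).
use arrow_schema::{DataType, Field, Fields};

fn main() {
    let fields = Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::UInt64, true),
        Field::new(
            "c",
            DataType::Struct(Fields::from(vec![
                Field::new("d", DataType::Utf8, true),
                Field::new("e", DataType::Utf8, true),
            ])),
            true,
        ),
    ]);

    // Keep leaves 0 and 2: top-level "a" is retained, "b" is dropped, and "c"
    // survives with only its child "d"; parents left with no leaves are pruned.
    let projected = fields.filter_leaves(|idx, _| idx == 0 || idx == 2);
    assert_eq!(projected.len(), 2);
    assert_eq!(projected[0].name(), "a");
    assert_eq!(projected[1].name(), "c");
}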