Commit

Fix projection of nested fields
Jefffrey committed Nov 30, 2023
1 parent 1d96883 commit 7ea1e1b
Showing 1 changed file with 72 additions and 48 deletions: parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@
 use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Schema, SchemaRef};
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
@@ -388,14 +388,10 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
         // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
         let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
-            Some(DataType::Struct(fields)) => fields
-                .iter()
-                .enumerate()
-                .filter_map(|(idx, field)| {
-                    self.projection.leaf_included(idx).then_some(field.clone())
-                })
-                .collect::<Vec<_>>(),
-            None => vec![],
+            Some(DataType::Struct(fields)) => {
+                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
+            }
+            None => Fields::empty(),
             _ => unreachable!("Must be Struct for root type"),
         };
         let schema = Arc::new(Schema::new(projected_fields));
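Why the old arm was wrong: ProjectionMask::leaves indexes the leaf columns of the parquet schema, while the old code enumerated the root struct's top-level fields and passed those positions to leaf_included. The two indexings only agree for flat schemas; once a field is nested, its children occupy several leaf indices and the lookup consults the wrong mask entries. A minimal standalone sketch of the mismatch, assuming only the arrow_schema crate (the schema and mask values here are made up for illustration):

    use arrow_schema::{DataType, Field, Fields};

    fn main() {
        // Schema: a: Int32, c: Struct<d: Utf8, e: Utf8>
        // Leaf column order (as parquet sees it): a = 0, c.d = 1, c.e = 2
        let fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "c",
                DataType::Struct(Fields::from(vec![
                    Field::new("d", DataType::Utf8, true),
                    Field::new("e", DataType::Utf8, true),
                ])),
                true,
            ),
        ]);

        // Suppose the projection mask selects only leaf 2, i.e. "c.e".
        let leaf_included = [false, false, true];

        // Old approach: enumerate() yields top-level positions (0 for "a",
        // 1 for "c") and treats them as leaf indices, so it consults the
        // wrong mask entries and drops "c" even though "c.e" is projected.
        let old: Vec<String> = fields
            .iter()
            .enumerate()
            .filter_map(|(idx, f)| leaf_included[idx].then(|| f.name().clone()))
            .collect();
        assert!(old.is_empty());

        // New approach: filter_leaves walks the leaves in leaf order, so
        // index 2 correctly maps to "c.e"; "c" is kept with only child "e".
        let new = fields.filter_leaves(|idx, _| leaf_included[idx]);
        assert_eq!(new.len(), 1);
        assert_eq!(new[0].name(), "c");
    }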
@@ -874,14 +870,14 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
     use arrow_array::{
-        Array, ArrayRef, Float32Array, Int32Array, Int8Array, RecordBatchReader, Scalar,
-        StringArray, StructArray, UInt64Array,
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
+        StructArray, UInt64Array,
     };
-    use arrow_schema::{DataType, Field, Fields, Schema};
+    use arrow_schema::{DataType, Field, Schema};
     use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
     use std::collections::HashMap;
-    use std::sync::Mutex;
+    use std::sync::{Arc, Mutex};
     use tempfile::tempfile;
 
     #[derive(Clone)]
@@ -1608,31 +1604,52 @@ mod tests {

     #[tokio::test]
     async fn test_parquet_record_batch_stream_schema() {
+        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
+            schema.all_fields().iter().map(|f| f.name()).collect()
+        }
+
+        // ParquetRecordBatchReaderBuilder::schema differs from
+        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
+        // schema contents (in terms of custom metadata attached to schema, and fields
+        // returned). Test to ensure this remains consistent behaviour.
+        //
+        // Ensure same for asynchronous versions of the above.
+
+        // Prep data, for a schema with nested fields, with custom metadata
         let mut metadata = HashMap::with_capacity(1);
         metadata.insert("key".to_string(), "value".to_string());
 
-        let schema = Arc::new(
-            Schema::new(Fields::from(vec![
-                Field::new("a", DataType::Int32, true),
-                Field::new("c", DataType::UInt64, true),
-                Field::new("d", DataType::Float32, true),
-            ]))
-            .with_metadata(metadata.clone()),
-        );
+        let nested_struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("d", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("e", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
+            ),
+        ]);
         let struct_array = StructArray::from(vec![
             (
                 Arc::new(Field::new("a", DataType::Int32, true)),
                 Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
             ),
             (
-                Arc::new(Field::new("c", DataType::UInt64, true)),
+                Arc::new(Field::new("b", DataType::UInt64, true)),
                 Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
             ),
             (
-                Arc::new(Field::new("d", DataType::Float32, true)),
-                Arc::new(Float32Array::from(vec![1.0, 2.0])) as ArrayRef,
+                Arc::new(Field::new(
+                    "c",
+                    nested_struct_array.data_type().clone(),
+                    true,
+                )),
+                Arc::new(nested_struct_array) as ArrayRef,
             ),
         ]);
+
+        let schema =
+            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
         let record_batch = RecordBatch::from(struct_array)
             .with_schema(schema.clone())
             .unwrap();
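The rewritten fixture gives the file four leaf columns, and it is these leaf indices, not the three top-level fields, that ProjectionMask::leaves addresses in the loop below:

    leaf 0 -> a    (Int32)
    leaf 1 -> b    (UInt64)
    leaf 2 -> c.d  (Utf8)
    leaf 3 -> c.e  (Utf8)

Selecting leaf 2 keeps the parent struct c together with its child d, which is why the mask [0, 1, 2] is expected to surface the four names a, b, c, d.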
@@ -1643,46 +1660,53 @@
         writer.write(&record_batch).unwrap();
         writer.close().unwrap();
 
-        // Test projecting for [], [0], [0, 1], [0, 1, 2]
-        for num_projected in 0..schema.fields().len() {
-            let mask_indices = 0..num_projected;
+        let all_fields = ["a", "b", "c", "d", "e"];
+        // (leaf indices in mask, expected names in output schema all fields)
+        let projections = [
+            (vec![], vec![]),
+            (vec![0], vec!["a"]),
+            (vec![0, 1], vec!["a", "b"]),
+            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
+            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
+        ];
+
+        // Ensure we're consistent for each of these projections
+        for (indices, expected_projected_names) in projections {
+            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
+                // Builder schema should preserve all fields and metadata
+                assert_eq!(get_all_field_names(&builder), all_fields);
+                assert_eq!(builder.metadata, metadata);
+                // Reader & batch schema should show only projected fields, and no metadata
+                assert_eq!(get_all_field_names(&reader), expected_projected_names);
+                assert_eq!(reader.metadata, HashMap::default());
+                assert_eq!(get_all_field_names(&batch), expected_projected_names);
+                assert_eq!(batch.metadata, HashMap::default());
+            };
 
             let builder =
                 ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
             let sync_builder_schema = builder.schema().clone();
-            let mask = ProjectionMask::leaves(builder.parquet_schema(), mask_indices.clone());
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
             let mut reader = builder.with_projection(mask).build().unwrap();
             let sync_reader_schema = reader.schema();
             let batch = reader.next().unwrap().unwrap();
             let sync_batch_schema = batch.schema();
+            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
 
-            // Builder schema should preserve all fields and metadata
-            assert_eq!(sync_builder_schema.fields.len(), schema.fields().len());
-            assert_eq!(sync_builder_schema.metadata, metadata);
-            // Reader & batch schema should show only projected fields, and no metadata
-            assert_eq!(sync_reader_schema.fields.len(), num_projected);
-            assert_eq!(sync_reader_schema.metadata, HashMap::default());
-            assert_eq!(sync_batch_schema.fields.len(), num_projected);
-            assert_eq!(sync_batch_schema.metadata, HashMap::default());
-
-            // Ensure parity with async implementation
+            // asynchronous should be same
             let file = tokio::fs::File::from(file.try_clone().unwrap());
             let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
             let async_builder_schema = builder.schema().clone();
-            let mask = ProjectionMask::leaves(builder.parquet_schema(), mask_indices);
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
             let mut reader = builder.with_projection(mask).build().unwrap();
            let async_reader_schema = reader.schema().clone();
             let batch = reader.next().await.unwrap().unwrap();
             let async_batch_schema = batch.schema();
-
-            // Builder schema should preserve all fields and metadata
-            assert_eq!(async_builder_schema.fields.len(), schema.fields().len());
-            assert_eq!(async_builder_schema.metadata, metadata);
-            // Reader & batch schema should show only projected fields, and no metadata
-            assert_eq!(async_reader_schema.fields.len(), num_projected);
-            assert_eq!(async_reader_schema.metadata, HashMap::default());
-            assert_eq!(async_batch_schema.fields.len(), num_projected);
-            assert_eq!(async_batch_schema.metadata, HashMap::default());
+            assert_schemas(
+                async_builder_schema,
+                async_reader_schema,
+                async_batch_schema,
+            );
         }
     }
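Taken together, the fix means a nested projection no longer desynchronises ParquetRecordBatchStream::schema from the batches the stream produces. A minimal sketch of the stream-level behaviour the new test pins down ("data.parquet" is a hypothetical stand-in for a file written with a nested schema like the fixture above):

    use futures::TryStreamExt;
    use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Any file whose schema nests fields works here, e.g.
        // a: Int32, b: UInt64, c: Struct<d: Utf8, e: Utf8>, with some rows.
        let file = tokio::fs::File::open("data.parquet").await?;
        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

        // Project only leaf 3 ("c.e" in the example schema).
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3]);
        let stream = builder.with_projection(mask).build()?;

        let schema = stream.schema().clone();
        let batches = stream.try_collect::<Vec<_>>().await?;

        // With this fix the stream schema agrees with the emitted batches:
        // both expose a single struct field "c" holding only child "e".
        // Before the fix this projection produced an empty stream schema.
        assert_eq!(schema.fields(), batches[0].schema().fields());
        Ok(())
    }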

