From 3333aa6fdc5e9f4026acba313329fc5971114743 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 11 Sep 2024 20:17:02 +0800 Subject: [PATCH] reorder record batch --- crates/iceberg/src/arrow/reader.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 592945544..b1488a873 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -186,7 +186,7 @@ impl ArrowReader { // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response - let projection_mask = Self::get_arrow_projection_mask( + let (projection_mask, reorder) = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, record_batch_stream_builder.parquet_schema(), @@ -234,6 +234,11 @@ impl ArrowReader { // to the requester. let mut record_batch_stream = record_batch_stream_builder.build()?; while let Some(batch) = record_batch_stream.try_next().await? { + let batch = if let Some(reorder) = reorder.as_ref() { + batch.project(&reorder).expect("must be able to reorder") + } else { + batch + }; tx.send(Ok(batch)).await? } @@ -261,9 +266,9 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, - ) -> Result<ProjectionMask> { + ) -> Result<(ProjectionMask, Option<Vec<usize>>)> { if field_ids.is_empty() { - Ok(ProjectionMask::all()) + Ok((ProjectionMask::all(), None)) } else { // Build the map between field id and column index in Parquet schema. 
let mut column_map = HashMap::new(); @@ -322,7 +327,12 @@ impl ArrowReader { )); } } - Ok(ProjectionMask::leaves(parquet_schema, indices)) + + let mut indexed_pairs: Vec<(usize, usize)> = indices.iter().cloned().enumerate().collect(); + indexed_pairs.sort_by_key(|&(_, index)| index); + let reorder_vec: Vec<usize> = indexed_pairs.iter().map(|&(pos, _)| pos).collect(); + + Ok((ProjectionMask::leaves(parquet_schema, indices), Some(reorder_vec))) + } }