Struct iceberg::arrow::ArrowReader

pub struct ArrowReader { /* private fields */ }

Reads data from Parquet files.

Implementations§

impl ArrowReader

pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream>

Takes a stream of FileScanTasks and reads all the files. Returns a stream of Arrow RecordBatches containing the data from the files.
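A minimal sketch of driving the reader. The scan planning that produces the FileScanTaskStream is assumed to happen elsewhere; only the read call and the batch loop reflect this page, and the module paths are taken from the crate's public exports as an assumption.

use futures::TryStreamExt;

use iceberg::arrow::ArrowReader;
use iceberg::scan::FileScanTaskStream;

// Assumes `reader` came from ArrowReaderBuilder::build and `tasks` from
// planning a table scan elsewhere.
async fn count_rows(reader: ArrowReader, tasks: FileScanTaskStream) -> iceberg::Result<u64> {
    // `read` consumes the reader and returns one stream of RecordBatches
    // spanning all files in the task stream.
    let mut batches = reader.read(tasks)?;
    let mut rows = 0u64;
    while let Some(batch) = batches.try_next().await? {
        rows += batch.num_rows() as u64;
    }
    Ok(rows)
}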
Trait Implementations§

impl Clone for ArrowReader

fn clone(&self) -> ArrowReader

fn clone_from(&mut self, source: &Self)

Auto Trait Implementations§

impl Freeze for ArrowReader
impl !RefUnwindSafe for ArrowReader
impl Send for ArrowReader
impl Sync for ArrowReader
impl Unpin for ArrowReader
impl !UnwindSafe for ArrowReader
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

impl<T> Conv for T

fn conv<T>(self) -> T
where …
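The where-clause of conv is truncated above. This blanket impl appears to be the tap crate's Conv utility trait pulled in transitively — an assumption, since the page does not show its bounds. A tiny illustration under that assumption:

use tap::Conv; // assumption: the Conv shown above is tap's blanket trait

fn main() {
    // conv::<T>() is explicit-turbofish sugar over Into::into.
    let s = "arrow".conv::<String>();
    assert_eq!(s, "arrow");
}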
diff --git a/api/iceberg/arrow/struct.ArrowReaderBuilder.html b/api/iceberg/arrow/struct.ArrowReaderBuilder.html
index 65cc711e2..7d064faa0 100644
--- a/api/iceberg/arrow/struct.ArrowReaderBuilder.html
+++ b/api/iceberg/arrow/struct.ArrowReaderBuilder.html
@@ -1,13 +1,13 @@
-ArrowReaderBuilder in iceberg::arrow - Rust Struct iceberg::arrow::ArrowReaderBuilder
source · pub struct ArrowReaderBuilder { /* private fields */ }
Expand description
Builder to create ArrowReader
-Implementations§
source§impl ArrowReaderBuilder
sourcepub fn with_data_file_concurrency_limit(self, val: usize) -> Self
Sets the max number of in flight data files that are being fetched
-sourcepub fn with_batch_size(self, batch_size: usize) -> Self
Sets the desired size of batches in the response
+
ArrowReaderBuilder in iceberg::arrow - Rust Struct iceberg::arrow::ArrowReaderBuilder
source · pub struct ArrowReaderBuilder { /* private fields */ }
Expand description
Builder to create ArrowReader
+Implementations§
source§impl ArrowReaderBuilder
sourcepub fn with_data_file_concurrency_limit(self, val: usize) -> Self
Sets the max number of in flight data files that are being fetched
+sourcepub fn with_batch_size(self, batch_size: usize) -> Self
Sets the desired size of batches in the response
to something other than the default
-sourcepub fn with_row_group_filtering_enabled(
+
sourcepub fn with_row_group_filtering_enabled(
self,
row_group_filtering_enabled: bool,
) -> Self
Determines whether to enable row group filtering.
-sourcepub fn with_row_selection_enabled(self, row_selection_enabled: bool) -> Self
Determines whether to enable row selection.
-sourcepub fn build(self) -> ArrowReader
Build the ArrowReader.
+sourcepub fn with_row_selection_enabled(self, row_selection_enabled: bool) -> Self
Determines whether to enable row selection.
+sourcepub fn build(self) -> ArrowReader
Build the ArrowReader.
Auto Trait Implementations§
§impl Freeze for ArrowReaderBuilder
§impl !RefUnwindSafe for ArrowReaderBuilder
§impl Send for ArrowReaderBuilder
§impl Sync for ArrowReaderBuilder
§impl Unpin for ArrowReaderBuilder
§impl !UnwindSafe for ArrowReaderBuilder
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
diff --git a/api/src/iceberg/arrow/reader.rs.html b/api/src/iceberg/arrow/reader.rs.html
index d51cf8fad..4d2794226 100644
--- a/api/src/iceberg/arrow/reader.rs.html
+++ b/api/src/iceberg/arrow/reader.rs.html
@@ -1209,6 +1209,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -1236,7 +1335,9 @@
use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
-use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use arrow_schema::{
+ ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
@@ -1539,22 +1640,27 @@
let mut column_map = HashMap::new();
let fields = arrow_schema.fields();
- let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
- fields.filter_leaves(|idx, field| {
- let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
- if field_id.is_none() {
- return false;
- }
-
- let field_id = i32::from_str(field_id.unwrap());
- if field_id.is_err() {
- return false;
- }
- let field_id = field_id.unwrap();
+ // Pre-project only the fields that have been selected, possibly avoiding converting
+ // some Arrow types that are not yet supported.
+ let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
+ let projected_arrow_schema = ArrowSchema::new_with_metadata(
+ fields.filter_leaves(|_, f| {
+ f.metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|field_id| i32::from_str(field_id).ok())
+ .map_or(false, |field_id| {
+ projected_fields.insert((*f).clone(), field_id);
+ field_ids.contains(&field_id)
+ })
+ }),
+ arrow_schema.metadata().clone(),
+ );
+ let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
- if !field_ids.contains(&field_id) {
+ fields.filter_leaves(|idx, field| {
+ let Some(field_id) = projected_fields.get(field).cloned() else {
return false;
- }
+ };
let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
@@ -2339,13 +2445,20 @@
#[cfg(test)]
mod tests {
- use std::collections::HashSet;
+ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
- use crate::arrow::reader::CollectFieldIdVisitor;
+ use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
+ use parquet::arrow::ProjectionMask;
+ use parquet::schema::parser::parse_message_type;
+ use parquet::schema::types::SchemaDescriptor;
+
+ use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
+ use crate::arrow::ArrowReader;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Reference};
use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};
+ use crate::ErrorKind;
fn table_schema_simple() -> SchemaRef {
Arc::new(
@@ -2419,5 +2532,90 @@
assert_eq!(visitor.field_ids, expected);
}
+
+ #[test]
+ fn test_arrow_projection_mask() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![1])
+ .with_fields(vec![
+ NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(
+ 3,
+ "c3",
+ Type::Primitive(PrimitiveType::Decimal {
+ precision: 38,
+ scale: 3,
+ }),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ // Type not supported
+ Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+ ),
+ // Precision is beyond the supported range
+ Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ ]));
+
+ let message_type = "
+message schema {
+ required binary c1 (STRING) = 1;
+ optional int32 c2 (INTEGER(8,true)) = 2;
+ optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
+}
+ ";
+ let parquet_type = parse_message_type(message_type).expect("should parse schema");
+ let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
+
+ // Try projecting the fields c2 and c3 with the unsupported data types
+ let err = ArrowReader::get_arrow_projection_mask(
+ &[1, 2, 3],
+ &schema,
+ &parquet_schema,
+ &arrow_schema,
+ )
+ .unwrap_err();
+
+ assert_eq!(err.kind(), ErrorKind::DataInvalid);
+ assert_eq!(
+ err.to_string(),
+ "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
+ );
+
+ // Omitting field c2, we still get an error due to c3 being selected
+ let err = ArrowReader::get_arrow_projection_mask(
+ &[1, 3],
+ &schema,
+ &parquet_schema,
+ &arrow_schema,
+ )
+ .unwrap_err();
+
+ assert_eq!(err.kind(), ErrorKind::DataInvalid);
+ assert_eq!(
+ err.to_string(),
+ "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
+ );
+
+ // Finally avoid selecting fields with unsupported data types
+ let mask =
+ ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema)
+ .expect("Some ProjectionMask");
+ assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
+ }
}
\ No newline at end of file
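The substance of the change above: get_arrow_projection_mask previously converted the file's entire Arrow schema to an Iceberg schema before filtering, so an unsupported Arrow type anywhere in the schema failed the read even when that column was never selected. The new code pre-projects to the selected leaves first. Below is a self-contained sketch of that field-id-driven leaf filtering using only arrow-schema; the metadata key value and the helper name are assumptions that mirror the diff, not the crate's exact internals.

use std::collections::HashMap;
use std::str::FromStr;

use arrow_schema::{DataType, Field, Schema as ArrowSchema};

// The key the reader uses to carry Iceberg field ids in Arrow field
// metadata (assumed value, mirroring PARQUET_FIELD_ID_META_KEY).
const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

// Keep only leaves whose field-id metadata parses and is selected,
// mirroring the pre-projection step added in the diff.
fn pre_project(schema: &ArrowSchema, field_ids: &[i32]) -> ArrowSchema {
    let fields = schema.fields().filter_leaves(|_, f| {
        f.metadata()
            .get(PARQUET_FIELD_ID_META_KEY)
            .and_then(|id| i32::from_str(id).ok())
            .map_or(false, |id| field_ids.contains(&id))
    });
    ArrowSchema::new_with_metadata(fields, schema.metadata().clone())
}

fn main() {
    let field = |name: &str, dt: DataType, id: &str| {
        Field::new(name, dt, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };
    let schema = ArrowSchema::new(vec![
        field("c1", DataType::Utf8, "1"),
        field("c2", DataType::Int32, "2"),
    ]);
    // Selecting only field id 1 drops c2 before any type conversion
    // (and therefore before any unsupported-type error) can occur.
    let projected = pre_project(&schema, &[1]);
    assert_eq!(projected.fields().len(), 1);
    assert_eq!(projected.field(0).name(), "c1");
}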
Struct iceberg::arrow::ArrowReaderBuilder

pub struct ArrowReaderBuilder { /* private fields */ }

Builder to create ArrowReader.

Implementations§

impl ArrowReaderBuilder

pub fn with_data_file_concurrency_limit(self, val: usize) -> Self

Sets the max number of in-flight data files that are being fetched.

pub fn with_batch_size(self, batch_size: usize) -> Self

Sets the desired size of batches in the response to something other than the default.

pub fn with_row_group_filtering_enabled(
    self,
    row_group_filtering_enabled: bool,
) -> Self

Determines whether to enable row group filtering.

pub fn with_row_selection_enabled(self, row_selection_enabled: bool) -> Self

Determines whether to enable row selection.

pub fn build(self) -> ArrowReader

Build the ArrowReader.
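A sketch of wiring these options together. How an ArrowReaderBuilder is obtained in the first place is not documented on this page, so the function below simply accepts one; the option values are illustrative.

use iceberg::arrow::{ArrowReader, ArrowReaderBuilder};

fn configure(builder: ArrowReaderBuilder) -> ArrowReader {
    builder
        // Fetch at most four data files concurrently.
        .with_data_file_concurrency_limit(4)
        // Ask for record batches of ~8192 rows instead of the default.
        .with_batch_size(8192)
        // Prune whole row groups that cannot match the scan predicate.
        .with_row_group_filtering_enabled(true)
        // Additionally skip non-matching rows within surviving groups.
        .with_row_selection_enabled(true)
        .build()
}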
Auto Trait Implementations§

impl Freeze for ArrowReaderBuilder
impl !RefUnwindSafe for ArrowReaderBuilder
impl Send for ArrowReaderBuilder
impl Sync for ArrowReaderBuilder
impl Unpin for ArrowReaderBuilder
impl !UnwindSafe for ArrowReaderBuilder

Blanket Implementations§

impl<T> BorrowMut<T> for T
where
    T: ?Sized,