From efd86d46cff71dd5b67f55082e69430ad5deb50f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 13 Dec 2024 10:49:31 +0000 Subject: [PATCH] deploy: 2e0b64646fcfbd909788236a251a3a374a193542 --- api/iceberg/arrow/struct.ArrowReader.html | 6 +- .../arrow/struct.ArrowReaderBuilder.html | 12 +- api/src/iceberg/arrow/reader.rs.html | 232 ++++++++++++++++-- 3 files changed, 224 insertions(+), 26 deletions(-) diff --git a/api/iceberg/arrow/struct.ArrowReader.html b/api/iceberg/arrow/struct.ArrowReader.html index 74b1f3dda..c87390b61 100644 --- a/api/iceberg/arrow/struct.ArrowReader.html +++ b/api/iceberg/arrow/struct.ArrowReader.html @@ -1,7 +1,7 @@ -ArrowReader in iceberg::arrow - Rust

Struct iceberg::arrow::ArrowReader

source ·
pub struct ArrowReader { /* private fields */ }
Expand description

Reads data from Parquet files

-

Implementations§

source§

impl ArrowReader

source

pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream>

Take a stream of FileScanTasks and reads all the files. +ArrowReader in iceberg::arrow - Rust

Struct iceberg::arrow::ArrowReader

source ·
pub struct ArrowReader { /* private fields */ }
Expand description

Reads data from Parquet files

+

Implementations§

source§

impl ArrowReader

source

pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream>

Takes a stream of FileScanTasks and reads all the files. Returns a stream of Arrow RecordBatches containing the data from the files.

-

Trait Implementations§

source§

impl Clone for ArrowReader

source§

fn clone(&self) -> ArrowReader

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where +

Trait Implementations§

source§

impl Clone for ArrowReader

source§

fn clone(&self) -> ArrowReader

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<T> Conv for T

§

fn conv<T>(self) -> T
where diff --git a/api/iceberg/arrow/struct.ArrowReaderBuilder.html b/api/iceberg/arrow/struct.ArrowReaderBuilder.html index 65cc711e2..7d064faa0 100644 --- a/api/iceberg/arrow/struct.ArrowReaderBuilder.html +++ b/api/iceberg/arrow/struct.ArrowReaderBuilder.html @@ -1,13 +1,13 @@ -ArrowReaderBuilder in iceberg::arrow - Rust

Struct iceberg::arrow::ArrowReaderBuilder

source ·
pub struct ArrowReaderBuilder { /* private fields */ }
Expand description

Builder to create ArrowReader

-

Implementations§

source§

impl ArrowReaderBuilder

source

pub fn with_data_file_concurrency_limit(self, val: usize) -> Self

Sets the max number of in flight data files that are being fetched

-
source

pub fn with_batch_size(self, batch_size: usize) -> Self

Sets the desired size of batches in the response +ArrowReaderBuilder in iceberg::arrow - Rust

Struct iceberg::arrow::ArrowReaderBuilder

source ·
pub struct ArrowReaderBuilder { /* private fields */ }
Expand description

Builder to create ArrowReader

+

Implementations§

source§

impl ArrowReaderBuilder

source

pub fn with_data_file_concurrency_limit(self, val: usize) -> Self

Sets the max number of in flight data files that are being fetched

+
source

pub fn with_batch_size(self, batch_size: usize) -> Self

Sets the desired size of batches in the response to something other than the default

-
source

pub fn with_row_group_filtering_enabled( +

source

pub fn with_row_group_filtering_enabled( self, row_group_filtering_enabled: bool, ) -> Self

Determines whether to enable row group filtering.

-
source

pub fn with_row_selection_enabled(self, row_selection_enabled: bool) -> Self

Determines whether to enable row selection.

-
source

pub fn build(self) -> ArrowReader

Build the ArrowReader.

+
source

pub fn with_row_selection_enabled(self, row_selection_enabled: bool) -> Self

Determines whether to enable row selection.

+
source

pub fn build(self) -> ArrowReader

Build the ArrowReader.

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where diff --git a/api/src/iceberg/arrow/reader.rs.html b/api/src/iceberg/arrow/reader.rs.html index d51cf8fad..4d2794226 100644 --- a/api/src/iceberg/arrow/reader.rs.html +++ b/api/src/iceberg/arrow/reader.rs.html @@ -1209,6 +1209,105 @@ 1209 1210 1211 +1212 +1213 +1214 +1215 +1216 +1217 +1218 +1219 +1220 +1221 +1222 +1223 +1224 +1225 +1226 +1227 +1228 +1229 +1230 +1231 +1232 +1233 +1234 +1235 +1236 +1237 +1238 +1239 +1240 +1241 +1242 +1243 +1244 +1245 +1246 +1247 +1248 +1249 +1250 +1251 +1252 +1253 +1254 +1255 +1256 +1257 +1258 +1259 +1260 +1261 +1262 +1263 +1264 +1265 +1266 +1267 +1268 +1269 +1270 +1271 +1272 +1273 +1274 +1275 +1276 +1277 +1278 +1279 +1280 +1281 +1282 +1283 +1284 +1285 +1286 +1287 +1288 +1289 +1290 +1291 +1292 +1293 +1294 +1295 +1296 +1297 +1298 +1299 +1300 +1301 +1302 +1303 +1304 +1305 +1306 +1307 +1308 +1309 +1310
// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -1236,7 +1335,9 @@
 use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
 use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
-use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use arrow_schema::{
+    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+};
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
@@ -1539,22 +1640,27 @@
             let mut column_map = HashMap::new();
 
             let fields = arrow_schema.fields();
-            let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
-            fields.filter_leaves(|idx, field| {
-                let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
-                if field_id.is_none() {
-                    return false;
-                }
-
-                let field_id = i32::from_str(field_id.unwrap());
-                if field_id.is_err() {
-                    return false;
-                }
-                let field_id = field_id.unwrap();
+            // Pre-project only the fields that have been selected, possibly avoiding converting
+            // some Arrow types that are not yet supported.
+            let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
+            let projected_arrow_schema = ArrowSchema::new_with_metadata(
+                fields.filter_leaves(|_, f| {
+                    f.metadata()
+                        .get(PARQUET_FIELD_ID_META_KEY)
+                        .and_then(|field_id| i32::from_str(field_id).ok())
+                        .map_or(false, |field_id| {
+                            projected_fields.insert((*f).clone(), field_id);
+                            field_ids.contains(&field_id)
+                        })
+                }),
+                arrow_schema.metadata().clone(),
+            );
+            let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
 
-                if !field_ids.contains(&field_id) {
+            fields.filter_leaves(|idx, field| {
+                let Some(field_id) = projected_fields.get(field).cloned() else {
                     return false;
-                }
+                };
 
                 let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
                 let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
@@ -2339,13 +2445,20 @@
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};
     use std::sync::Arc;
 
-    use crate::arrow::reader::CollectFieldIdVisitor;
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
+    use parquet::arrow::ProjectionMask;
+    use parquet::schema::parser::parse_message_type;
+    use parquet::schema::types::SchemaDescriptor;
+
+    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
+    use crate::arrow::ArrowReader;
     use crate::expr::visitors::bound_predicate_visitor::visit;
     use crate::expr::{Bind, Reference};
     use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::ErrorKind;
 
     fn table_schema_simple() -> SchemaRef {
         Arc::new(
@@ -2419,5 +2532,90 @@
 
         assert_eq!(visitor.field_ids, expected);
     }
+
+    #[test]
+    fn test_arrow_projection_mask() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_identifier_field_ids(vec![1])
+                .with_fields(vec![
+                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(
+                        3,
+                        "c3",
+                        Type::Primitive(PrimitiveType::Decimal {
+                            precision: 38,
+                            scale: 3,
+                        }),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            // Arrow Duration is rejected as an unsupported data type (first assertion below)
+            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
+                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+            ),
+            // Precision 39 exceeds the maximum of 38 accepted for decimal types
+            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+        ]));
+
+        let message_type = "
+message schema {
+  required binary c1 (STRING) = 1;
+  optional int32 c2 (INTEGER(8,true)) = 2;
+  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
+}
+    ";
+        let parquet_type = parse_message_type(message_type).expect("should parse schema");
+        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
+
+        // Selecting ids 1, 2 and 3 pulls in c2 and c3; the call fails on c2's Duration type
+        let err = ArrowReader::get_arrow_projection_mask(
+            &[1, 2, 3],
+            &schema,
+            &parquet_schema,
+            &arrow_schema,
+        )
+        .unwrap_err();
+
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert_eq!(
+            err.to_string(),
+            "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
+        );
+
+        // Without c2 the call still fails, now because the selected c3 has precision 39
+        let err = ArrowReader::get_arrow_projection_mask(
+            &[1, 3],
+            &schema,
+            &parquet_schema,
+            &arrow_schema,
+        )
+        .unwrap_err();
+
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert_eq!(
+            err.to_string(),
+            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
+        );
+
+        // Selecting only c1 succeeds and yields a mask holding just leaf 0 of the Parquet schema
+        let mask =
+            ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema)
+                .expect("Some ProjectionMask");
+        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
+    }
 }
 

\ No newline at end of file