Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support scan nested type(struct, map, list) #882

Merged
merged 4 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 46 additions & 7 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, PrimitiveType, Schema};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

Expand Down Expand Up @@ -273,6 +273,28 @@ impl ArrowReader {
Ok((iceberg_field_ids, field_id_map))
}

/// Collect the ids of every leaf (primitive) field reachable from `field`
/// into `field_ids`; the resulting ids are used to build the projection.
///
/// Nested types are handled recursively: a struct contributes the leaves of
/// each of its members, a list contributes the leaves of its element field,
/// and a map contributes the leaves of both its key and value fields.
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
    match field.field_type.as_ref() {
        // A primitive field is itself a leaf: record its id directly.
        Type::Primitive(_) => field_ids.push(field.id),
        Type::Struct(struct_ty) => {
            // Descend into every member of the struct.
            for child in struct_ty.fields() {
                Self::include_leaf_field_id(child, field_ids);
            }
        }
        // A list has a single element field to descend into.
        Type::List(list_ty) => Self::include_leaf_field_id(&list_ty.element_field, field_ids),
        Type::Map(map_ty) => {
            // A map contributes leaves from both the key and the value side.
            Self::include_leaf_field_id(&map_ty.key_field, field_ids);
            Self::include_leaf_field_id(&map_ty.value_field, field_ids);
        }
    }
}

fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
Expand All @@ -297,11 +319,21 @@ impl ArrowReader {
scale: requested_scale,
}),
) if requested_precision >= file_precision && file_scale == requested_scale => true,
// Uuid will be stored as Fixed(16) in the parquet file, so the read-back type will be Fixed(16).
(Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
_ => false,
}
}

if field_ids.is_empty() {
let mut leaf_field_ids = vec![];
for field_id in field_ids {
let field = iceberg_schema_of_task.field_by_id(*field_id);
if let Some(field) = field {
Self::include_leaf_field_id(field, &mut leaf_field_ids);
}
}

if leaf_field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
// Build the map between field id and column index in Parquet schema.
Expand All @@ -318,7 +350,7 @@ impl ArrowReader {
.and_then(|field_id| i32::from_str(field_id).ok())
.map_or(false, |field_id| {
projected_fields.insert((*f).clone(), field_id);
field_ids.contains(&field_id)
leaf_field_ids.contains(&field_id)
})
}),
arrow_schema.metadata().clone(),
Expand Down Expand Up @@ -351,19 +383,26 @@ impl ArrowReader {
true
});

if column_map.len() != field_ids.len() {
if column_map.len() != leaf_field_ids.len() {
let missing_fields = leaf_field_ids
.iter()
.filter(|field_id| !column_map.contains_key(field_id))
.collect::<Vec<_>>();
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Parquet schema {} and Iceberg schema {} do not match.",
iceberg_schema, iceberg_schema_of_task
),
));
)
.with_context("column_map", format! {"{:?}", column_map})
.with_context("field_ids", format! {"{:?}", leaf_field_ids})
.with_context("missing_fields", format! {"{:?}", missing_fields}));
}

let mut indices = vec![];
for field_id in field_ids {
if let Some(col_idx) = column_map.get(field_id) {
for field_id in leaf_field_ids {
if let Some(col_idx) = column_map.get(&field_id) {
indices.push(*col_idx);
} else {
return Err(Error::new(
Expand Down
8 changes: 5 additions & 3 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use crate::spec::{
use crate::{Error, ErrorKind};

/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved
/// UTC time zone for Arrow timestamp type.
pub const UTC_TIME_ZONE: &str = "+00:00";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this also required to be public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. When users provide timestamp data, they should set the time zone consistently with Iceberg. I think we can provide something later to help users fill in the metadata.🤔


/// A post order arrow schema visitor.
///
Expand Down Expand Up @@ -598,14 +600,14 @@ impl SchemaVisitor for ToArrowSchemaConverter {
)),
crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
// Timestampz always stored as UTC
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
)),
crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Nanosecond, None),
)),
crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
// Store timestamptz_ns as UTC
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
)),
crate::spec::PrimitiveType::String => {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
Expand Down
12 changes: 1 addition & 11 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl<'a> TableScanBuilder<'a> {
)
})?;

let field = schema
schema
.as_struct()
.field_by_id(field_id)
.ok_or_else(|| {
Expand All @@ -261,16 +261,6 @@ impl<'a> TableScanBuilder<'a> {
)
})?;

if !field.field_type.is_primitive() {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Column {} is not a primitive type. Schema: {}",
column_name, schema
),
));
}

field_ids.push(field_id);
}

Expand Down
12 changes: 7 additions & 5 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};
use crate::spec::PrimitiveLiteral;

/// Field name for list type.
pub(crate) const LIST_FILED_NAME: &str = "element";
pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";
pub const LIST_FIELD_NAME: &str = "element";
/// Field name for map type's key.
pub const MAP_KEY_FIELD_NAME: &str = "key";
/// Field name for map type's value.
pub const MAP_VALUE_FIELD_NAME: &str = "value";
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved

pub(crate) const MAX_DECIMAL_BYTES: u32 = 24;
pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38;
Expand Down Expand Up @@ -633,9 +635,9 @@ impl NestedField {
/// Construct list type's element field.
pub fn list_element(id: i32, field_type: Type, required: bool) -> Self {
if required {
Self::required(id, LIST_FILED_NAME, field_type)
Self::required(id, LIST_FIELD_NAME, field_type)
} else {
Self::optional(id, LIST_FILED_NAME, field_type)
Self::optional(id, LIST_FIELD_NAME, field_type)
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use super::NestedField;
use crate::error::Result;
use crate::expr::accessor::StructAccessor;
use crate::spec::datatypes::{
ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME,
ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FIELD_NAME,
MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
};
use crate::{ensure_data_valid, Error, ErrorKind};
Expand Down Expand Up @@ -774,7 +774,7 @@ impl SchemaVisitor for IndexByName {
}

fn list(&mut self, list: &ListType, _value: Self::T) -> Result<Self::T> {
self.add_field(LIST_FILED_NAME, list.element_field.id)
self.add_field(LIST_FIELD_NAME, list.element_field.id)
}

fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
Expand Down
1 change: 1 addition & 0 deletions crates/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
Loading
Loading