diff --git a/Cargo.toml b/Cargo.toml index c2a667930..2b9db4a8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ apache-avro = "0.17" array-init = "2" arrow-arith = { version = "53" } arrow-array = { version = "53" } +arrow-buffer = { version = "53" } arrow-cast = { version = "53" } arrow-ord = { version = "53" } arrow-schema = { version = "53" } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index f84e7ab67..6d5e79954 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -46,6 +46,7 @@ apache-avro = { workspace = true } array-init = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index e167eeedd..7b96b5db2 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray}; +use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use crate::error::Result; @@ -138,6 +139,7 @@ impl RecordBatchProjector { fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { let mut rev_iterator = field_index.iter().rev(); let mut array = batch[*rev_iterator.next().unwrap()].clone(); + let mut null_buffer = array.logical_nulls(); for idx in rev_iterator { array = array .as_any() @@ -148,8 +150,11 @@ impl RecordBatchProjector { ))? .column(*idx) .clone(); + null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref()); } - Ok(array) + Ok(make_array( + array.to_data().into_builder().nulls(null_buffer).build()?, + )) } } diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 328e2b93d..28ed5323e 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -67,13 +67,12 @@ impl EqualityDeleteWriterConfig { original_arrow_schema, &equality_ids, // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids + // and https://iceberg.apache.org/spec/#equality-delete-files // - The identifier field ids must be used for primitive types. // - The identifier field ids must not be used for floating point types or nullable fields. - // - The identifier field ids can be nested field of struct but not nested field of nullable struct. |field| { // Only primitive type is allowed to be used for identifier field ids - if field.is_nullable() - || field.data_type().is_nested() + if field.data_type().is_nested() || matches!( field.data_type(), DataType::Float16 | DataType::Float32 | DataType::Float64 @@ -92,7 +91,7 @@ impl EqualityDeleteWriterConfig { .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?, )) }, - |field: &Field| !field.is_nullable(), + |_field: &Field| true, )?; Ok(Self { equality_ids, @@ -172,6 +171,7 @@ mod test { use arrow_array::types::Int32Type; use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray}; + use arrow_buffer::NullBuffer; use arrow_schema::DataType; use arrow_select::concat::concat_batches; use itertools::Itertools; @@ -484,14 +484,10 @@ mod test { // Float and Double are not allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err()); assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err()); - // Int is nullable, not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err()); // Struct is not allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err()); // Nested field of struct is allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok()); - // Nested field of optional struct is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err()); // Nested field of map is not allowed to be used for equality delete assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err()); assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err()); @@ -657,4 +653,61 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> { + // prepare data + // Int, Struct(Int) + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional( + 1, + "col1", + Type::Struct(StructType::new(vec![NestedField::optional( + 2, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), + ) + .into(), + ]) + .build() + .unwrap(); + let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + // null 1 + // 2 null(struct) + // 3 null(field) + let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef; + let nulls = NullBuffer::from(vec![true, false, true]); + let col1 = Arc::new(StructArray::new( + if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() { + fields.clone() + } else { + unreachable!() + }, + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))], + Some(nulls), + )); + let columns = vec![col0, col1]; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); + let equality_ids = vec![0_i32, 2]; + let equality_config = + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); + let projector = equality_config.projector.clone(); + + // check + let to_write_projected = projector.project_bacth(to_write)?; + let expect_batch = + RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![ + Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef, + Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef, + ]) + .unwrap(); + assert_eq!(to_write_projected, expect_batch); + Ok(()) + } }