Skip to content

Commit

Permalink
fix nullable field of equality delete writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 23, 2024
1 parent 10e9a61 commit 7b18773
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "53" }
arrow-array = { version = "53" }
arrow-buffer = { version = "53" }
arrow-cast = { version = "53" }
arrow-ord = { version = "53" }
arrow-schema = { version = "53" }
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ apache-avro = { workspace = true }
array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
Expand Down
9 changes: 7 additions & 2 deletions crates/iceberg/src/arrow/record_batch_projector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StructArray};
use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};

use crate::error::Result;
Expand Down Expand Up @@ -138,6 +139,7 @@ impl RecordBatchProjector {
fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
let mut rev_iterator = field_index.iter().rev();
let mut array = batch[*rev_iterator.next().unwrap()].clone();
let mut null_buffer = array.logical_nulls();
for idx in rev_iterator {
array = array
.as_any()
Expand All @@ -148,8 +150,11 @@ impl RecordBatchProjector {
))?
.column(*idx)
.clone();
null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref());
}
Ok(array)
Ok(make_array(
array.to_data().into_builder().nulls(null_buffer).build()?,
))
}
}

Expand Down
69 changes: 61 additions & 8 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,12 @@ impl EqualityDeleteWriterConfig {
original_arrow_schema,
&equality_ids,
// The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids
// and https://iceberg.apache.org/spec/#equality-delete-files
// - The identifier field ids must be used for primitive types.
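// - The identifier field ids must not be used for floating point types. Nullable fields are not rejected here: equality delete columns may be optional.
// - The identifier field ids can be nested field of struct, including nested field of nullable struct (rows where the struct is null project to a null field).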
|field| {
// Only primitive type is allowed to be used for identifier field ids
if field.is_nullable()
|| field.data_type().is_nested()
if field.data_type().is_nested()
|| matches!(
field.data_type(),
DataType::Float16 | DataType::Float32 | DataType::Float64
Expand All @@ -92,7 +91,7 @@ impl EqualityDeleteWriterConfig {
.map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
))
},
|field: &Field| !field.is_nullable(),
|_field: &Field| true,
)?;
Ok(Self {
equality_ids,
Expand Down Expand Up @@ -172,6 +171,7 @@ mod test {

use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::DataType;
use arrow_select::concat::concat_batches;
use itertools::Itertools;
Expand Down Expand Up @@ -484,14 +484,10 @@ mod test {
// Float and Double are not allowed to be used for equality delete
assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err());
assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err());
// Int is nullable, not allowed to be used for equality delete
assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err());
// Struct is not allowed to be used for equality delete
assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err());
// Nested field of struct is allowed to be used for equality delete
assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok());
// Nested field of optional struct is not allowed to be used for equality delete
assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err());
// Nested field of map is not allowed to be used for equality delete
assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err());
assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err());
Expand Down Expand Up @@ -657,4 +653,61 @@ mod test {

Ok(())
}

#[tokio::test]
async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
// prepare data
// Int, Struct(Int)
let schema = Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::optional(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(
1,
"col1",
Type::Struct(StructType::new(vec![NestedField::optional(
2,
"sub_col",
Type::Primitive(PrimitiveType::Int),
)
.into()])),
)
.into(),
])
.build()
.unwrap();
let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
// null 1
// 2 null(struct)
// 3 null(field)
let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
let nulls = NullBuffer::from(vec![true, false, true]);
let col1 = Arc::new(StructArray::new(
if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
fields.clone()
} else {
unreachable!()
},
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
Some(nulls),
));
let columns = vec![col0, col1];

let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
let equality_ids = vec![0_i32, 2];
let equality_config =
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
let projector = equality_config.projector.clone();

// check
let to_write_projected = projector.project_bacth(to_write)?;
let expect_batch =
RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
])
.unwrap();
assert_eq!(to_write_projected, expect_batch);
Ok(())
}
}

0 comments on commit 7b18773

Please sign in to comment.