Merged
crates/iceberg/src/arrow/caching_delete_file_loader.rs (118 changes: 110 additions & 8 deletions)
@@ -224,16 +224,22 @@ impl CachingDeleteFileLoader {
let (sender, receiver) = channel();
del_filter.insert_equality_delete(&task.file_path, receiver);

+                // Per the Iceberg spec, evolve schema for equality deletes but only for the
+                // equality_ids columns, not all table columns.
+                let equality_ids_vec = task.equality_ids.clone().unwrap();
+                let evolved_stream = BasicDeleteFileLoader::evolve_schema(
+                    basic_delete_file_loader
+                        .parquet_to_batch_stream(&task.file_path)
+                        .await?,
+                    schema,
+                    &equality_ids_vec,
+                )
+                .await?;

Ok(DeleteFileContext::FreshEqDel {
-                    batch_stream: BasicDeleteFileLoader::evolve_schema(
-                        basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
-                            .await?,
-                        schema,
-                    )
-                    .await?,
+                    batch_stream: evolved_stream,
                    sender,
-                    equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()),
+                    equality_ids: HashSet::from_iter(equality_ids_vec),
})
}

@@ -536,6 +542,7 @@ mod tests {
use std::fs::File;
use std::sync::Arc;

+    use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields};
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -686,4 +693,99 @@ mod tests {
let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
assert!(result.is_none()); // no pos dels for file 3
}

+    /// Verifies that evolve_schema on partial-schema equality deletes works correctly
+    /// when only equality_ids columns are evolved, not all table columns.
+    ///
+    /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
+    /// equality delete files can contain only a subset of columns.
+    #[tokio::test]
+    async fn test_partial_schema_equality_deletes_evolve_succeeds() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+
+        // Create table schema with REQUIRED fields
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    crate::spec::NestedField::required(
+                        1,
+                        "id",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
+                    )
+                    .into(),
+                    crate::spec::NestedField::required(
+                        2,
+                        "data",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write equality delete file with PARTIAL schema (only 'data' column)
+        let delete_file_path = {
+            let data_vals = vec!["a", "d", "g"];
+            let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
+
+            let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field(
+                "data",
+                DataType::Utf8,
+                false,
+                "2", // field ID
+            )]));
+
+            let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap();
+
+            let path = format!("{}/partial-eq-deletes.parquet", &table_location);
+            let file = File::create(&path).unwrap();
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+            let mut writer =
+                ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap();
+            writer.write(&delete_batch).expect("Writing batch");
+            writer.close().unwrap();
+            path
+        };
+
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+
+        let batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&delete_file_path)
+            .await
+            .unwrap();
+
+        // Only evolve the equality_ids columns (field 2), not all table columns
+        let equality_ids = vec![2];
+        let evolved_stream =
+            BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, &equality_ids)
+                .await
+                .unwrap();
+
+        let result = evolved_stream.try_collect::<Vec<_>>().await;
+
+        assert!(
+            result.is_ok(),
+            "Expected success when evolving only equality_ids columns, got error: {:?}",
+            result.err()
+        );
+
+        let batches = result.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 1); // Only 'data' column
+
+        // Verify the actual values are preserved after schema evolution
+        let data_col = batch.column(0).as_string::<i32>();
+        assert_eq!(data_col.value(0), "a");
+        assert_eq!(data_col.value(1), "d");
+        assert_eq!(data_col.value(2), "g");
+    }
}
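For intuition about the behavior the new test pins down, here is a minimal, self-contained sketch of equality-delete matching semantics using plain `std` types. The `Row` alias, the `row_matches_delete` helper, and the string-valued columns are hypothetical simplifications for illustration, not this crate's API: per the spec, a delete row matches a data row when the two agree on every `equality_ids` column, and all other table columns are ignored.

```rust
use std::collections::HashSet;

/// Hypothetical simplified row: (field id, value) pairs, values as strings.
type Row = Vec<(i32, String)>;

/// A data row is deleted when it agrees with the delete row on every
/// equality_ids column; columns outside equality_ids are ignored.
fn row_matches_delete(data: &Row, delete: &Row, equality_ids: &HashSet<i32>) -> bool {
    equality_ids.iter().all(|fid| {
        let data_val = data.iter().find(|(id, _)| id == fid).map(|(_, v)| v);
        let delete_val = delete.iter().find(|(id, _)| id == fid).map(|(_, v)| v);
        data_val.is_some() && data_val == delete_val
    })
}

fn main() {
    // Mirrors the test above: field 1 = "id", field 2 = "data",
    // and the delete file carries only field 2.
    let equality_ids: HashSet<i32> = [2].into_iter().collect();
    let data_row: Row = vec![(1, "7".into()), (2, "a".into())];
    let delete_row: Row = vec![(2, "a".into())];
    assert!(row_matches_delete(&data_row, &delete_row, &equality_ids));

    // A row that differs on the "data" column survives.
    let other_row: Row = vec![(1, "8".into()), (2, "z".into())];
    assert!(!row_matches_delete(&other_row, &delete_row, &equality_ids));
}
```

Since only the `equality_ids` columns participate in matching, a partial-schema delete file legitimately omits the other columns (here, `id`), which is why the loader now evolves only those columns rather than the full table schema.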
crates/iceberg/src/arrow/delete_file_loader.rs (24 changes: 14 additions & 10 deletions)
@@ -72,20 +72,17 @@ impl BasicDeleteFileLoader {
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

-    /// Evolves the schema of the RecordBatches from an equality delete file
+    /// Evolves the schema of the RecordBatches from an equality delete file.
+    ///
+    /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
+    /// only the specified `equality_ids` columns are evolved, not all table columns.
pub(crate) async fn evolve_schema(
record_batch_stream: ArrowRecordBatchStream,
target_schema: Arc<Schema>,
+        equality_ids: &[i32],
) -> Result<ArrowRecordBatchStream> {
-        let eq_ids = target_schema
-            .as_ref()
-            .field_id_to_name_map()
-            .keys()
-            .cloned()
-            .collect::<Vec<_>>();
-
        let mut record_batch_transformer =
-            RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
+            RecordBatchTransformer::build(target_schema.clone(), equality_ids);

let record_batch_stream = record_batch_stream.map(move |record_batch| {
record_batch.and_then(|record_batch| {
@@ -106,7 +103,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
) -> Result<ArrowRecordBatchStream> {
let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;

-        Self::evolve_schema(raw_batch_stream, schema).await
+        // For equality deletes, only evolve the equality_ids columns.
+        // For positional deletes (equality_ids is None), use all field IDs.
+        let field_ids = match &task.equality_ids {
+            Some(ids) => ids.clone(),
+            None => schema.field_id_to_name_map().keys().cloned().collect(),
+        };
+
+        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
}
}
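The fallback in `read_delete_file` is easy to state on its own. Here is a std-only sketch of that dispatch, with a `BTreeMap` standing in for the schema's field-id-to-name map; `field_ids_to_evolve` is a hypothetical name for illustration, not the crate's API.

```rust
use std::collections::BTreeMap;

/// Equality deletes evolve only their equality_ids columns; positional
/// deletes (equality_ids == None) fall back to every field id in the schema.
fn field_ids_to_evolve(
    equality_ids: Option<&Vec<i32>>,
    field_id_to_name: &BTreeMap<i32, String>,
) -> Vec<i32> {
    match equality_ids {
        Some(ids) => ids.clone(),
        None => field_id_to_name.keys().cloned().collect(),
    }
}

fn main() {
    let schema: BTreeMap<i32, String> =
        [(1, "id".into()), (2, "data".into())].into_iter().collect();

    // Equality delete task: only field 2 is evolved.
    assert_eq!(field_ids_to_evolve(Some(&vec![2]), &schema), vec![2]);

    // Positional delete task: all table fields are used.
    assert_eq!(field_ids_to_evolve(None, &schema), vec![1, 2]);
}
```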
