Skip to content

Commit

Permalink
fix: ensure arrow reader returns Unsupported when delete files presen…
Browse files Browse the repository at this point in the history
…t in a file scan task
  • Loading branch information
sdd committed Dec 23, 2024
1 parent 260ed04 commit 753a56f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
8 changes: 8 additions & 0 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ impl ArrowReader {
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
) -> Result<()> {
// TODO: add support for delete files
if !task.deletes.is_empty() {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Delete files are not yet supported",
));
}

// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(&task.data_file_path)?;
Expand Down
24 changes: 20 additions & 4 deletions crates/integration_tests/tests/read_positional_deletes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Integration tests for rest catalog.
use futures::TryStreamExt;
use iceberg::ErrorKind::FeatureUnsupported;
use iceberg::{Catalog, TableIdent};
use iceberg_integration_tests::set_test_fixture;
Expand All @@ -35,16 +36,31 @@ async fn test_read_table_with_positional_deletes() {
.await
.unwrap();

let scan = table.scan().build().unwrap();
let scan = table
.scan()
.with_delete_file_processing_enabled(true)
.build()
.unwrap();
println!("{:?}", scan);

assert!(scan
.to_arrow()
let plan: Vec<_> = scan
.plan_files()
.await
.unwrap()
.try_collect()
.await
.is_err_and(|e| e.kind() == FeatureUnsupported));
.unwrap();
println!("{:?}", plan);

// Scan plan phase should include delete files in file plan
// when with_delete_file_processing_enabled == true
assert_eq!(plan[0].deletes.len(), 2);

// 😱 If we don't support positional deletes, we should fail when we try to read a table that
// has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await;

assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported));

// When we get support for it:
// let batch_stream = scan.to_arrow().await.unwrap();
Expand Down

0 comments on commit 753a56f

Please sign in to comment.