Skip to content

Commit

Permalink
Add possible fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Dec 19, 2024
1 parent 87bb216 commit 1ee086c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
17 changes: 12 additions & 5 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,15 +619,22 @@ impl PlanContext {
manifest_list: Arc<ManifestList>,
sender: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let filtered_entries = manifest_list
.entries()
let entries = manifest_list.entries();

if entries
.iter()
.filter(|manifest_file| manifest_file.content == ManifestContentType::Data);
.any(|e| e.content != ManifestContentType::Data)
{
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Merge-on-read is not yet supported",
));
}

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];
if self.predicate.is_some() {
for manifest_file in filtered_entries {
for manifest_file in entries.iter() {
let partition_bound_predicate = self.get_partition_filter(manifest_file)?;

// evaluate the ManifestFile against the partition filter. Skip
Expand All @@ -649,7 +656,7 @@ impl PlanContext {
}
}
} else {
for manifest_file in filtered_entries {
for manifest_file in entries.iter() {
let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
filtered_mfcs.push(Ok(mfc));
}
Expand Down
17 changes: 11 additions & 6 deletions crates/integration_tests/tests/read_positional_deletes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Integration tests for rest catalog.
use futures::TryStreamExt;
use iceberg::ErrorKind::FeatureUnsupported;
use iceberg::{Catalog, TableIdent};
use iceberg_integration_tests::set_test_fixture;

Expand All @@ -38,12 +38,17 @@ async fn test_read_table_with_positional_deletes() {
let scan = table.scan().build().unwrap();
println!("{:?}", scan);

let batch_stream = scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
assert!(scan
.to_arrow()
.await
.is_err_and(|e| e.kind() == FeatureUnsupported));

// 😱 If we don't support positional deletes, we should fail when we try to read a table that
// has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
assert_eq!(num_rows, 10);

// When we get support for it:
// let batch_stream = scan.to_arrow().await.unwrap();
// let batches: Vec<_> = batch_stream.try_collect().await.is_err();
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
// assert_eq!(num_rows, 10);
}

0 comments on commit 1ee086c

Please sign in to comment.