From 87e111d3b7c7585a7f8551ec9ca8505ad81a20a4 Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 19 Dec 2024 19:08:37 +0100 Subject: [PATCH] Add possible fix --- crates/iceberg/src/scan.rs | 16 ++++++++++------ .../tests/read_positional_deletes.rs | 14 +++++++++----- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 89cc21bbf..6fa5c05a3 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -619,15 +619,19 @@ impl PlanContext { manifest_list: Arc, sender: Sender, ) -> Result>>> { - let filtered_entries = manifest_list - .entries() - .iter() - .filter(|manifest_file| manifest_file.content == ManifestContentType::Data); + let entries = manifest_list.entries(); + + if entries.iter().any(|e| e.content != ManifestContentType::Data) { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Merge-on-read is not yet supported", + )); + } // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; if self.predicate.is_some() { - for manifest_file in filtered_entries { + for manifest_file in entries.iter() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip @@ -649,7 +653,7 @@ impl PlanContext { } } } else { - for manifest_file in filtered_entries { + for manifest_file in entries.iter() { let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); filtered_mfcs.push(Ok(mfc)); } diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 7a32b994b..aa52a83f0 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -17,7 +17,9 @@ //! Integration tests for rest catalog. 
+use arrow_array::RecordBatch; use futures::TryStreamExt; +use iceberg::ErrorKind::FeatureUnsupported; use iceberg::{Catalog, TableIdent}; use iceberg_integration_tests::set_test_fixture; @@ -38,12 +40,14 @@ async fn test_read_table_with_positional_deletes() { let scan = table.scan().build().unwrap(); println!("{:?}", scan); - let batch_stream = scan.to_arrow().await.unwrap(); - let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); - - let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); + assert!(scan.to_arrow().await.is_err_and(|e| e.kind() == FeatureUnsupported)); // 😱 If we don't support positional deletes, we should fail when we try to read a table that // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py - assert_eq!(num_rows, 10); + + // When we get support for it: + // let batch_stream = scan.to_arrow().await.unwrap(); + // let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); + // assert_eq!(num_rows, 10); }