diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 89cc21bbf..26d362ebe 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -619,15 +619,22 @@ impl PlanContext { manifest_list: Arc, sender: Sender, ) -> Result>>> { - let filtered_entries = manifest_list - .entries() + let entries = manifest_list.entries(); + + if entries .iter() - .filter(|manifest_file| manifest_file.content == ManifestContentType::Data); + .any(|e| e.content != ManifestContentType::Data) + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Merge-on-read is not yet supported", + )); + } // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; if self.predicate.is_some() { - for manifest_file in filtered_entries { + for manifest_file in entries.iter() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip @@ -649,7 +656,7 @@ impl PlanContext { } } } else { - for manifest_file in filtered_entries { + for manifest_file in entries.iter() { let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); filtered_mfcs.push(Ok(mfc)); } diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 7a32b994b..41ca057a6 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -17,7 +17,7 @@ //! Integration tests for rest catalog. -use futures::TryStreamExt; +use iceberg::ErrorKind::FeatureUnsupported; use iceberg::{Catalog, TableIdent}; use iceberg_integration_tests::set_test_fixture; @@ -38,12 +38,17 @@ async fn test_read_table_with_positional_deletes() { let scan = table.scan().build().unwrap(); println!("{:?}", scan); - let batch_stream = scan.to_arrow().await.unwrap(); - let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); - - let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); + assert!(scan + .to_arrow() + .await + .is_err_and(|e| e.kind() == FeatureUnsupported)); // 😱 If we don't support positional deletes, we should fail when we try to read a table that // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py - assert_eq!(num_rows, 10); + + // When we get support for it: + // let batch_stream = scan.to_arrow().await.unwrap(); + // let batches: Vec<_> = batch_stream.try_collect().await.is_err(); + // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); + // assert_eq!(num_rows, 10); }