diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 89cc21bbf..1366d9414 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -364,9 +364,8 @@ impl TableScan {
 
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
-        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
-        // whose content type is not Data or whose partitions cannot match this
-        // scan's filter
+        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out
+        // any whose partitions cannot match the scan's filter
         let manifest_file_contexts = self
             .plan_context
             .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;
@@ -619,15 +618,22 @@ impl PlanContext {
         manifest_list: Arc<ManifestList>,
         sender: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
-        let filtered_entries = manifest_list
-            .entries()
+        let entries = manifest_list.entries();
+
+        if entries
             .iter()
-            .filter(|manifest_file| manifest_file.content == ManifestContentType::Data);
+            .any(|e| e.content != ManifestContentType::Data)
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Merge-on-read is not yet supported",
+            ));
+        }
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
         if self.predicate.is_some() {
-            for manifest_file in filtered_entries {
+            for manifest_file in entries {
                 let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
 
                 // evaluate the ManifestFile against the partition filter. Skip
@@ -649,7 +655,7 @@ impl PlanContext {
                 }
             }
         } else {
-            for manifest_file in filtered_entries {
+            for manifest_file in entries {
                 let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
                 filtered_mfcs.push(Ok(mfc));
             }
diff --git a/crates/integration_tests/testdata/spark/entrypoint.sh b/crates/integration_tests/testdata/spark/entrypoint.sh
index abbcc9332..90db14d0b 100755
--- a/crates/integration_tests/testdata/spark/entrypoint.sh
+++ b/crates/integration_tests/testdata/spark/entrypoint.sh
@@ -18,6 +18,8 @@
 # under the License.
 #
 
+set -e
+
 start-master.sh -p 7077
 start-worker.sh spark://spark-iceberg:7077
 start-history-server.sh
diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py
index 1d5ffcb85..364e366b4 100755
--- a/crates/integration_tests/testdata/spark/provision.py
+++ b/crates/integration_tests/testdata/spark/provision.py
@@ -18,7 +18,17 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
 
-spark = SparkSession.builder.getOrCreate()
+# This configuration is important; otherwise we get many small
+# Parquet files with a single row each. When a positional delete
+# hits a Parquet file with only one row, the whole file gets
+# dropped instead of producing a merge-on-read delete file.
+spark = (
+    SparkSession
+    .builder
+    .config("spark.sql.shuffle.partitions", "1")
+    .config("spark.default.parallelism", "1")
+    .getOrCreate()
+)
 
 spark.sql(
     f"""
diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs
index ebd5ea67f..41ca057a6 100644
--- a/crates/integration_tests/tests/read_positional_deletes.rs
+++ b/crates/integration_tests/tests/read_positional_deletes.rs
@@ -17,6 +17,7 @@
 
 //! Integration tests for rest catalog.
 
+use iceberg::ErrorKind::FeatureUnsupported;
 use iceberg::{Catalog, TableIdent};
 use iceberg_integration_tests::set_test_fixture;
 
@@ -34,6 +35,20 @@ async fn test_read_table_with_positional_deletes() {
         .await
         .unwrap();
 
-    // 😱 If we don't support positional deletes, we should not be able to plan them
-    println!("{:?}", table.scan().build().unwrap());
+    let scan = table.scan().build().unwrap();
+    println!("{:?}", scan);
+
+    assert!(scan
+        .to_arrow()
+        .await
+        .is_err_and(|e| e.kind() == FeatureUnsupported));
+
+    // 😱 If we don't support positional deletes, we should fail when we try to read a table
+    // that has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py.
+
+    // When we get support for it:
+    // let batch_stream = scan.to_arrow().await.unwrap();
+    // let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
+    // assert_eq!(num_rows, 10);
 }
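
Note (not part of the patch): a minimal sketch of how a caller could distinguish the new `FeatureUnsupported` error from other scan failures. It uses only APIs that already appear in the diff (`Table::scan`, `TableScan::to_arrow`, `Error::kind`, `ErrorKind::FeatureUnsupported`) plus `futures::TryStreamExt::try_collect`, which the commented-out block in the test above will also need once it is enabled; the helper name `count_rows_or_fail` and the `iceberg::table::Table` import path are assumptions for illustration, not something introduced by this PR.

```rust
// Sketch only: handling the FeatureUnsupported error added in this PR.
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::{Error, ErrorKind};

async fn count_rows_or_fail(table: &Table) -> Result<usize, Error> {
    // Building the scan still succeeds; the manifest content check fires
    // when the scan is executed (to_arrow drives plan_files).
    let scan = table.scan().build()?;

    match scan.to_arrow().await {
        // No delete manifests: collect the record batches and count rows.
        Ok(stream) => {
            let batches: Vec<_> = stream.try_collect().await?;
            Ok(batches.iter().map(|b| b.num_rows()).sum())
        }
        // Delete manifests present: surfaced as FeatureUnsupported for now.
        Err(e) if e.kind() == ErrorKind::FeatureUnsupported => {
            eprintln!("merge-on-read tables are not yet supported: {e}");
            Err(e)
        }
        Err(e) => Err(e),
    }
}
```

Once merge-on-read is supported, the commented-out block in the test can be enabled as written (with `try_collect` brought into scope via `use futures::TryStreamExt;`), and the expected row count is 10 (12 rows minus the 2 positional deletes created by provision.py).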