From 87bb2163630241a220e3fa9d5c6ef6e52ed36b7f Mon Sep 17 00:00:00 2001
From: Fokko
Date: Thu, 19 Dec 2024 18:15:20 +0100
Subject: [PATCH 1/3] A table with positional deletes should fail

---
 .../integration_tests/testdata/spark/entrypoint.sh |  2 ++
 .../integration_tests/testdata/spark/provision.py  |  8 +++++++-
 .../tests/read_positional_deletes.rs               | 14 ++++++++++++--
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/crates/integration_tests/testdata/spark/entrypoint.sh b/crates/integration_tests/testdata/spark/entrypoint.sh
index abbcc9332..90db14d0b 100755
--- a/crates/integration_tests/testdata/spark/entrypoint.sh
+++ b/crates/integration_tests/testdata/spark/entrypoint.sh
@@ -18,6 +18,8 @@
 # under the License.
 #
 
+set -e
+
 start-master.sh -p 7077
 start-worker.sh spark://spark-iceberg:7077
 start-history-server.sh
diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py
index 1d5ffcb85..f7cb79837 100755
--- a/crates/integration_tests/testdata/spark/provision.py
+++ b/crates/integration_tests/testdata/spark/provision.py
@@ -18,7 +18,13 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
 
-spark = SparkSession.builder.getOrCreate()
+spark = (
+    SparkSession
+    .builder
+    .config("spark.sql.shuffle.partitions", "1")
+    .config("spark.default.parallelism", "1")
+    .getOrCreate()
+)
 
 spark.sql(
     f"""
diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs
index ebd5ea67f..7a32b994b 100644
--- a/crates/integration_tests/tests/read_positional_deletes.rs
+++ b/crates/integration_tests/tests/read_positional_deletes.rs
@@ -17,6 +17,7 @@
 
 //! Integration tests for rest catalog.
 
+use futures::TryStreamExt;
 use iceberg::{Catalog, TableIdent};
 use iceberg_integration_tests::set_test_fixture;
 
@@ -34,6 +35,15 @@ async fn test_read_table_with_positional_deletes() {
         .await
         .unwrap();
 
-    // 😱 If we don't support positional deletes, we should not be able to plan them
-    println!("{:?}", table.scan().build().unwrap());
+    let scan = table.scan().build().unwrap();
+    println!("{:?}", scan);
+
+    let batch_stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+    let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
+
+    // 😱 If we don't support positional deletes, we should fail when we try to read a table that
+    // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
+    assert_eq!(num_rows, 10);
 }

From 1ee086cf39c71276d0c4abe472a8a0532d3d7ed3 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Thu, 19 Dec 2024 19:08:37 +0100
Subject: [PATCH 2/3] Add possible fix

---
 crates/iceberg/src/scan.rs                   | 17 ++++++++++++-----
 .../tests/read_positional_deletes.rs         | 17 +++++++++++------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 89cc21bbf..26d362ebe 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -619,15 +619,22 @@
         manifest_list: Arc<ManifestList>,
         sender: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
-        let filtered_entries = manifest_list
-            .entries()
+        let entries = manifest_list.entries();
+
+        if entries
             .iter()
-            .filter(|manifest_file| manifest_file.content == ManifestContentType::Data);
+            .any(|e| e.content != ManifestContentType::Data)
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Merge-on-read is not yet supported",
+            ));
+        }
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
         if self.predicate.is_some() {
-            for manifest_file in filtered_entries {
+            for manifest_file in entries.iter() {
                 let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
 
                 // evaluate the ManifestFile against the partition filter. Skip
@@ -649,7 +656,7 @@
                 }
             }
         } else {
-            for manifest_file in filtered_entries {
+            for manifest_file in entries.iter() {
                 let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
                 filtered_mfcs.push(Ok(mfc));
             }
diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs
index 7a32b994b..41ca057a6 100644
--- a/crates/integration_tests/tests/read_positional_deletes.rs
+++ b/crates/integration_tests/tests/read_positional_deletes.rs
@@ -17,7 +17,7 @@
 
 //! Integration tests for rest catalog.
 
-use futures::TryStreamExt;
+use iceberg::ErrorKind::FeatureUnsupported;
 use iceberg::{Catalog, TableIdent};
 use iceberg_integration_tests::set_test_fixture;
 
@@ -38,12 +38,17 @@ async fn test_read_table_with_positional_deletes() {
     let scan = table.scan().build().unwrap();
     println!("{:?}", scan);
 
-    let batch_stream = scan.to_arrow().await.unwrap();
-    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
-
-    let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
+    assert!(scan
+        .to_arrow()
+        .await
+        .is_err_and(|e| e.kind() == FeatureUnsupported));
 
     // 😱 If we don't support positional deletes, we should fail when we try to read a table that
     // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
-    assert_eq!(num_rows, 10);
+
+    // When we get support for it:
+    // let batch_stream = scan.to_arrow().await.unwrap();
+    // let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
+    // assert_eq!(num_rows, 10);
 }

From fee5abf071954ff1f5e20a459a7f885c93af24d6 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Fri, 20 Dec 2024 08:10:28 +0100
Subject: [PATCH 3/3] Comment and refactor

---
 crates/iceberg/src/scan.rs                           | 9 ++++-----
 crates/integration_tests/testdata/spark/provision.py | 4 ++++
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 26d362ebe..1366d9414 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -364,9 +364,8 @@
 
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
-        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
-        // whose content type is not Data or whose partitions cannot match this
-        // scan's filter
+        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
+        // whose partitions cannot match the scan's filter
         let manifest_file_contexts = self
             .plan_context
             .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;
@@ -634,7 +633,7 @@
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
         if self.predicate.is_some() {
-            for manifest_file in entries.iter() {
+            for manifest_file in entries {
                 let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
 
                 // evaluate the ManifestFile against the partition filter. Skip
@@ -656,7 +655,7 @@
                 }
             }
         } else {
-            for manifest_file in entries.iter() {
+            for manifest_file in entries {
                 let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
                 filtered_mfcs.push(Ok(mfc));
             }
diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py
index f7cb79837..364e366b4 100755
--- a/crates/integration_tests/testdata/spark/provision.py
+++ b/crates/integration_tests/testdata/spark/provision.py
@@ -18,6 +18,10 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
 
+# The configuration is important, otherwise we get many small
+# Parquet files with a single row. When a positional delete
+# hits a Parquet file with only one row, the whole file gets
+# dropped instead of a merge-on-read delete file being written.
 spark = (
     SparkSession
     .builder