Skip to content

Commit

Permalink
fix: Reading a table with positional deletes should fail (#826)
Browse files Browse the repository at this point in the history
* A table with positional deletes shoulds fail

* Add possible fix

* Comment and refactor
  • Loading branch information
Fokko authored Dec 20, 2024
1 parent e1f24c1 commit 0777fa7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
22 changes: 14 additions & 8 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,8 @@ impl TableScan {

let manifest_list = self.plan_context.get_manifest_list().await?;

// get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
// whose content type is not Data or whose partitions cannot match this
// scan's filter
// get the [`ManifestFile`]s from the [`ManifestList`], filtering out
// partitions cannot match the scan's filter
let manifest_file_contexts = self
.plan_context
.build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;
Expand Down Expand Up @@ -619,15 +618,22 @@ impl PlanContext {
manifest_list: Arc<ManifestList>,
sender: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let filtered_entries = manifest_list
.entries()
let entries = manifest_list.entries();

if entries
.iter()
.filter(|manifest_file| manifest_file.content == ManifestContentType::Data);
.any(|e| e.content != ManifestContentType::Data)
{
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Merge-on-read is not yet supported",
));
}

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];
if self.predicate.is_some() {
for manifest_file in filtered_entries {
for manifest_file in entries {
let partition_bound_predicate = self.get_partition_filter(manifest_file)?;

// evaluate the ManifestFile against the partition filter. Skip
Expand All @@ -649,7 +655,7 @@ impl PlanContext {
}
}
} else {
for manifest_file in filtered_entries {
for manifest_file in entries {
let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
filtered_mfcs.push(Ok(mfc));
}
Expand Down
2 changes: 2 additions & 0 deletions crates/integration_tests/testdata/spark/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
# under the License.
#

set -e

start-master.sh -p 7077
start-worker.sh spark://spark-iceberg:7077
start-history-server.sh
Expand Down
12 changes: 11 additions & 1 deletion crates/integration_tests/testdata/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_add, expr

spark = SparkSession.builder.getOrCreate()
# The configuration is important, otherwise we get many small
# parquet files with a single row. When a positional delete
# hits the Parquet file with one row, the parquet file gets
# dropped instead of having a merge-on-read delete file.
spark = (
SparkSession
.builder
.config("spark.sql.shuffle.partitions", "1")
.config("spark.default.parallelism", "1")
.getOrCreate()
)

spark.sql(
f"""
Expand Down
19 changes: 17 additions & 2 deletions crates/integration_tests/tests/read_positional_deletes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Integration tests for rest catalog.
use iceberg::ErrorKind::FeatureUnsupported;
use iceberg::{Catalog, TableIdent};
use iceberg_integration_tests::set_test_fixture;

Expand All @@ -34,6 +35,20 @@ async fn test_read_table_with_positional_deletes() {
.await
.unwrap();

// 😱 If we don't support positional deletes, we should not be able to plan them
println!("{:?}", table.scan().build().unwrap());
let scan = table.scan().build().unwrap();
println!("{:?}", scan);

assert!(scan
.to_arrow()
.await
.is_err_and(|e| e.kind() == FeatureUnsupported));

// 😱 If we don't support positional deletes, we should fail when we try to read a table that
// has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py

// When we get support for it:
// let batch_stream = scan.to_arrow().await.unwrap();
// let batches: Vec<_> = batch_stream.try_collect().await.is_err();
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
// assert_eq!(num_rows, 10);
}

0 comments on commit 0777fa7

Please sign in to comment.