From 66fde30ed29114ccb04fdec4cbc157b42693da29 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Mon, 9 Dec 2024 09:27:13 +0100
Subject: [PATCH] review feedback

---
 .../apache/iceberg/spark/source/DVIterator.java    | 48 ++++---------
 .../spark/source/TestPositionDeletesReader.java    | 70 ++++++++++++++++---
 2 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
index ee56b7964692..e77e0cb64c84 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
@@ -25,15 +25,12 @@
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.BaseDeleteLoader;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PartitionUtil;
-import org.apache.iceberg.util.StructLikeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -75,40 +72,23 @@ public InternalRow next() {
     if (null == row) {
       List<Object> rowValues = Lists.newArrayList();
 
-      if (null != projection.findField(MetadataColumns.DELETE_FILE_PATH.fieldId())) {
-        rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
-      }
-
-      if (null != projection.findField(MetadataColumns.DELETE_FILE_POS.fieldId())) {
-        rowValues.add(position);
-        // remember the index where the deleted position needs to be set
-        deletedPositionIndex = rowValues.size() - 1;
-      }
-
-      Types.NestedField partition = projection.findField(MetadataColumns.PARTITION_COLUMN_ID);
-      if (null != partition) {
-        Object constant = idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID);
-        if (null != constant) {
-          rowValues.add(constant);
-        } else {
-          Types.StructType type = partition.type().asStructType();
-          StructInternalRow partitionRow = new StructInternalRow(type);
-          StructLike copiedPartition = StructLikeUtil.copy(deleteFile.partition());
-          partitionRow.setStruct(PartitionUtil.coercePartition(type, spec, copiedPartition));
-          rowValues.add(partitionRow);
+      for (Types.NestedField column : projection.columns()) {
+        int fieldId = column.fieldId();
+        if (fieldId == MetadataColumns.DELETE_FILE_PATH.fieldId()) {
+          rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
+        } else if (fieldId == MetadataColumns.DELETE_FILE_POS.fieldId()) {
+          rowValues.add(position);
+          // remember the index where the deleted position needs to be set
+          deletedPositionIndex = rowValues.size() - 1;
+        } else if (fieldId == MetadataColumns.PARTITION_COLUMN_ID) {
+          rowValues.add(idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
+        } else if (fieldId == MetadataColumns.SPEC_ID_COLUMN_ID) {
+          rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID));
+        } else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) {
+          rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID));
         }
       }
 
-      if (null != projection.findField(MetadataColumns.SPEC_ID_COLUMN_ID)) {
-        Object constant = idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID);
-        rowValues.add(null != constant ? constant : deleteFile.specId());
-      }
-
-      if (null != projection.findField(MetadataColumns.FILE_PATH_COLUMN_ID)) {
-        Object constant = idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID);
-        rowValues.add(null != constant ? constant : UTF8String.fromString(deleteFile.location()));
-      }
-
       this.row = new GenericInternalRow(rowValues.toArray());
     } else if (null != deletedPositionIndex) {
       // only update the deleted position if necessary, everything else stays the same
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
index c264e702b72a..ad0fcbeb680d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -50,6 +50,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.TestBase;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
@@ -184,7 +185,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException
         actualRows.add(reader.get().copy());
       }
 
-      assertThat(internalRowsToJava(actualRows))
+      assertThat(internalRowsToJava(actualRows, projectedSchema))
           .hasSize(2)
           .containsExactly(
               rowFromDeleteFile(dataFile1, deleteFile1, 0L),
@@ -205,7 +206,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException
         actualRows.add(reader.get().copy());
       }
 
-      assertThat(internalRowsToJava(actualRows))
+      assertThat(internalRowsToJava(actualRows, projectedSchema))
           .hasSize(2)
           .containsExactly(
               rowFromDeleteFile(dataFile2, deleteFile2, 2L),
@@ -213,6 +214,59 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException
     }
   }
 
+  @TestTemplate
+  public void readPositionDeletesTableWithDifferentColumnOrdering() throws IOException {
+    Pair<DeleteFile, CharSequenceSet> posDeletes1 =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            Lists.newArrayList(
+                Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
+            formatVersion);
+
+    DeleteFile deleteFile1 = posDeletes1.first();
+    table
+        .newRowDelta()
+        .addDeletes(deleteFile1)
+        .validateDataFilesExist(posDeletes1.second())
+        .commit();
+
+    Table positionDeletesTable =
+        catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+    // select a few fields in backwards order
+    Schema projectedSchema =
+        new Schema(MetadataColumns.DELETE_FILE_POS, MetadataColumns.DELETE_FILE_PATH);
+
+    List<ScanTask> scanTasks =
+        Lists.newArrayList(
+            positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
+    assertThat(scanTasks).hasSize(1);
+
+    assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
+    PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);
+
+    try (PositionDeletesRowReader reader =
+        new PositionDeletesRowReader(
+            table,
+            new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
+            positionDeletesTable.schema(),
+            projectedSchema,
+            false)) {
+      List<InternalRow> actualRows = Lists.newArrayList();
+      while (reader.next()) {
+        actualRows.add(reader.get().copy());
+      }
+
+      assertThat(internalRowsToJava(actualRows, projectedSchema))
+          .hasSize(2)
+          .containsExactly(
+              new Object[] {0L, UTF8String.fromString(dataFile1.location())},
+              new Object[] {1L, UTF8String.fromString(dataFile1.location())});
+    }
+  }
+
   private DataFile writeDataFile(List<Record> records) throws IOException {
     return FileHelpers.writeDataFile(
         table,
@@ -221,15 +275,15 @@ private DataFile writeDataFile(List<Record> records) throws IOException {
         records);
   }
 
-  private List<Object[]> internalRowsToJava(List<InternalRow> rows) {
-    return rows.stream().map(this::toJava).collect(Collectors.toList());
+  private List<Object[]> internalRowsToJava(List<InternalRow> rows, Schema projection) {
+    return rows.stream().map(row -> toJava(row, projection)).collect(Collectors.toList());
   }
 
-  private Object[] toJava(InternalRow row) {
+  private Object[] toJava(InternalRow row, Schema projection) {
     Object[] values = new Object[row.numFields()];
-    values[0] = row.getUTF8String(0);
-    values[1] = row.getLong(1);
-    values[2] = row.getUTF8String(2);
+    for (int i = 0; i < projection.columns().size(); i++) {
+      values[i] = row.get(i, SparkSchemaUtil.convert(projection.columns().get(i).type()));
+    }
     return values;
   }
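
Note on the DVIterator change: the old per-field findField checks appended values in a fixed order, so a projection that listed columns differently (e.g. pos before file_path, as the new test exercises) came back misaligned. Iterating projection.columns() makes each value land at the ordinal the projection asked for. Below is a minimal standalone sketch of that pattern; the class name, field ids, and constants map are invented for illustration, and only the iteration idea mirrors the patch:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone illustration of projection-driven row assembly, loosely modeled
// on DVIterator.next(). Field ids below are hypothetical stand-ins for
// Iceberg's MetadataColumns constants.
public class ProjectionOrderSketch {

  static final int FILE_PATH_ID = 1000; // hypothetical field ids
  static final int POS_ID = 1001;
  static final int SPEC_ID = 1002;

  // Walk the projected columns in projection order, so each value lands at
  // the ordinal the caller asked for, whatever the ordering.
  static Object[] assembleRow(
      List<Integer> projectedFieldIds, Map<Integer, Object> idToConstant, long position) {
    List<Object> rowValues = new ArrayList<>();
    for (int fieldId : projectedFieldIds) {
      if (fieldId == POS_ID) {
        rowValues.add(position); // the only per-row value; the rest are constants
      } else {
        rowValues.add(idToConstant.get(fieldId));
      }
    }
    return rowValues.toArray();
  }

  public static void main(String[] args) {
    Map<Integer, Object> constants =
        Map.of(FILE_PATH_ID, "s3://bucket/data/file.parquet", SPEC_ID, 0);
    // "backwards" projection: position before file path, like the new test
    Object[] row = assembleRow(List.of(POS_ID, FILE_PATH_ID), constants, 7L);
    System.out.println(row[0] + " | " + row[1]); // 7 | s3://bucket/data/file.parquet
  }
}

A side effect visible in the diff is that the partition, spec id, and file path values now come exclusively from idToConstant; the StructLikeUtil/PartitionUtil fallback is gone, so the calling reader is presumably expected to populate those constants up front.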
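
The test helper changes the same way: toJava previously hard-coded three accessors (getUTF8String(0), getLong(1), getUTF8String(2)) and would break for any other projection. Reading each ordinal through InternalRow.get(ordinal, dataType), with the type derived from the projection, keeps the helper projection-agnostic. A compilable sketch against the Spark catalyst API follows; the class name and the direct DataType array are stand-ins, whereas the patch derives the types via SparkSchemaUtil.convert:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

// Sketch of the generalized conversion: each ordinal is read through
// InternalRow.get(ordinal, dataType), so no fixed string/long/string
// accessor layout is assumed.
public class ToJavaSketch {

  static Object[] toJava(InternalRow row, DataType[] projectedTypes) {
    Object[] values = new Object[row.numFields()];
    for (int i = 0; i < projectedTypes.length; i++) {
      values[i] = row.get(i, projectedTypes[i]); // type-directed, order-agnostic access
    }
    return values;
  }

  public static void main(String[] args) {
    // a row projected as (pos, file_path) -- the "backwards" ordering from the test
    InternalRow row =
        new GenericInternalRow(new Object[] {1L, UTF8String.fromString("file-a.parquet")});
    Object[] values = toJava(row, new DataType[] {DataTypes.LongType, DataTypes.StringType});
    System.out.println(values[0] + ", " + values[1]); // 1, file-a.parquet
  }
}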