Commit

review feedback
nastra committed Dec 9, 2024
1 parent de6e0da commit 66fde30
Showing 2 changed files with 76 additions and 42 deletions.
File 1 of 2:
@@ -25,15 +25,12 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.StructLikeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;
@@ -75,40 +75,23 @@ public InternalRow next() {

if (null == row) {
List<Object> rowValues = Lists.newArrayList();
if (null != projection.findField(MetadataColumns.DELETE_FILE_PATH.fieldId())) {
rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
}

if (null != projection.findField(MetadataColumns.DELETE_FILE_POS.fieldId())) {
rowValues.add(position);
// remember the index where the deleted position needs to be set
deletedPositionIndex = rowValues.size() - 1;
}

Types.NestedField partition = projection.findField(MetadataColumns.PARTITION_COLUMN_ID);
if (null != partition) {
Object constant = idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID);
if (null != constant) {
rowValues.add(constant);
} else {
Types.StructType type = partition.type().asStructType();
StructInternalRow partitionRow = new StructInternalRow(type);
StructLike copiedPartition = StructLikeUtil.copy(deleteFile.partition());
partitionRow.setStruct(PartitionUtil.coercePartition(type, spec, copiedPartition));
rowValues.add(partitionRow);
for (Types.NestedField column : projection.columns()) {
int fieldId = column.fieldId();
if (fieldId == MetadataColumns.DELETE_FILE_PATH.fieldId()) {
rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
} else if (fieldId == MetadataColumns.DELETE_FILE_POS.fieldId()) {
rowValues.add(position);
// remember the index where the deleted position needs to be set
deletedPositionIndex = rowValues.size() - 1;
} else if (fieldId == MetadataColumns.PARTITION_COLUMN_ID) {
rowValues.add(idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
} else if (fieldId == MetadataColumns.SPEC_ID_COLUMN_ID) {
rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID));
} else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) {
rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID));
}
}

if (null != projection.findField(MetadataColumns.SPEC_ID_COLUMN_ID)) {
Object constant = idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID);
rowValues.add(null != constant ? constant : deleteFile.specId());
}

if (null != projection.findField(MetadataColumns.FILE_PATH_COLUMN_ID)) {
Object constant = idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID);
rowValues.add(null != constant ? constant : UTF8String.fromString(deleteFile.location()));
}

this.row = new GenericInternalRow(rowValues.toArray());
} else if (null != deletedPositionIndex) {
// only update the deleted position if necessary, everything else stays the same
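Editor's note on the hunk above: the fixed sequence of per-field if blocks is replaced by a single loop over projection.columns(), so row values are appended in whatever order the projection declares. Below is a minimal, standalone sketch of that pattern; the class name, field ids, and sample values are invented for illustration and this is not the Iceberg reader itself.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of projection-ordered value assembly.
// Field ids, constants, and the class name are illustrative only.
public class ProjectionOrderSketch {

  public static void main(String[] args) {
    int pathFieldId = 1;     // stands in for a path metadata column id
    int positionFieldId = 2; // stands in for a position metadata column id

    Map<Integer, Object> idToConstant = new HashMap<>();
    idToConstant.put(pathFieldId, "s3://bucket/data/file-a.parquet");
    long position = 7L;

    // The projection drives the order: here position comes before path.
    List<Integer> projectedFieldIds = List.of(positionFieldId, pathFieldId);

    List<Object> rowValues = new ArrayList<>();
    Integer deletedPositionIndex = null;
    for (int fieldId : projectedFieldIds) {
      if (fieldId == pathFieldId) {
        rowValues.add(idToConstant.get(pathFieldId));
      } else if (fieldId == positionFieldId) {
        rowValues.add(position);
        // remember where the position sits so only that slot is updated later
        deletedPositionIndex = rowValues.size() - 1;
      }
    }

    // Prints [7, s3://bucket/data/file-a.parquet], matching the projection order.
    System.out.println(rowValues + " position index: " + deletedPositionIndex);
  }
}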
File 2 of 2:
@@ -50,6 +50,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
@@ -184,7 +185,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException
actualRows.add(reader.get().copy());
}

assertThat(internalRowsToJava(actualRows))
assertThat(internalRowsToJava(actualRows, projectedSchema))
.hasSize(2)
.containsExactly(
rowFromDeleteFile(dataFile1, deleteFile1, 0L),
@@ -205,14 +206,67 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException
actualRows.add(reader.get().copy());
}

assertThat(internalRowsToJava(actualRows))
assertThat(internalRowsToJava(actualRows, projectedSchema))
.hasSize(2)
.containsExactly(
rowFromDeleteFile(dataFile2, deleteFile2, 2L),
rowFromDeleteFile(dataFile2, deleteFile2, 3L));
}
}

@TestTemplate
public void readPositionDeletesTableWithDifferentColumnOrdering() throws IOException {
Pair<DeleteFile, CharSequenceSet> posDeletes1 =
FileHelpers.writeDeleteFile(
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
Lists.newArrayList(
Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
formatVersion);

DeleteFile deleteFile1 = posDeletes1.first();
table
.newRowDelta()
.addDeletes(deleteFile1)
.validateDataFilesExist(posDeletes1.second())
.commit();

Table positionDeletesTable =
catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));

// select a few fields in backwards order
Schema projectedSchema =
new Schema(MetadataColumns.DELETE_FILE_POS, MetadataColumns.DELETE_FILE_PATH);

List<ScanTask> scanTasks =
Lists.newArrayList(
positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
assertThat(scanTasks).hasSize(1);

assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);

try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
positionDeletesTable.schema(),
projectedSchema,
false)) {
List<InternalRow> actualRows = Lists.newArrayList();
while (reader.next()) {
actualRows.add(reader.get().copy());
}

assertThat(internalRowsToJava(actualRows, projectedSchema))
.hasSize(2)
.containsExactly(
new Object[] {0L, UTF8String.fromString(dataFile1.location())},
new Object[] {1L, UTF8String.fromString(dataFile1.location())});
}
}

private DataFile writeDataFile(List<Record> records) throws IOException {
return FileHelpers.writeDataFile(
table,
@@ -221,15 +275,15 @@ private DataFile writeDataFile(List<Record> records) throws IOException {
records);
}

private List<Object[]> internalRowsToJava(List<InternalRow> rows) {
return rows.stream().map(this::toJava).collect(Collectors.toList());
private List<Object[]> internalRowsToJava(List<InternalRow> rows, Schema projection) {
return rows.stream().map(row -> toJava(row, projection)).collect(Collectors.toList());
}

private Object[] toJava(InternalRow row) {
private Object[] toJava(InternalRow row, Schema projection) {
Object[] values = new Object[row.numFields()];
values[0] = row.getUTF8String(0);
values[1] = row.getLong(1);
values[2] = row.getUTF8String(2);
for (int i = 0; i < projection.columns().size(); i++) {
values[i] = row.get(i, SparkSchemaUtil.convert(projection.columns().get(i).type()));
}
return values;
}

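Editor's note on the updated toJava helper: instead of hard-coding the ordinal-to-type mapping, each field is now read with the Spark DataType derived from the projected schema. The following is a small, self-contained sketch of that conversion pattern, assuming Spark's catalyst classes are on the classpath; the class name and sample values are invented.

import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

// Sketch of schema-driven row conversion: each ordinal is read with the
// DataType the projection says lives there, rather than a fixed layout.
public class SchemaDrivenRowConversionSketch {

  static Object[] toJava(InternalRow row, List<DataType> projectedTypes) {
    Object[] values = new Object[row.numFields()];
    for (int i = 0; i < projectedTypes.size(); i++) {
      values[i] = row.get(i, projectedTypes.get(i));
    }
    return values;
  }

  public static void main(String[] args) {
    // A row projected as (pos BIGINT, path STRING), mirroring the new test's ordering.
    InternalRow row =
        new GenericInternalRow(new Object[] {0L, UTF8String.fromString("file-a.parquet")});
    List<DataType> types = List.of(DataTypes.LongType, DataTypes.StringType);

    Object[] values = toJava(row, types);
    System.out.println(values[0] + ", " + values[1]); // 0, file-a.parquet
  }
}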
