Spark: DVs + Positional Deletes
nastra committed Dec 16, 2024
1 parent f40ec20 commit 897207a
Showing 8 changed files with 364 additions and 93 deletions.
33 changes: 22 additions & 11 deletions core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -110,32 +110,43 @@ public Map<String, String> properties() {
}

private Schema calculateSchema() {
int formatVersion = TableUtil.formatVersion(table());
Types.StructType partitionType = Partitioning.partitionType(table());
List<Types.NestedField> columns =
ImmutableList.of(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
MetadataColumns.DELETE_FILE_ROW_FIELD_NAME,
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC),
ImmutableList.Builder<Types.NestedField> builder =
ImmutableList.<Types.NestedField>builder()
.add(MetadataColumns.DELETE_FILE_PATH)
.add(MetadataColumns.DELETE_FILE_POS);
if (formatVersion == 2) {
builder.add(
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
MetadataColumns.DELETE_FILE_ROW_FIELD_NAME,
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC));
}

builder
.add(
Types.NestedField.required(
MetadataColumns.PARTITION_COLUMN_ID,
PARTITION,
partitionType,
"Partition that position delete row belongs to"),
"Partition that position delete row belongs to"))
.add(
Types.NestedField.required(
MetadataColumns.SPEC_ID_COLUMN_ID,
SPEC_ID,
Types.IntegerType.get(),
MetadataColumns.SPEC_ID_COLUMN_DOC),
MetadataColumns.SPEC_ID_COLUMN_DOC))
.add(
Types.NestedField.required(
MetadataColumns.FILE_PATH_COLUMN_ID,
DELETE_FILE_PATH,
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));

List<Types.NestedField> columns = builder.build();

// Calculate used ids (for de-conflict)
Set<Integer> currentlyUsedIds =
Collections.unmodifiableSet(TypeUtil.indexById(Types.StructType.of(columns)).keySet());
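For context, the position_deletes metadata table now exposes a format-dependent schema: on v2 tables it keeps the optional "row" struct mirroring the data schema, while on v3 tables (where position deletes are deletion vectors) that column is omitted. Below is a minimal sketch of how a caller could observe the difference, assuming an already-loaded Table; the class and method names are illustrative only and not part of this commit.

import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;

// Illustrative helper: print the position_deletes schema and the table's format
// version to see the v2-vs-v3 difference described above.
public class PositionDeletesSchemaCheck {
  public static void printPositionDeletesSchema(Table table) {
    Table posDeletes =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
    System.out.println("format version: " + TableUtil.formatVersion(table));
    // On v2 the struct contains an optional "row" field; on v3 it does not.
    System.out.println(posDeletes.schema().asStruct());
  }
}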
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -161,6 +161,10 @@ public Map<String, String> properties() {
return properties;
}

public Table underlyingTable() {
return lazyTable();
}

public int formatVersion() {
if (formatVersion == UNKNOWN_FORMAT_VERSION) {
throw new UnsupportedOperationException(
@@ -30,6 +30,7 @@
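The new SerializableTable#underlyingTable accessor exposes the lazily resolved table behind the serializable wrapper. A rough usage sketch follows, assuming a SerializableTable received on an executor; the helper class and method name are made up for illustration.

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;

// Illustrative only: unwrap the serializable wrapper to inspect the concrete table,
// e.g. to determine the format version that drives DV vs. positional-delete behavior.
class TableUnwrapExample {
  static int formatVersionOf(SerializableTable serializableTable) {
    Table underlying = serializableTable.underlyingTable();
    return TableUtil.formatVersion(underlying);
  }
}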
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ScanTaskUtil;

/**
* Container class representing a set of position delete files to be rewritten by a {@link
@@ -109,7 +110,7 @@ public long rewrittenBytes() {
}

public long addedBytes() {
return addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
return addedDeleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum();
}

public int numRewrittenDeleteFiles() {
@@ -1704,7 +1704,12 @@ public void testPositionDeletesManyColumns() {
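The switch from DeleteFile::fileSizeInBytes to ScanTaskUtil::contentSizeInBytes matters for DVs: a deletion vector is a blob inside a Puffin file, so its logical size is the blob's content length rather than the size of the whole file. A simplified restatement of that idea is sketched below; the real logic lives in ScanTaskUtil, and this standalone class is only an approximation.

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;

// Approximate restatement of the size rule used above: prefer the DV blob's content
// size when it is available (Puffin), otherwise fall back to the file size.
class DeleteSizeSketch {
  static long sizeOf(DeleteFile deleteFile) {
    Long contentSize = deleteFile.contentSizeInBytes();
    if (deleteFile.format() == FileFormat.PUFFIN && contentSize != null) {
      return contentSize;
    }
    return deleteFile.fileSizeInBytes();
  }
}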
table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit();

PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table);
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()).isEqualTo(2010);
int expectedIds =
formatVersion >= 3
? 6 // partition col + 5 columns
: 2010; // partition col + 6 columns + 2003 ids inside the deleted row column
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size())
.isEqualTo(expectedIds);

BatchScan scan = positionDeletesTable.newBatchScan();
assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
@@ -43,6 +43,7 @@
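The assertion above relies on TypeUtil.indexById, which counts every field id in the schema, including ids nested inside struct columns; dropping the "row" struct on v3 is what removes the roughly two thousand ids contributed by the wide data schema in this test. A small sketch of that counting, with a purely illustrative helper class:

import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

// Illustrative: index every field id in a schema (top-level and nested) and count them.
class FieldIdCountSketch {
  static int countFieldIds(Schema schema) {
    Map<Integer, Types.NestedField> byId = TypeUtil.indexById(schema.asStruct());
    return byId.size();
  }
}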
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
@@ -54,6 +55,8 @@
import org.apache.iceberg.actions.SizeBasedFileRewriter;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
@@ -62,12 +65,14 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -88,13 +93,23 @@ public class TestRewritePositionDeleteFiles extends ExtensionsTestBase {
private static final int DELETE_FILES_PER_PARTITION = 2;
private static final int DELETE_FILE_SIZE = 10;

@Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
@Parameter(index = 3)
private int formatVersion;

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}")
public static Object[][] parameters() {
return new Object[][] {
{
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
CATALOG_PROPS
CATALOG_PROPS,
2
},
{
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
CATALOG_PROPS,
3
}
};
}
@@ -223,7 +238,11 @@ private void testDanglingDelete(String partitionCol, int numDataFiles) throws Ex
// write dangling delete files for 'old data files'
writePosDeletesForFiles(table, dataFiles);
List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);
if (formatVersion >= 3) {
assertThat(deleteFiles).hasSize(numDataFiles);
} else {
assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);
}

List<Object[]> expectedRecords = records(tableName, partitionCol);

@@ -250,8 +269,8 @@ private void createTable(String partitionType, String partitionCol, String parti
"CREATE TABLE %s (id long, %s %s, c1 string, c2 string) "
+ "USING iceberg "
+ "PARTITIONED BY (%s) "
+ "TBLPROPERTIES('format-version'='2')",
tableName, partitionCol, partitionType, partitionTransform);
+ "TBLPROPERTIES('format-version'='%s')",
tableName, partitionCol, partitionType, partitionTransform, formatVersion);
}

private void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
@@ -303,17 +322,24 @@ private void writePosDeletesForFiles(Table table, List<DataFile> files) throws I

int counter = 0;
List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
for (DataFile partitionFile : partitionFiles) {
for (int deletePos = 0; deletePos < DELETE_FILE_SIZE; deletePos++) {
deletes.add(Pair.of(partitionFile.location(), (long) deletePos));
counter++;
if (counter == deleteFileSize) {
// Dump to file and reset variables
OutputFile output =
Files.localOutput(temp.resolve(UUID.randomUUID().toString()).toFile());
deleteFiles.add(writeDeleteFile(table, output, partition, deletes));
counter = 0;
deletes.clear();
if (formatVersion >= 3) {
for (DataFile partitionFile : partitionFiles) {
deleteFiles.addAll(
writeDV(table, partition, partitionFile.location(), deletesForPartition));
}
} else {
for (DataFile partitionFile : partitionFiles) {
for (int deletePos = 0; deletePos < DELETE_FILE_SIZE; deletePos++) {
deletes.add(Pair.of(partitionFile.location(), (long) deletePos));
counter++;
if (counter == deleteFileSize) {
// Dump to file and reset variables
OutputFile output =
Files.localOutput(temp.resolve(UUID.randomUUID().toString()).toFile());
deleteFiles.add(writeDeleteFile(table, output, partition, deletes));
counter = 0;
deletes.clear();
}
}
}
}
@@ -324,6 +350,20 @@ private void writePosDeletesForFiles(Table table, List<DataFile> files) throws I
rowDelta.commit();
}

private List<DeleteFile> writeDV(
Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException {
OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
try (DVFileWriter closeableWriter = writer) {
for (int row = 0; row < numPositionsToDelete; row++) {
closeableWriter.delete(path, row, table.spec(), partition);
}
}

return writer.result().deleteFiles();
}

private DeleteFile writeDeleteFile(
Table table, OutputFile out, StructLike partition, List<Pair<CharSequence, Long>> deletes)
throws IOException {
Expand Down Expand Up @@ -357,7 +397,7 @@ private List<Object[]> records(String table, String partitionCol) {
}

private long size(List<DeleteFile> deleteFiles) {
return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
return deleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum();
}

private List<DataFile> dataFiles(Table table) {
@@ -73,6 +73,7 @@ protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
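Putting the new v3 path together: the test now writes one deletion vector per data file through BaseDVFileWriter and commits the resulting Puffin-backed delete files via a row delta. Below is a condensed, self-contained sketch of that flow; the class and method names (DVWriteSketch, deleteFirstRows) are illustrative, and the loader "path -> null" means no pre-existing deletes are merged into the new DVs.

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.io.OutputFileFactory;

// Illustrative sketch: delete the first numRows positions of one data file using a DV
// (format version 3) and commit it, mirroring the writeDV helper in the test above.
class DVWriteSketch {
  static void deleteFirstRows(Table table, DataFile dataFile, long numRows) throws IOException {
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
    DVFileWriter writer = new BaseDVFileWriter(fileFactory, path -> null);
    try (DVFileWriter closeableWriter = writer) {
      for (long pos = 0; pos < numRows; pos++) {
        closeableWriter.delete(dataFile.location(), pos, table.spec(), dataFile.partition());
      }
    }

    // Commit all DVs produced by the writer as delete files on the table.
    RowDelta rowDelta = table.newRowDelta();
    for (DeleteFile dv : writer.result().deleteFiles()) {
      rowDelta.addDeletes(dv);
    }
    rowDelta.commit();
  }
}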
return Stream.of(task.file());
}

@SuppressWarnings("resource") // handled by BaseReader
@Override
protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
String filePath = task.file().location();