diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java index 4a0668976a25..a5266101c252 100644 --- a/core/src/main/java/org/apache/iceberg/FileMetadata.java +++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java @@ -109,6 +109,11 @@ public Builder copy(DeleteFile toCopy) { this.keyMetadata = toCopy.keyMetadata() == null ? null : ByteBuffers.copy(toCopy.keyMetadata()); this.sortOrderId = toCopy.sortOrderId(); + this.splitOffsets = toCopy.splitOffsets(); + // Preserve DV-specific fields for deletion vectors + this.referencedDataFile = toCopy.referencedDataFile(); + this.contentOffset = toCopy.contentOffset(); + this.contentSizeInBytes = toCopy.contentSizeInBytes(); return this; } diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index ee7679f5e972..6f58b29315be 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -40,6 +41,11 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinCompressionCodec; +import org.apache.iceberg.puffin.PuffinReader; +import org.apache.iceberg.puffin.PuffinWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -446,16 +452,8 @@ private static RewriteResult writeDeleteFileEntry( switch (file.content()) { case POSITION_DELETES: - String targetDeleteFilePath = newPath(file.location(), sourcePrefix, targetPrefix); - Metrics metricsWithTargetPath = - ContentFileUtil.replacePathBounds(file, sourcePrefix, targetPrefix); - DeleteFile movedFile = - FileMetadata.deleteFileBuilder(spec) - .copy(file) - .withPath(targetDeleteFilePath) - .withMetrics(metricsWithTargetPath) - .build(); - appendEntryWithFile(entry, writer, movedFile); + DeleteFile posDeleteFile = newPositionDeleteEntry(file, spec, sourcePrefix, targetPrefix); + appendEntryWithFile(entry, writer, posDeleteFile); // keep the following entries in metadata but exclude them from copyPlan // 1) deleted position delete files // 2) entries not changed by snapshotIds @@ -465,7 +463,7 @@ private static RewriteResult writeDeleteFileEntry( .add( Pair.of( stagingPath(file.location(), sourcePrefix, stagingLocation), - movedFile.location())); + posDeleteFile.location())); } result.toRewrite().add(file); return result; @@ -524,6 +522,56 @@ private static DeleteFile newEqualityDeleteEntry( .build(); } + private static DeleteFile newPositionDeleteEntry( + DeleteFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) { + String path = file.location(); + Preconditions.checkArgument( + path.startsWith(sourcePrefix), + "Expected delete file %s to start with prefix: %s", + path, + sourcePrefix); + + FileMetadata.Builder builder = + FileMetadata.deleteFileBuilder(spec) + .copy(file) + .withPath(newPath(path, sourcePrefix, targetPrefix)) + .withMetrics(ContentFileUtil.replacePathBounds(file, 
sourcePrefix, targetPrefix)); + + // Update referencedDataFile for DV files + String newReferencedDataFile = + rewriteReferencedDataFilePathForDV(file, sourcePrefix, targetPrefix); + if (newReferencedDataFile != null) { + builder.withReferencedDataFile(newReferencedDataFile); + } + + return builder.build(); + } + + /** + * Replace the referenced data file path for a DV (Deletion Vector) file. + * + *
<p>
For DV files, returns the updated path with the target prefix. For non-DV files or files + * without a referenced data file, returns null. + * + * @param deleteFile delete file to check + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return updated referenced data file path, or null if not applicable + */ + private static String rewriteReferencedDataFilePathForDV( + DeleteFile deleteFile, String sourcePrefix, String targetPrefix) { + if (!ContentFileUtil.isDV(deleteFile) || deleteFile.referencedDataFile() == null) { + return null; + } + + String oldReferencedDataFile = deleteFile.referencedDataFile(); + if (oldReferencedDataFile.startsWith(sourcePrefix)) { + return newPath(oldReferencedDataFile, sourcePrefix, targetPrefix); + } + + return oldReferencedDataFile; + } + /** Class providing engine-specific methods to read and write position delete files. */ public interface PositionDeleteReaderWriter extends Serializable { CloseableIterable reader(InputFile inputFile, FileFormat format, PartitionSpec spec); @@ -562,6 +610,14 @@ public static void rewritePositionDeleteFile( throw new UnsupportedOperationException( String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix)); } + + // DV files (Puffin format for v3+) need special handling to rewrite internal blob metadata + if (ContentFileUtil.isDV(deleteFile)) { + rewriteDVFile(deleteFile, outputFile, io, sourcePrefix, targetPrefix); + return; + } + + // For non-DV position delete files (v2), rewrite using the reader/writer InputFile sourceFile = io.newInputFile(path); try (CloseableIterable reader = posDeleteReaderWriter.reader(sourceFile, deleteFile.format(), spec)) { @@ -592,6 +648,57 @@ record = recordIt.next(); } } + /** + * Rewrite a DV (Deletion Vector) file, updating the referenced data file paths in blob metadata. 
+ * + * @param deleteFile source DV file to be rewritten + * @param outputFile output file to write the rewritten DV to + * @param io file io + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix to replace it + */ + private static void rewriteDVFile( + DeleteFile deleteFile, + OutputFile outputFile, + FileIO io, + String sourcePrefix, + String targetPrefix) + throws IOException { + List rewrittenBlobs = Lists.newArrayList(); + try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) { + // Read all blobs and rewrite them with updated referenced data file paths + for (Pair blobPair : + reader.readAll(reader.fileMetadata().blobs())) { + org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first(); + ByteBuffer blobData = blobPair.second(); + + // Get the original properties and update the referenced data file path + Map properties = Maps.newHashMap(blobMetadata.properties()); + String referencedDataFile = properties.get("referenced-data-file"); + if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) { + String newReferencedDataFile = newPath(referencedDataFile, sourcePrefix, targetPrefix); + properties.put("referenced-data-file", newReferencedDataFile); + } + + // Create a new blob with updated properties + rewrittenBlobs.add( + new Blob( + blobMetadata.type(), + blobMetadata.inputFields(), + blobMetadata.snapshotId(), + blobMetadata.sequenceNumber(), + blobData, + PuffinCompressionCodec.forName(blobMetadata.compressionCodec()), + properties)); + } + } + + try (PuffinWriter writer = + Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { + rewrittenBlobs.forEach(writer::write); + } + } + private static PositionDelete newPositionDeleteRecord( Record record, String sourcePrefix, String targetPrefix) { PositionDelete delete = PositionDelete.create(); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index 6dac5d5da00d..0bcaf0af6581 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -39,6 +40,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -59,6 +63,7 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -80,10 +85,12 @@ import 
org.apache.spark.storage.BroadcastBlockId; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import scala.Tuple2; +@ExtendWith(ParameterizedTestExtension.class) public class TestRewriteTablePathsAction extends TestBase { @TempDir private Path staging; @@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase { @TempDir private Path newTableDir; @TempDir private Path targetTableDir; + @Parameters(name = "formatVersion = {0}") + protected static List formatVersions() { + return TestHelpers.V2_AND_ABOVE; + } + + @Parameter private int formatVersion; + protected ActionsProvider actions() { return SparkActions.get(); } @@ -135,7 +149,15 @@ protected Table createTableWithSnapshots( private Table createTableWithSnapshots( String location, int snapshotNumber, Map properties, String mode) { - Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location); + Table newTable = + TABLES.create( + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.builder() + .put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)) + .putAll(properties) + .build(), + location); List records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); @@ -159,7 +181,7 @@ private void dropNameSpaces() { sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs); } - @Test + @TestTemplate public void testRewritePath() throws Exception { String targetTableLocation = targetTableLocation(); @@ -207,7 +229,7 @@ public void testRewritePath() throws Exception { assertEquals("Rows should match after copy", expected, actual); } - @Test + @TestTemplate public void testSameLocations() { assertThatThrownBy( () -> @@ -220,7 +242,7 @@ public void testSameLocations() { .hasMessageContaining("Source prefix cannot be the same as target prefix"); } - @Test + @TestTemplate public void testStartVersion() throws Exception { RewriteTablePath.Result result = actions() @@ -244,7 +266,7 @@ public void testStartVersion() throws Exception { .isEmpty(); } - @Test + @TestTemplate public void testIncrementalRewrite() throws Exception { String location = newTableLocation(); Table sourceTable = @@ -291,7 +313,7 @@ public void testIncrementalRewrite() throws Exception { assertEquals("Rows should match after copy", expected, actual); } - @Test + @TestTemplate public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2) throws Exception { String location = newTableLocation(); @@ -316,7 +338,7 @@ public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path locat checkFileNum(3, 3, 3, 12, result1); } - @Test + @TestTemplate public void testFullTableRewritePath() throws Exception { RewriteTablePath.Result result = actions() @@ -327,7 +349,7 @@ public void testFullTableRewritePath() throws Exception { checkFileNum(3, 2, 2, 9, result); } - @Test + @TestTemplate public void testManifestRewriteAndIncrementalCopy() throws Exception { RewriteTablePath.Result initialResult = actions() @@ -354,7 +376,7 @@ public void testManifestRewriteAndIncrementalCopy() throws Exception { checkFileNum(1, 1, addedManifest, 3, postReweiteResult); } - @Test + @TestTemplate public void testDeleteDataFile() throws Exception { List validDataFiles = spark @@ -386,17 +408,17 @@ public void testDeleteDataFile() throws Exception { .hasSize(1); } - @Test + @TestTemplate public void 
testPositionDeletesParquet() throws Exception { runPositionDeletesTest("parquet"); } - @Test + @TestTemplate public void testPositionDeletesAvro() throws Exception { runPositionDeletesTest("avro"); } - @Test + @TestTemplate public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } @@ -427,7 +449,8 @@ private void runPositionDeletesTest(String fileFormat) throws Exception { FileHelpers.writeDeleteFile( tableWithPosDeletes, tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), - deletes) + deletes, + formatVersion) .first(); tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit(); @@ -454,7 +477,7 @@ private void runPositionDeletesTest(String fileFormat) throws Exception { .hasSize(1); } - @Test + @TestTemplate public void testPositionDeleteWithRow() throws Exception { String dataFileLocation = table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(); @@ -467,7 +490,8 @@ public void testPositionDeleteWithRow() throws Exception { .toURI() .toString()); deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", "AAAA")); - DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes); + DeleteFile positionDeletes = + FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes, formatVersion); table.newRowDelta().addDeletes(positionDeletes).commit(); assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1); @@ -486,18 +510,24 @@ public void testPositionDeleteWithRow() throws Exception { // copy the metadata files and data files copyTableFiles(result); - // check copied position delete row - Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2]; - assertEquals( - "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow); + // check copied position delete row - only v2 stores row data with position deletes + // v3+ uses Deletion Vectors (DV) which only store position information + if (formatVersion == 2) { + Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2]; + assertEquals( + "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow); + } // Positional delete affects a single row, so only one row must remain assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList()) .hasSize(1); } - @Test + @TestTemplate public void testPositionDeletesAcrossFiles() throws Exception { + assumeThat(formatVersion) + .as("Can't write multiple deletes into a single v3 delete file") + .isEqualTo(2); Stream allFiles = StreamSupport.stream(table.snapshots().spliterator(), false) .flatMap(s -> StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false)); @@ -510,7 +540,7 @@ public void testPositionDeletesAcrossFiles() throws Exception { File file = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet")); DeleteFile positionDeletes = FileHelpers.writeDeleteFile( - table, table.io().newOutputFile(file.toURI().toString()), deletes) + table, table.io().newOutputFile(file.toURI().toString()), deletes, formatVersion) .first(); table.newRowDelta().addDeletes(positionDeletes).commit(); @@ -535,7 +565,7 @@ public void testPositionDeletesAcrossFiles() throws Exception { .isEmpty(); } - @Test + @TestTemplate public void testEqualityDeletes() throws Exception { Table sourceTable = createTableWithSnapshots(newTableLocation(), 1); @@ -590,7 +620,7 @@ public void 
testEqualityDeletes() throws Exception { .hasSize(2); } - @Test + @TestTemplate public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { String location = newTableLocation(); Table sourceTable = createTableWithSnapshots(location, 2); @@ -636,7 +666,7 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { result); } - @Test + @TestTemplate public void testRewritePathWithoutSnapshot() throws Exception { RewriteTablePath.Result result = actions() @@ -649,7 +679,7 @@ public void testRewritePathWithoutSnapshot() throws Exception { checkFileNum(1, 0, 0, 1, result); } - @Test + @TestTemplate public void testExpireSnapshotBeforeRewrite() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -664,7 +694,7 @@ public void testExpireSnapshotBeforeRewrite() throws Exception { checkFileNum(4, 1, 2, 9, result); } - @Test + @TestTemplate public void testRewritePathWithNonLiveEntry() throws Exception { String location = newTableLocation(); // first overwrite generate 1 manifest and 1 data file @@ -729,7 +759,7 @@ public void testRewritePathWithNonLiveEntry() throws Exception { assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation); } - @Test + @TestTemplate public void testStartSnapshotWithoutValidSnapshot() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -748,7 +778,7 @@ public void testStartSnapshotWithoutValidSnapshot() throws Exception { checkFileNum(2, 1, 1, 5, result); } - @Test + @TestTemplate public void testMoveTheVersionExpireSnapshot() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -766,7 +796,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception { checkFileNum(1, 0, 0, 1, result); } - @Test + @TestTemplate public void testMoveVersionWithInvalidSnapshots() { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -785,7 +815,7 @@ public void testMoveVersionWithInvalidSnapshots() { + "Please choose an earlier version without invalid snapshots."); } - @Test + @TestTemplate public void testRollBack() throws Exception { long secondSnapshotId = table.currentSnapshot().snapshotId(); @@ -812,7 +842,7 @@ public void testRollBack() throws Exception { checkFileNum(6, 3, 3, 15, result); } - @Test + @TestTemplate public void testWriteAuditPublish() throws Exception { // enable WAP table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit(); @@ -837,7 +867,7 @@ public void testWriteAuditPublish() throws Exception { checkFileNum(5, 3, 3, 14, result); } - @Test + @TestTemplate public void testSchemaChange() throws Exception { // change the schema table.updateSchema().addColumn("c4", Types.StringType.get()).commit(); @@ -854,7 +884,7 @@ public void testSchemaChange() throws Exception { checkFileNum(4, 2, 2, 10, result); } - @Test + @TestTemplate public void testSnapshotIdInheritanceEnabled() throws Exception { String sourceTableLocation = newTableLocation(); Map properties = Maps.newHashMap(); @@ -872,7 +902,7 @@ public void testSnapshotIdInheritanceEnabled() throws Exception { checkFileNum(3, 2, 2, 9, result); } - @Test + @TestTemplate public void testMetadataCompression() throws Exception { String sourceTableLocation = newTableLocation(); Map properties = 
Maps.newHashMap(); @@ -898,7 +928,7 @@ public void testMetadataCompression() throws Exception { checkFileNum(2, 2, 2, 8, result); } - @Test + @TestTemplate public void testInvalidArgs() { RewriteTablePath actions = actions().rewriteTablePath(table); @@ -931,12 +961,12 @@ public void testInvalidArgs() { .hasMessageContaining("End version('null') cannot be empty"); } - @Test + @TestTemplate public void testTableWithManyPartitionStatisticFile() throws IOException { String sourceTableLocation = newTableLocation(); - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - String tableName = "v2tblwithPartStats"; + Map properties = + ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + String tableName = String.format("v%stblwithPartStats", formatVersion); Table sourceTable = createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0, "c1"); @@ -963,12 +993,12 @@ public void testTableWithManyPartitionStatisticFile() throws IOException { result, "partition-stats", sourceTableLocation, targetTableLocation); } - @Test + @TestTemplate public void testTableWithManyStatisticFiles() throws IOException { String sourceTableLocation = newTableLocation(); - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - String tableName = "v2tblwithmanystats"; + Map properties = + ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + String tableName = String.format("v%stblwithmanystats", formatVersion); Table sourceTable = createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0); @@ -992,12 +1022,12 @@ public void testTableWithManyStatisticFiles() throws IOException { iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 + 1, result); } - @Test + @TestTemplate public void testStatisticsFileSourcePath() throws IOException { String sourceTableLocation = newTableLocation(); - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - String tableName = "v2tblwithstats"; + Map properties = + ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + String tableName = String.format("v%stblwithstats", formatVersion); Table sourceTable = createMetastoreTable(sourceTableLocation, properties, "default", tableName, 1); @@ -1020,13 +1050,17 @@ public void testStatisticsFileSourcePath() throws IOException { findAndAssertFileInFileList(result, ".stats", sourceTableLocation, targetTableLocation); } - @Test + @TestTemplate public void testMetadataCompressionWithMetastoreTable() throws Exception { Map properties = Maps.newHashMap(); properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); Table sourceTable = createMetastoreTable( - newTableLocation(), properties, "default", "testMetadataCompression", 2); + newTableLocation(), + properties, + "default", + String.format("v%sMetadataCompression", formatVersion), + 2); TableMetadata currentMetadata = currentMetadata(sourceTable); @@ -1054,10 +1088,11 @@ public void testMetadataCompressionWithMetastoreTable() throws Exception { } // Metastore table tests - @Test + @TestTemplate public void testMetadataLocationChange() throws Exception { + String tableName = String.format("v%stblWithLocation", formatVersion); Table sourceTable = - createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1); + createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", tableName, 1); String metadataFilePath = currentMetadata(sourceTable).metadataFileLocation(); 
String newMetadataDir = "new-metadata-dir"; @@ -1066,7 +1101,7 @@ public void testMetadataLocationChange() throws Exception { .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + newMetadataDir) .commit(); - spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')"); + sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", tableName); sourceTable.refresh(); // copy table @@ -1099,12 +1134,15 @@ public void testMetadataLocationChange() throws Exception { checkFileNum(2, 1, 1, 5, result2); } - @Test + @TestTemplate public void testDeleteFrom() throws Exception { - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - properties.put("write.delete.mode", "merge-on-read"); - String tableName = "v2tbl"; + Map properties = + ImmutableMap.of( + TableProperties.FORMAT_VERSION, + String.valueOf(formatVersion), + "write.delete.mode", + "merge-on-read"); + String tableName = String.format("v%stbl", formatVersion); Table sourceTable = createMetastoreTable(newTableLocation(), properties, "default", tableName, 0); // ingest data @@ -1152,7 +1190,7 @@ public void testDeleteFrom() throws Exception { // register table String metadataLocation = currentMetadata(sourceTable).metadataFileLocation(); String versionFile = fileName(metadataLocation); - String targetTableName = "copiedV2Table"; + String targetTableName = String.format("copiedV%sTable", formatVersion); TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName); catalog.registerTable(tableIdentifier, targetTableLocation() + "/metadata/" + versionFile); @@ -1168,7 +1206,7 @@ public void testDeleteFrom() throws Exception { assertEquals("Rows must match", originalData, copiedData); } - @Test + @TestTemplate public void testKryoDeserializeBroadcastValues() { sparkContext.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); RewriteTablePathSparkAction action = @@ -1179,8 +1217,11 @@ public void testKryoDeserializeBroadcastValues() { assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid()); } - @Test + @TestTemplate public void testNestedDirectoryStructurePreservation() throws Exception { + assumeThat(formatVersion) + .as("Can't add multiple DVs for the same data file in v3") + .isEqualTo(2); String sourceTableLocation = newTableLocation(); Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1); @@ -1217,12 +1258,18 @@ public void testNestedDirectoryStructurePreservation() throws Exception { DeleteFile positionDeletes1 = FileHelpers.writeDeleteFile( - sourceTable, sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1) + sourceTable, + sourceTable.io().newOutputFile(file1.toURI().toString()), + deletes1, + formatVersion) .first(); DeleteFile positionDeletes2 = FileHelpers.writeDeleteFile( - sourceTable, sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2) + sourceTable, + sourceTable.io().newOutputFile(file2.toURI().toString()), + deletes2, + formatVersion) .first(); sourceTable.newRowDelta().addDeletes(positionDeletes1).commit(); @@ -1266,7 +1313,7 @@ public void testNestedDirectoryStructurePreservation() throws Exception { assertThat(targetPath2).startsWith(targetTableLocation()); } - @Test + @TestTemplate public void testRewritePathWithoutCreateFileList() throws Exception { String targetTableLocation = targetTableLocation(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index 6dac5d5da00d..0bcaf0af6581 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -39,6 +40,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -59,6 +63,7 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -80,10 +85,12 @@ import org.apache.spark.storage.BroadcastBlockId; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import scala.Tuple2; +@ExtendWith(ParameterizedTestExtension.class) public class TestRewriteTablePathsAction extends TestBase { @TempDir private Path staging; @@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase { @TempDir private Path newTableDir; @TempDir private Path targetTableDir; + @Parameters(name = "formatVersion = {0}") + protected static List formatVersions() { + return TestHelpers.V2_AND_ABOVE; + } + + @Parameter private int formatVersion; + protected ActionsProvider actions() { return SparkActions.get(); } @@ -135,7 +149,15 @@ protected Table createTableWithSnapshots( private Table createTableWithSnapshots( String location, int snapshotNumber, Map properties, String mode) { - Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location); + Table newTable = + TABLES.create( + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.builder() + .put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)) + .putAll(properties) + .build(), + location); List records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); @@ -159,7 +181,7 @@ private void dropNameSpaces() { sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs); } - @Test + @TestTemplate public void testRewritePath() throws Exception { String targetTableLocation = targetTableLocation(); @@ -207,7 +229,7 @@ public void testRewritePath() throws Exception { assertEquals("Rows should match after copy", expected, actual); } - @Test + @TestTemplate public void testSameLocations() { assertThatThrownBy( () -> @@ -220,7 +242,7 @@ public void testSameLocations() { .hasMessageContaining("Source prefix cannot be the same as target prefix"); } - @Test + @TestTemplate public void testStartVersion() 
throws Exception { RewriteTablePath.Result result = actions() @@ -244,7 +266,7 @@ public void testStartVersion() throws Exception { .isEmpty(); } - @Test + @TestTemplate public void testIncrementalRewrite() throws Exception { String location = newTableLocation(); Table sourceTable = @@ -291,7 +313,7 @@ public void testIncrementalRewrite() throws Exception { assertEquals("Rows should match after copy", expected, actual); } - @Test + @TestTemplate public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2) throws Exception { String location = newTableLocation(); @@ -316,7 +338,7 @@ public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path locat checkFileNum(3, 3, 3, 12, result1); } - @Test + @TestTemplate public void testFullTableRewritePath() throws Exception { RewriteTablePath.Result result = actions() @@ -327,7 +349,7 @@ public void testFullTableRewritePath() throws Exception { checkFileNum(3, 2, 2, 9, result); } - @Test + @TestTemplate public void testManifestRewriteAndIncrementalCopy() throws Exception { RewriteTablePath.Result initialResult = actions() @@ -354,7 +376,7 @@ public void testManifestRewriteAndIncrementalCopy() throws Exception { checkFileNum(1, 1, addedManifest, 3, postReweiteResult); } - @Test + @TestTemplate public void testDeleteDataFile() throws Exception { List validDataFiles = spark @@ -386,17 +408,17 @@ public void testDeleteDataFile() throws Exception { .hasSize(1); } - @Test + @TestTemplate public void testPositionDeletesParquet() throws Exception { runPositionDeletesTest("parquet"); } - @Test + @TestTemplate public void testPositionDeletesAvro() throws Exception { runPositionDeletesTest("avro"); } - @Test + @TestTemplate public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } @@ -427,7 +449,8 @@ private void runPositionDeletesTest(String fileFormat) throws Exception { FileHelpers.writeDeleteFile( tableWithPosDeletes, tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), - deletes) + deletes, + formatVersion) .first(); tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit(); @@ -454,7 +477,7 @@ private void runPositionDeletesTest(String fileFormat) throws Exception { .hasSize(1); } - @Test + @TestTemplate public void testPositionDeleteWithRow() throws Exception { String dataFileLocation = table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(); @@ -467,7 +490,8 @@ public void testPositionDeleteWithRow() throws Exception { .toURI() .toString()); deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", "AAAA")); - DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes); + DeleteFile positionDeletes = + FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes, formatVersion); table.newRowDelta().addDeletes(positionDeletes).commit(); assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1); @@ -486,18 +510,24 @@ public void testPositionDeleteWithRow() throws Exception { // copy the metadata files and data files copyTableFiles(result); - // check copied position delete row - Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2]; - assertEquals( - "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow); + // check copied position delete row - only v2 stores row data with position deletes + // v3+ uses Deletion Vectors (DV) which only store position information + if (formatVersion 
== 2) { + Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2]; + assertEquals( + "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow); + } // Positional delete affects a single row, so only one row must remain assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList()) .hasSize(1); } - @Test + @TestTemplate public void testPositionDeletesAcrossFiles() throws Exception { + assumeThat(formatVersion) + .as("Can't write multiple deletes into a single v3 delete file") + .isEqualTo(2); Stream allFiles = StreamSupport.stream(table.snapshots().spliterator(), false) .flatMap(s -> StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false)); @@ -510,7 +540,7 @@ public void testPositionDeletesAcrossFiles() throws Exception { File file = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet")); DeleteFile positionDeletes = FileHelpers.writeDeleteFile( - table, table.io().newOutputFile(file.toURI().toString()), deletes) + table, table.io().newOutputFile(file.toURI().toString()), deletes, formatVersion) .first(); table.newRowDelta().addDeletes(positionDeletes).commit(); @@ -535,7 +565,7 @@ public void testPositionDeletesAcrossFiles() throws Exception { .isEmpty(); } - @Test + @TestTemplate public void testEqualityDeletes() throws Exception { Table sourceTable = createTableWithSnapshots(newTableLocation(), 1); @@ -590,7 +620,7 @@ public void testEqualityDeletes() throws Exception { .hasSize(2); } - @Test + @TestTemplate public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { String location = newTableLocation(); Table sourceTable = createTableWithSnapshots(location, 2); @@ -636,7 +666,7 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { result); } - @Test + @TestTemplate public void testRewritePathWithoutSnapshot() throws Exception { RewriteTablePath.Result result = actions() @@ -649,7 +679,7 @@ public void testRewritePathWithoutSnapshot() throws Exception { checkFileNum(1, 0, 0, 1, result); } - @Test + @TestTemplate public void testExpireSnapshotBeforeRewrite() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -664,7 +694,7 @@ public void testExpireSnapshotBeforeRewrite() throws Exception { checkFileNum(4, 1, 2, 9, result); } - @Test + @TestTemplate public void testRewritePathWithNonLiveEntry() throws Exception { String location = newTableLocation(); // first overwrite generate 1 manifest and 1 data file @@ -729,7 +759,7 @@ public void testRewritePathWithNonLiveEntry() throws Exception { assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation); } - @Test + @TestTemplate public void testStartSnapshotWithoutValidSnapshot() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -748,7 +778,7 @@ public void testStartSnapshotWithoutValidSnapshot() throws Exception { checkFileNum(2, 1, 1, 5, result); } - @Test + @TestTemplate public void testMoveTheVersionExpireSnapshot() throws Exception { // expire one snapshot actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -766,7 +796,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception { checkFileNum(1, 0, 0, 1, result); } - @Test + @TestTemplate public void testMoveVersionWithInvalidSnapshots() { // expire one snapshot 
actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); @@ -785,7 +815,7 @@ public void testMoveVersionWithInvalidSnapshots() { + "Please choose an earlier version without invalid snapshots."); } - @Test + @TestTemplate public void testRollBack() throws Exception { long secondSnapshotId = table.currentSnapshot().snapshotId(); @@ -812,7 +842,7 @@ public void testRollBack() throws Exception { checkFileNum(6, 3, 3, 15, result); } - @Test + @TestTemplate public void testWriteAuditPublish() throws Exception { // enable WAP table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit(); @@ -837,7 +867,7 @@ public void testWriteAuditPublish() throws Exception { checkFileNum(5, 3, 3, 14, result); } - @Test + @TestTemplate public void testSchemaChange() throws Exception { // change the schema table.updateSchema().addColumn("c4", Types.StringType.get()).commit(); @@ -854,7 +884,7 @@ public void testSchemaChange() throws Exception { checkFileNum(4, 2, 2, 10, result); } - @Test + @TestTemplate public void testSnapshotIdInheritanceEnabled() throws Exception { String sourceTableLocation = newTableLocation(); Map properties = Maps.newHashMap(); @@ -872,7 +902,7 @@ public void testSnapshotIdInheritanceEnabled() throws Exception { checkFileNum(3, 2, 2, 9, result); } - @Test + @TestTemplate public void testMetadataCompression() throws Exception { String sourceTableLocation = newTableLocation(); Map properties = Maps.newHashMap(); @@ -898,7 +928,7 @@ public void testMetadataCompression() throws Exception { checkFileNum(2, 2, 2, 8, result); } - @Test + @TestTemplate public void testInvalidArgs() { RewriteTablePath actions = actions().rewriteTablePath(table); @@ -931,12 +961,12 @@ public void testInvalidArgs() { .hasMessageContaining("End version('null') cannot be empty"); } - @Test + @TestTemplate public void testTableWithManyPartitionStatisticFile() throws IOException { String sourceTableLocation = newTableLocation(); - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - String tableName = "v2tblwithPartStats"; + Map properties = + ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + String tableName = String.format("v%stblwithPartStats", formatVersion); Table sourceTable = createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0, "c1"); @@ -963,12 +993,12 @@ public void testTableWithManyPartitionStatisticFile() throws IOException { result, "partition-stats", sourceTableLocation, targetTableLocation); } - @Test + @TestTemplate public void testTableWithManyStatisticFiles() throws IOException { String sourceTableLocation = newTableLocation(); - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - String tableName = "v2tblwithmanystats"; + Map properties = + ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + String tableName = String.format("v%stblwithmanystats", formatVersion); Table sourceTable = createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0); @@ -992,12 +1022,12 @@ public void testTableWithManyStatisticFiles() throws IOException { iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 + 1, result); } - @Test + @TestTemplate public void testStatisticsFileSourcePath() throws IOException { String sourceTableLocation = newTableLocation(); - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - String tableName = "v2tblwithstats"; + Map 
properties = + ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + String tableName = String.format("v%stblwithstats", formatVersion); Table sourceTable = createMetastoreTable(sourceTableLocation, properties, "default", tableName, 1); @@ -1020,13 +1050,17 @@ public void testStatisticsFileSourcePath() throws IOException { findAndAssertFileInFileList(result, ".stats", sourceTableLocation, targetTableLocation); } - @Test + @TestTemplate public void testMetadataCompressionWithMetastoreTable() throws Exception { Map properties = Maps.newHashMap(); properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); Table sourceTable = createMetastoreTable( - newTableLocation(), properties, "default", "testMetadataCompression", 2); + newTableLocation(), + properties, + "default", + String.format("v%sMetadataCompression", formatVersion), + 2); TableMetadata currentMetadata = currentMetadata(sourceTable); @@ -1054,10 +1088,11 @@ public void testMetadataCompressionWithMetastoreTable() throws Exception { } // Metastore table tests - @Test + @TestTemplate public void testMetadataLocationChange() throws Exception { + String tableName = String.format("v%stblWithLocation", formatVersion); Table sourceTable = - createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1); + createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", tableName, 1); String metadataFilePath = currentMetadata(sourceTable).metadataFileLocation(); String newMetadataDir = "new-metadata-dir"; @@ -1066,7 +1101,7 @@ public void testMetadataLocationChange() throws Exception { .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + newMetadataDir) .commit(); - spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')"); + sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", tableName); sourceTable.refresh(); // copy table @@ -1099,12 +1134,15 @@ public void testMetadataLocationChange() throws Exception { checkFileNum(2, 1, 1, 5, result2); } - @Test + @TestTemplate public void testDeleteFrom() throws Exception { - Map properties = Maps.newHashMap(); - properties.put("format-version", "2"); - properties.put("write.delete.mode", "merge-on-read"); - String tableName = "v2tbl"; + Map properties = + ImmutableMap.of( + TableProperties.FORMAT_VERSION, + String.valueOf(formatVersion), + "write.delete.mode", + "merge-on-read"); + String tableName = String.format("v%stbl", formatVersion); Table sourceTable = createMetastoreTable(newTableLocation(), properties, "default", tableName, 0); // ingest data @@ -1152,7 +1190,7 @@ public void testDeleteFrom() throws Exception { // register table String metadataLocation = currentMetadata(sourceTable).metadataFileLocation(); String versionFile = fileName(metadataLocation); - String targetTableName = "copiedV2Table"; + String targetTableName = String.format("copiedV%sTable", formatVersion); TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName); catalog.registerTable(tableIdentifier, targetTableLocation() + "/metadata/" + versionFile); @@ -1168,7 +1206,7 @@ public void testDeleteFrom() throws Exception { assertEquals("Rows must match", originalData, copiedData); } - @Test + @TestTemplate public void testKryoDeserializeBroadcastValues() { sparkContext.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); RewriteTablePathSparkAction action = @@ -1179,8 +1217,11 @@ public void testKryoDeserializeBroadcastValues() { 
assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid()); } - @Test + @TestTemplate public void testNestedDirectoryStructurePreservation() throws Exception { + assumeThat(formatVersion) + .as("Can't add multiple DVs for the same data file in v3") + .isEqualTo(2); String sourceTableLocation = newTableLocation(); Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1); @@ -1217,12 +1258,18 @@ public void testNestedDirectoryStructurePreservation() throws Exception { DeleteFile positionDeletes1 = FileHelpers.writeDeleteFile( - sourceTable, sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1) + sourceTable, + sourceTable.io().newOutputFile(file1.toURI().toString()), + deletes1, + formatVersion) .first(); DeleteFile positionDeletes2 = FileHelpers.writeDeleteFile( - sourceTable, sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2) + sourceTable, + sourceTable.io().newOutputFile(file2.toURI().toString()), + deletes2, + formatVersion) .first(); sourceTable.newRowDelta().addDeletes(positionDeletes1).commit(); @@ -1266,7 +1313,7 @@ public void testNestedDirectoryStructurePreservation() throws Exception { assertThat(targetPath2).startsWith(targetTableLocation()); } - @Test + @TestTemplate public void testRewritePathWithoutCreateFileList() throws Exception { String targetTableLocation = targetTableLocation(); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index 6dac5d5da00d..0bcaf0af6581 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -39,6 +40,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -59,6 +63,7 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -80,10 +85,12 @@ import org.apache.spark.storage.BroadcastBlockId; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import scala.Tuple2; +@ExtendWith(ParameterizedTestExtension.class) public class TestRewriteTablePathsAction extends TestBase { @TempDir private Path staging; @@ -91,6 +98,13 @@ public class TestRewriteTablePathsAction extends TestBase { @TempDir private Path newTableDir; 
@TempDir private Path targetTableDir; + @Parameters(name = "formatVersion = {0}") + protected static List formatVersions() { + return TestHelpers.V2_AND_ABOVE; + } + + @Parameter private int formatVersion; + protected ActionsProvider actions() { return SparkActions.get(); } @@ -135,7 +149,15 @@ protected Table createTableWithSnapshots( private Table createTableWithSnapshots( String location, int snapshotNumber, Map properties, String mode) { - Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location); + Table newTable = + TABLES.create( + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.builder() + .put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)) + .putAll(properties) + .build(), + location); List records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); @@ -159,7 +181,7 @@ private void dropNameSpaces() { sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs); } - @Test + @TestTemplate public void testRewritePath() throws Exception { String targetTableLocation = targetTableLocation(); @@ -207,7 +229,7 @@ public void testRewritePath() throws Exception { assertEquals("Rows should match after copy", expected, actual); } - @Test + @TestTemplate public void testSameLocations() { assertThatThrownBy( () -> @@ -220,7 +242,7 @@ public void testSameLocations() { .hasMessageContaining("Source prefix cannot be the same as target prefix"); } - @Test + @TestTemplate public void testStartVersion() throws Exception { RewriteTablePath.Result result = actions() @@ -244,7 +266,7 @@ public void testStartVersion() throws Exception { .isEmpty(); } - @Test + @TestTemplate public void testIncrementalRewrite() throws Exception { String location = newTableLocation(); Table sourceTable = @@ -291,7 +313,7 @@ public void testIncrementalRewrite() throws Exception { assertEquals("Rows should match after copy", expected, actual); } - @Test + @TestTemplate public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2) throws Exception { String location = newTableLocation(); @@ -316,7 +338,7 @@ public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path locat checkFileNum(3, 3, 3, 12, result1); } - @Test + @TestTemplate public void testFullTableRewritePath() throws Exception { RewriteTablePath.Result result = actions() @@ -327,7 +349,7 @@ public void testFullTableRewritePath() throws Exception { checkFileNum(3, 2, 2, 9, result); } - @Test + @TestTemplate public void testManifestRewriteAndIncrementalCopy() throws Exception { RewriteTablePath.Result initialResult = actions() @@ -354,7 +376,7 @@ public void testManifestRewriteAndIncrementalCopy() throws Exception { checkFileNum(1, 1, addedManifest, 3, postReweiteResult); } - @Test + @TestTemplate public void testDeleteDataFile() throws Exception { List validDataFiles = spark @@ -386,17 +408,17 @@ public void testDeleteDataFile() throws Exception { .hasSize(1); } - @Test + @TestTemplate public void testPositionDeletesParquet() throws Exception { runPositionDeletesTest("parquet"); } - @Test + @TestTemplate public void testPositionDeletesAvro() throws Exception { runPositionDeletesTest("avro"); } - @Test + @TestTemplate public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } @@ -427,7 +449,8 @@ private void runPositionDeletesTest(String fileFormat) throws Exception { FileHelpers.writeDeleteFile( tableWithPosDeletes, tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), - deletes) + deletes, + formatVersion) .first(); 
     tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
@@ -454,7 +477,7 @@ private void runPositionDeletesTest(String fileFormat) throws Exception {
         .hasSize(1);
   }

-  @Test
+  @TestTemplate
   public void testPositionDeleteWithRow() throws Exception {
     String dataFileLocation =
         table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
@@ -467,7 +490,8 @@ public void testPositionDeleteWithRow() throws Exception {
                     .toURI()
                     .toString());
     deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", "AAAA"));
-    DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes);
+    DeleteFile positionDeletes =
+        FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes, formatVersion);
     table.newRowDelta().addDeletes(positionDeletes).commit();

     assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
@@ -486,18 +510,24 @@ public void testPositionDeleteWithRow() throws Exception {
     // copy the metadata files and data files
     copyTableFiles(result);

-    // check copied position delete row
-    Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2];
-    assertEquals(
-        "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow);
+    // check copied position delete row - only v2 stores row data with position deletes
+    // v3+ uses Deletion Vectors (DV) which only store position information
+    if (formatVersion == 2) {
+      Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2];
+      assertEquals(
+          "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow);
+    }

     // Positional delete affects a single row, so only one row must remain
     assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
         .hasSize(1);
   }

-  @Test
+  @TestTemplate
   public void testPositionDeletesAcrossFiles() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't write multiple deletes into a single v3 delete file")
+        .isEqualTo(2);
     Stream<DataFile> allFiles =
         StreamSupport.stream(table.snapshots().spliterator(), false)
             .flatMap(s -> StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false));
@@ -510,7 +540,7 @@ public void testPositionDeletesAcrossFiles() throws Exception {
     File file = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet"));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), deletes)
+                table, table.io().newOutputFile(file.toURI().toString()), deletes, formatVersion)
             .first();
     table.newRowDelta().addDeletes(positionDeletes).commit();

@@ -535,7 +565,7 @@ public void testPositionDeletesAcrossFiles() throws Exception {
         .isEmpty();
   }

-  @Test
+  @TestTemplate
   public void testEqualityDeletes() throws Exception {
     Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);

@@ -590,7 +620,7 @@ public void testEqualityDeletes() throws Exception {
         .hasSize(2);
   }

-  @Test
+  @TestTemplate
   public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
     String location = newTableLocation();
     Table sourceTable = createTableWithSnapshots(location, 2);
@@ -636,7 +666,7 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
         result);
   }

-  @Test
+  @TestTemplate
   public void testRewritePathWithoutSnapshot() throws Exception {
     RewriteTablePath.Result result =
         actions()
@@ -649,7 +679,7 @@ public void testRewritePathWithoutSnapshot() throws Exception {
     checkFileNum(1, 0, 0, 1, result);
   }

-  @Test
+  @TestTemplate
   public void testExpireSnapshotBeforeRewrite() throws Exception {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -664,7 +694,7 @@ public void testExpireSnapshotBeforeRewrite() throws Exception {
     checkFileNum(4, 1, 2, 9, result);
   }

-  @Test
+  @TestTemplate
   public void testRewritePathWithNonLiveEntry() throws Exception {
     String location = newTableLocation();
     // first overwrite generate 1 manifest and 1 data file
@@ -729,7 +759,7 @@ public void testRewritePathWithNonLiveEntry() throws Exception {
     assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
   }

-  @Test
+  @TestTemplate
   public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -748,7 +778,7 @@ public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     checkFileNum(2, 1, 1, 5, result);
   }

-  @Test
+  @TestTemplate
   public void testMoveTheVersionExpireSnapshot() throws Exception {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -766,7 +796,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception {
     checkFileNum(1, 0, 0, 1, result);
   }

-  @Test
+  @TestTemplate
   public void testMoveVersionWithInvalidSnapshots() {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
@@ -785,7 +815,7 @@ public void testMoveVersionWithInvalidSnapshots() {
                 + "Please choose an earlier version without invalid snapshots.");
   }

-  @Test
+  @TestTemplate
   public void testRollBack() throws Exception {
     long secondSnapshotId = table.currentSnapshot().snapshotId();

@@ -812,7 +842,7 @@ public void testRollBack() throws Exception {
     checkFileNum(6, 3, 3, 15, result);
   }

-  @Test
+  @TestTemplate
   public void testWriteAuditPublish() throws Exception {
     // enable WAP
     table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
@@ -837,7 +867,7 @@ public void testWriteAuditPublish() throws Exception {
     checkFileNum(5, 3, 3, 14, result);
   }

-  @Test
+  @TestTemplate
   public void testSchemaChange() throws Exception {
     // change the schema
     table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
@@ -854,7 +884,7 @@ public void testSchemaChange() throws Exception {
     checkFileNum(4, 2, 2, 10, result);
   }

-  @Test
+  @TestTemplate
   public void testSnapshotIdInheritanceEnabled() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -872,7 +902,7 @@ public void testSnapshotIdInheritanceEnabled() throws Exception {
     checkFileNum(3, 2, 2, 9, result);
   }

-  @Test
+  @TestTemplate
   public void testMetadataCompression() throws Exception {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
@@ -898,7 +928,7 @@ public void testMetadataCompression() throws Exception {
     checkFileNum(2, 2, 2, 8, result);
   }

-  @Test
+  @TestTemplate
   public void testInvalidArgs() {
     RewriteTablePath actions = actions().rewriteTablePath(table);

@@ -931,12 +961,12 @@ public void testInvalidArgs() {
         .hasMessageContaining("End version('null') cannot be empty");
   }

-  @Test
+  @TestTemplate
   public void testTableWithManyPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithPartStats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithPartStats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0, "c1");

@@ -963,12 +993,12 @@ public void testTableWithManyPartitionStatisticFile() throws IOException {
         result, "partition-stats", sourceTableLocation, targetTableLocation);
   }

-  @Test
+  @TestTemplate
   public void testTableWithManyStatisticFiles() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithmanystats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithmanystats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);

@@ -992,12 +1022,12 @@ public void testTableWithManyStatisticFiles() throws IOException {
         iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 + 1, result);
   }

-  @Test
+  @TestTemplate
   public void testStatisticsFileSourcePath() throws IOException {
     String sourceTableLocation = newTableLocation();
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    String tableName = "v2tblwithstats";
+    Map<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    String tableName = String.format("v%stblwithstats", formatVersion);
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", tableName, 1);

@@ -1020,13 +1050,17 @@ public void testStatisticsFileSourcePath() throws IOException {
     findAndAssertFileInFileList(result, ".stats", sourceTableLocation, targetTableLocation);
   }

-  @Test
+  @TestTemplate
   public void testMetadataCompressionWithMetastoreTable() throws Exception {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
     Table sourceTable =
         createMetastoreTable(
-            newTableLocation(), properties, "default", "testMetadataCompression", 2);
+            newTableLocation(),
+            properties,
+            "default",
+            String.format("v%sMetadataCompression", formatVersion),
+            2);

     TableMetadata currentMetadata = currentMetadata(sourceTable);

@@ -1054,10 +1088,11 @@ public void testMetadataCompressionWithMetastoreTable() throws Exception {
   }

   // Metastore table tests
-  @Test
+  @TestTemplate
   public void testMetadataLocationChange() throws Exception {
+    String tableName = String.format("v%stblWithLocation", formatVersion);
     Table sourceTable =
-        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1);
+        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", tableName, 1);

     String metadataFilePath = currentMetadata(sourceTable).metadataFileLocation();
     String newMetadataDir = "new-metadata-dir";
@@ -1066,7 +1101,7 @@ public void testMetadataLocationChange() throws Exception {
         .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + newMetadataDir)
         .commit();

-    spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+    sql("insert into hive.default.%s values (1, 'AAAAAAAAAA', 'AAAA')", tableName);
     sourceTable.refresh();

     // copy table
@@ -1099,12 +1134,15 @@ public void testMetadataLocationChange() throws Exception {
     checkFileNum(2, 1, 1, 5, result2);
   }

-  @Test
+  @TestTemplate
   public void testDeleteFrom() throws Exception {
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("format-version", "2");
-    properties.put("write.delete.mode", "merge-on-read");
-    String tableName = "v2tbl";
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            String.valueOf(formatVersion),
+            "write.delete.mode",
+            "merge-on-read");
+    String tableName = String.format("v%stbl", formatVersion);
     Table sourceTable =
         createMetastoreTable(newTableLocation(), properties, "default", tableName, 0);
     // ingest data
@@ -1152,7 +1190,7 @@ public void testDeleteFrom() throws Exception {
     // register table
     String metadataLocation = currentMetadata(sourceTable).metadataFileLocation();
     String versionFile = fileName(metadataLocation);
-    String targetTableName = "copiedV2Table";
+    String targetTableName = String.format("copiedV%sTable", formatVersion);
     TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName);
     catalog.registerTable(tableIdentifier, targetTableLocation() + "/metadata/" + versionFile);

@@ -1168,7 +1206,7 @@ public void testDeleteFrom() throws Exception {
     assertEquals("Rows must match", originalData, copiedData);
   }

-  @Test
+  @TestTemplate
   public void testKryoDeserializeBroadcastValues() {
     sparkContext.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
     RewriteTablePathSparkAction action =
@@ -1179,8 +1217,11 @@ public void testKryoDeserializeBroadcastValues() {
     assertThat(tableBroadcast.getValue().uuid()).isEqualTo(table.uuid());
   }

-  @Test
+  @TestTemplate
   public void testNestedDirectoryStructurePreservation() throws Exception {
+    assumeThat(formatVersion)
+        .as("Can't add multiple DVs for the same data file in v3")
+        .isEqualTo(2);
     String sourceTableLocation = newTableLocation();
     Table sourceTable = createTableWithSnapshots(sourceTableLocation, 1);

@@ -1217,12 +1258,18 @@ public void testNestedDirectoryStructurePreservation() throws Exception {

     DeleteFile positionDeletes1 =
         FileHelpers.writeDeleteFile(
-                sourceTable, sourceTable.io().newOutputFile(file1.toURI().toString()), deletes1)
+                sourceTable,
+                sourceTable.io().newOutputFile(file1.toURI().toString()),
+                deletes1,
+                formatVersion)
             .first();

     DeleteFile positionDeletes2 =
         FileHelpers.writeDeleteFile(
-                sourceTable, sourceTable.io().newOutputFile(file2.toURI().toString()), deletes2)
+                sourceTable,
+                sourceTable.io().newOutputFile(file2.toURI().toString()),
+                deletes2,
+                formatVersion)
             .first();

     sourceTable.newRowDelta().addDeletes(positionDeletes1).commit();
@@ -1266,7 +1313,7 @@ public void testNestedDirectoryStructurePreservation() throws Exception {
     assertThat(targetPath2).startsWith(targetTableLocation());
   }

-  @Test
+  @TestTemplate
   public void testRewritePathWithoutCreateFileList() throws Exception {
     String targetTableLocation = targetTableLocation();
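Note: the hunks above convert each @Test to @TestTemplate, which only runs if the test class is wired for format-version parameterization; that wiring is not part of the hunks shown here. The sketch below is an assumption of what such wiring typically looks like in Iceberg's JUnit 5 tests (the class name, parameter values, and parameters() helper are illustrative, not taken from this patch).

    // Illustrative sketch only -- assumes Iceberg's ParameterizedTestExtension; not part of this diff.
    import static org.assertj.core.api.Assumptions.assumeThat;

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.ParameterizedTestExtension;
    import org.apache.iceberg.Parameters;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ParameterizedTestExtension.class)
    public class RewriteTablePathParameterizationExample {

      // Each @TestTemplate method runs once per value returned here.
      @Parameters(name = "formatVersion = {0}")
      protected static List<Object> parameters() {
        return Arrays.asList(2, 3); // assumed: cover v2 position deletes and v3 deletion vectors
      }

      @Parameter(index = 0)
      private int formatVersion;

      @TestTemplate
      public void exampleTemplate() {
        // Cases that only apply to v2 guard themselves, mirroring the assumeThat calls in the hunks above.
        assumeThat(formatVersion).as("only meaningful for v2 position deletes").isEqualTo(2);
      }
    }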