diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index 75929dfde4c4..5647253a5b4c 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -37,7 +37,7 @@ import org.apache.iceberg.types.Types;
 
 /**
- * @deprecated will be removed in 2.0.0; use {@link PlannedDataReader} instead.
+ * @deprecated will be removed in 1.12.0; use {@link PlannedDataReader} instead.
  */
 @Deprecated
 public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fc0dc298ae3f..d6a13bcd515d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -51,8 +51,8 @@ import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -725,7 +725,7 @@ private static CloseableIterable<Record> positionDeletesReader(
       return Avro.read(inputFile)
           .project(deleteSchema)
           .reuseContainers()
-          .createReaderFunc(DataReader::create)
+          .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
           .build();
 
     case PARQUET:
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 8399ae5d520a..6dac5d5da00d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -387,28 +387,59 @@ public void testDeleteDataFile() throws Exception {
   }
 
   @Test
-  public void testPositionDeletes() throws Exception {
+  public void testPositionDeletesParquet() throws Exception {
+    runPositionDeletesTest("parquet");
+  }
+
+  @Test
+  public void testPositionDeletesAvro() throws Exception {
+    runPositionDeletesTest("avro");
+  }
+
+  @Test
+  public void testPositionDeletesOrc() throws Exception {
+    runPositionDeletesTest("orc");
+  }
+
+  private void runPositionDeletesTest(String fileFormat) throws Exception {
+    Table tableWithPosDeletes =
+        createTableWithSnapshots(
+            tableDir.toFile().toURI().toString().concat("tableWithPosDeletes").concat(fileFormat),
+            2,
+            Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat));
+
     List<Pair<CharSequence, Long>> deletes =
         Lists.newArrayList(
             Pair.of(
-                table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+                tableWithPosDeletes
+                    .currentSnapshot()
+                    .addedDataFiles(tableWithPosDeletes.io())
+                    .iterator()
+                    .next()
+                    .location(),
                 0L));
 
-    File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+    File file =
+        new File(
+            removePrefix(
+                tableWithPosDeletes.location() + "/data/deeply/nested/deletes." + fileFormat));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), deletes)
+                tableWithPosDeletes,
+                tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
+                deletes)
             .first();
 
-    table.newRowDelta().addDeletes(positionDeletes).commit();
+    tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
 
-    assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
+    assertThat(spark.read().format("iceberg").load(tableWithPosDeletes.location()).collectAsList())
+        .hasSize(1);
 
     RewriteTablePath.Result result =
         actions()
-            .rewriteTablePath(table)
+            .rewriteTablePath(tableWithPosDeletes)
             .stagingLocation(stagingLocation())
-            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
             .execute();
 
     // We have one more snapshot, an additional manifest list, and a new (delete) manifest,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fc0dc298ae3f..d6a13bcd515d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -51,8 +51,8 @@ import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -725,7 +725,7 @@ private static CloseableIterable<Record> positionDeletesReader(
       return Avro.read(inputFile)
           .project(deleteSchema)
           .reuseContainers()
-          .createReaderFunc(DataReader::create)
+          .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
           .build();
 
     case PARQUET:
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 8399ae5d520a..6dac5d5da00d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -387,28 +387,59 @@ public void testDeleteDataFile() throws Exception {
   }
 
   @Test
-  public void testPositionDeletes() throws Exception {
+  public void testPositionDeletesParquet() throws Exception {
+    runPositionDeletesTest("parquet");
+  }
+
+  @Test
+  public void testPositionDeletesAvro() throws Exception {
+    runPositionDeletesTest("avro");
+  }
+
+  @Test
+  public void testPositionDeletesOrc() throws Exception {
+    runPositionDeletesTest("orc");
+  }
+
+  private void runPositionDeletesTest(String fileFormat) throws Exception {
+    Table tableWithPosDeletes =
+        createTableWithSnapshots(
+            tableDir.toFile().toURI().toString().concat("tableWithPosDeletes").concat(fileFormat),
+            2,
+            Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat));
+
     List<Pair<CharSequence, Long>> deletes =
         Lists.newArrayList(
             Pair.of(
-                table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+                tableWithPosDeletes
+                    .currentSnapshot()
+                    .addedDataFiles(tableWithPosDeletes.io())
+                    .iterator()
+                    .next()
+                    .location(),
                 0L));
 
-    File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+    File file =
+        new File(
+            removePrefix(
+                tableWithPosDeletes.location() + "/data/deeply/nested/deletes." + fileFormat));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), deletes)
+                tableWithPosDeletes,
+                tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
+                deletes)
             .first();
 
-    table.newRowDelta().addDeletes(positionDeletes).commit();
+    tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
 
-    assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
+    assertThat(spark.read().format("iceberg").load(tableWithPosDeletes.location()).collectAsList())
+        .hasSize(1);
 
     RewriteTablePath.Result result =
         actions()
-            .rewriteTablePath(table)
+            .rewriteTablePath(tableWithPosDeletes)
             .stagingLocation(stagingLocation())
-            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
            .execute();
 
     // We have one more snapshot, an additional manifest list, and a new (delete) manifest,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fc0dc298ae3f..d6a13bcd515d 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -51,8 +51,8 @@ import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -725,7 +725,7 @@ private static CloseableIterable<Record> positionDeletesReader(
       return Avro.read(inputFile)
           .project(deleteSchema)
           .reuseContainers()
-          .createReaderFunc(DataReader::create)
+          .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
           .build();
 
     case PARQUET:
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 8399ae5d520a..6dac5d5da00d 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -387,28 +387,59 @@ public void testDeleteDataFile() throws Exception {
   }
 
   @Test
-  public void testPositionDeletes() throws Exception {
+  public void testPositionDeletesParquet() throws Exception {
+    runPositionDeletesTest("parquet");
+  }
+
+  @Test
+  public void testPositionDeletesAvro() throws Exception {
+    runPositionDeletesTest("avro");
+  }
+
+  @Test
+  public void testPositionDeletesOrc() throws Exception {
+    runPositionDeletesTest("orc");
+  }
+
+  private void runPositionDeletesTest(String fileFormat) throws Exception {
+    Table tableWithPosDeletes =
+        createTableWithSnapshots(
+            tableDir.toFile().toURI().toString().concat("tableWithPosDeletes").concat(fileFormat),
+            2,
+            Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat));
+
     List<Pair<CharSequence, Long>> deletes =
         Lists.newArrayList(
             Pair.of(
-                table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+                tableWithPosDeletes
+                    .currentSnapshot()
+                    .addedDataFiles(tableWithPosDeletes.io())
+                    .iterator()
+                    .next()
+                    .location(),
                 0L));
 
-    File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+    File file =
+        new File(
+            removePrefix(
+                tableWithPosDeletes.location() + "/data/deeply/nested/deletes." + fileFormat));
     DeleteFile positionDeletes =
         FileHelpers.writeDeleteFile(
-                table, table.io().newOutputFile(file.toURI().toString()), deletes)
+                tableWithPosDeletes,
+                tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
+                deletes)
            .first();
 
-    table.newRowDelta().addDeletes(positionDeletes).commit();
+    tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
 
-    assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
+    assertThat(spark.read().format("iceberg").load(tableWithPosDeletes.location()).collectAsList())
+        .hasSize(1);
 
     RewriteTablePath.Result result =
         actions()
-            .rewriteTablePath(table)
+            .rewriteTablePath(tableWithPosDeletes)
             .stagingLocation(stagingLocation())
-            .rewriteLocationPrefix(table.location(), targetTableLocation())
+            .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
             .execute();
 
     // We have one more snapshot, an additional manifest list, and a new (delete) manifest,
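
Migration note (not part of the patch): the main-source hunks above replace the deprecated DataReader::create method reference with a lambda that builds a PlannedDataReader from the projected delete schema. The sketch below is a standalone, assumed reconstruction of that read path; the class name, method name, and parameters are placeholders, and only the builder calls themselves are taken from the diff.

import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

public class PositionDeleteAvroReadSketch {
  // Reads position-delete records from an Avro file, projected to deleteSchema.
  static CloseableIterable<Record> readPositionDeletes(InputFile inputFile, Schema deleteSchema) {
    return Avro.read(inputFile)
        .project(deleteSchema)
        .reuseContainers()
        // Before: .createReaderFunc(DataReader::create), which routed through the
        // deprecated DataReader. After: a planned reader resolved against the
        // projected schema; the Avro file-schema lambda argument is intentionally unused.
        .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
        .build();
  }
}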
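Test note (also an assumed sketch): the new tests select the position-delete file format per run by setting TableProperties.DELETE_DEFAULT_FILE_FORMAT, the "write.delete.format.default" table property, when creating the table. The helper below only illustrates that property map; the wrapper class and method are hypothetical, while the constant is the real Iceberg table property used in the diff.

import java.util.Map;
import org.apache.iceberg.TableProperties;

public class DeleteFormatPropsSketch {
  // Returns table properties that make newly written delete files use the given
  // format ("parquet", "avro", or "orc"), matching the three new test methods.
  static Map<String, String> forFormat(String fileFormat) {
    return Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat);
  }
}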