diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 79bcf1fc75d4..4f105fbdfe7c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -293,20 +293,28 @@ private Dataset validFileIdentDS() { private Dataset actualFileIdentDS() { StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities); + Dataset dataList; if (compareToFileList == null) { - return toFileURI.apply(listedFileDS()); + dataList = + table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix(); } else { - return toFileURI.apply(filteredCompareToFileList()); + dataList = filteredCompareToFileList(); } + + return toFileURI.apply(dataList); } - private Dataset listWithPrefix() { + @VisibleForTesting + Dataset listWithPrefix() { List matchingFiles = Lists.newArrayList(); + PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs()); + Iterator iterator = ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator(); while (iterator.hasNext()) { org.apache.iceberg.io.FileInfo fileInfo = iterator.next(); - if (fileInfo.createdAtMillis() < olderThanTimestamp) { + if (fileInfo.createdAtMillis() < olderThanTimestamp + && pathFilter.accept(new Path(fileInfo.location()))) { matchingFiles.add(fileInfo.location()); } } @@ -314,7 +322,8 @@ private Dataset listWithPrefix() { return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()); } - private Dataset listWithoutPrefix() { + @VisibleForTesting + Dataset listWithoutPrefix() { List subDirs = Lists.newArrayList(); List matchingFiles = Lists.newArrayList(); @@ -349,14 +358,6 @@ private Dataset listWithoutPrefix() { return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()); } - private Dataset listedFileDS() { - if (table.io() instanceof SupportsPrefixOperations) { - return listWithPrefix(); - } else { - return listWithoutPrefix(); - } - } - private static void listDirRecursively( String dir, Predicate predicate, diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index a0016a5e421a..29c6035e1cba 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -575,9 +575,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() { waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); + DeleteOrphanFilesSparkAction action = + actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()); + // test list methods by directly instantiating the action + assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix()); - DeleteOrphanFiles.Result result = - actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute(); + DeleteOrphanFiles.Result result = action.execute(); assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2); } @@ -610,9 +613,12 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOExcep waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); + DeleteOrphanFilesSparkAction action = + actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()); + // test list methods by directly instantiating the action + assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix()); - DeleteOrphanFiles.Result result = - actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute(); + DeleteOrphanFiles.Result result = action.execute(); assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty(); assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue(); @@ -675,12 +681,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException { waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); - DeleteOrphanFiles.Result result = - actions - .deleteOrphanFiles(table) - .olderThan(System.currentTimeMillis()) - .deleteWith(s -> {}) - .execute(); + DeleteOrphanFilesSparkAction action = + actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {}); + assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix()); + DeleteOrphanFiles.Result result = action.execute(); assertThat(result.orphanFileLocations()) .as("Action should find 1 file") .isEqualTo(invalidFiles); @@ -713,8 +717,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException table.refresh(); - DeleteOrphanFiles.Result result = - SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute(); + DeleteOrphanFilesSparkAction action = + SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()); + assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix()); + + DeleteOrphanFiles.Result result = action.execute(); assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1); @@ -854,12 +861,14 @@ public void testCompareToFileList() throws IOException { .as("Invalid file should be present") .isTrue(); - DeleteOrphanFiles.Result result3 = + DeleteOrphanFilesSparkAction action3 = actions .deleteOrphanFiles(table) .compareToFileList(compareToFileList) - .olderThan(System.currentTimeMillis()) - .execute(); + .olderThan(System.currentTimeMillis()); + assertThatDatasetsAreEqualIgnoringOrder(action3.listWithPrefix(), action3.listWithoutPrefix()); + + DeleteOrphanFiles.Result result3 = action3.execute(); assertThat(result3.orphanFileLocations()) .as("Action should delete 1 file") .isEqualTo(invalidFilePaths); @@ -885,12 +894,14 @@ public void testCompareToFileList() throws IOException { .withColumnRenamed("filePath", "file_path") .withColumnRenamed("lastModified", "last_modified"); - DeleteOrphanFiles.Result result4 = + DeleteOrphanFilesSparkAction action4 = actions .deleteOrphanFiles(table) .compareToFileList(compareToFileListWithOutsideLocation) - .deleteWith(s -> {}) - .execute(); + .deleteWith(s -> {}); + assertThatDatasetsAreEqualIgnoringOrder(action4.listWithPrefix(), action4.listWithoutPrefix()); + + DeleteOrphanFiles.Result result4 = action4.execute(); assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty(); } @@ -1100,4 +1111,10 @@ private void executeTest( spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode); assertThat(orphanFiles).isEqualTo(expectedOrphanFiles); } + + private void assertThatDatasetsAreEqualIgnoringOrder(Dataset actual, Dataset expected) { + assertThat(actual.collectAsList()) + .as("same as") + .containsExactlyInAnyOrderElementsOf(expected.collectAsList()); + } }