diff --git a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java
index 3ed770753734..94decdddad7a 100644
--- a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java
+++ b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java
@@ -45,4 +45,6 @@ public interface LocationProvider extends Serializable {
    * @return a fully-qualified location URI for a data file
    */
   String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename);
+
+  String dataLocationRoot();
 }
diff --git a/core/src/main/java/org/apache/iceberg/LocationProviders.java b/core/src/main/java/org/apache/iceberg/LocationProviders.java
index 68bec2f4e4fc..0df31f888867 100644
--- a/core/src/main/java/org/apache/iceberg/LocationProviders.java
+++ b/core/src/main/java/org/apache/iceberg/LocationProviders.java
@@ -102,9 +102,14 @@ public String newDataLocation(PartitionSpec spec, StructLike partitionData, Stri
     public String newDataLocation(String filename) {
       return String.format("%s/%s", dataLocation, filename);
     }
+
+    @Override
+    public String dataLocationRoot() {
+      return dataLocation;
+    }
   }
 
-  static class ObjectStoreLocationProvider implements LocationProvider {
+  public static class ObjectStoreLocationProvider implements LocationProvider {
 
     private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
     // Length of entropy generated in the file location
@@ -228,5 +233,10 @@ private String dirsFromHash(String hash) {
 
       return hashWithDirs.toString();
     }
+
+    @Override
+    public String dataLocationRoot() {
+      return storageLocation;
+    }
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestLocationProvider.java b/core/src/test/java/org/apache/iceberg/TestLocationProvider.java
index 7edba51c3d85..ee3ff72f9059 100644
--- a/core/src/test/java/org/apache/iceberg/TestLocationProvider.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocationProvider.java
@@ -57,6 +57,11 @@ public String newDataLocation(String filename) {
     public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
       throw new RuntimeException("Test custom provider does not expect any invocation");
     }
+
+    @Override
+    public String dataLocationRoot() {
+      return tableLocation;
+    }
   }
 
   // publicly visible for testing to be dynamically loaded
@@ -72,6 +77,11 @@ public String newDataLocation(String filename) {
     public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
       throw new RuntimeException("Test custom provider does not expect any invocation");
     }
+
+    @Override
+    public String dataLocationRoot() {
+      return "";
+    }
   }
 
   // publicly visible for testing to be dynamically loaded
@@ -88,6 +98,11 @@ public String newDataLocation(String filename) {
     public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
       throw new RuntimeException("Invalid provider should have not been instantiated!");
     }
+
+    @Override
+    public String dataLocationRoot() {
+      return "";
+    }
   }
 
   // publicly visible for testing to be dynamically loaded
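As context for the Spark changes below, here is a minimal sketch (not part of the patch) of what the new `dataLocationRoot()` contract means for the two built-in providers. It assumes the API change above is applied; the table location and printed paths are illustrative only:

```java
import java.util.Map;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.LocationProvider;

public class DataLocationRootDemo {
  public static void main(String[] args) {
    String tableLocation = "s3://bucket/warehouse/db/tbl"; // hypothetical location

    // Default provider: newDataLocation() appends the filename directly under the root.
    LocationProvider defaultProvider = LocationProviders.locationsFor(tableLocation, Map.of());
    System.out.println(defaultProvider.dataLocationRoot()); // .../db/tbl/data
    System.out.println(defaultProvider.newDataLocation("f.parquet")); // .../db/tbl/data/f.parquet

    // Object-store provider: newDataLocation() injects binary entropy directories under
    // the same root, e.g. .../db/tbl/data/0101/0110/1001/10110010/f.parquet, which is the
    // layout the prefix listing in DeleteOrphanFilesSparkAction fans out over.
    LocationProvider objectStoreProvider =
        LocationProviders.locationsFor(
            tableLocation, Map.of(TableProperties.OBJECT_STORE_ENABLED, "true"));
    System.out.println(objectStoreProvider.dataLocationRoot()); // .../db/tbl/data
    System.out.println(objectStoreProvider.newDataLocation("f.parquet"));
  }
}
```

In both cases every path produced by `newDataLocation()` starts with `dataLocationRoot()`, which is the invariant the orphan-file listing below relies on.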
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 5fbb4117feb8..5d8d1521f7c2 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -21,6 +21,7 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
@@ -37,11 +38,13 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.LocationProviders;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
@@ -50,6 +53,7 @@ import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -292,19 +296,77 @@ private Dataset<FileURI> validFileIdentDS() {
 
   private Dataset<FileURI> actualFileIdentDS() {
     StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
+    Dataset<String> dataList;
     if (compareToFileList == null) {
-      return toFileURI.apply(listedFileDS());
+      dataList =
+          table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix();
     } else {
-      return toFileURI.apply(filteredCompareToFileList());
+      dataList = filteredCompareToFileList();
     }
+
+    return toFileURI.apply(dataList);
+  }
+
+  @VisibleForTesting
+  List<String> listLocationWithPrefix(String location, PathFilter pathFilter) {
+    List<String> matchingFiles = Lists.newArrayList();
+    try {
+      Iterator<org.apache.iceberg.io.FileInfo> iterator =
+          ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
+      while (iterator.hasNext()) {
+        org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
+        // NOTE: to avoid checking unnecessary root folders, check the path relative to the
+        // table location
+        Path relativeFilePath = new Path(fileInfo.location().replace(location, ""));
+        if (fileInfo.createdAtMillis() < olderThanTimestamp
+            && pathFilter.accept(relativeFilePath)) {
+          matchingFiles.add(fileInfo.location());
+        }
+      }
+    } catch (Exception e) {
+      if (!(e.getCause() instanceof FileNotFoundException)) {
+        throw e;
+      }
+    }
+    return matchingFiles;
+  }
+
+  @VisibleForTesting
+  Dataset<String> listWithPrefix() {
+    List<String> matchingFiles = Lists.newArrayList();
+    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);
+
+    if (table.locationProvider() instanceof LocationProviders.ObjectStoreLocationProvider) {
+      // ObjectStoreLocationProvider generates hierarchical prefixes in a binary fashion
+      // (0000/, 0001/, 0010/, 0011/, ...).
+      // This allows us to parallelize listing operations across these prefixes.
+      List<String> prefixes =
+          List.of(
+              "/0000", "/0001", "/0010", "/0011", "/0100", "/0101", "/0110", "/0111", "/1000",
+              "/1001", "/1010", "/1011", "/1100", "/1101", "/1110", "/1111");
+
+      String tableDataLocationRoot = table.locationProvider().dataLocationRoot();
+      for (String prefix : prefixes) {
+        List<String> result = listLocationWithPrefix(tableDataLocationRoot + prefix, pathFilter);
+        matchingFiles.addAll(result);
+      }
+
+    } else {
+      matchingFiles.addAll(listLocationWithPrefix(location, pathFilter));
+    }
+
+    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
+    return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
   }
 
-  private Dataset<String> listedFileDS() {
+  @VisibleForTesting
+  Dataset<String> listWithoutPrefix() {
     List<String> subDirs = Lists.newArrayList();
     List<String> matchingFiles = Lists.newArrayList();
 
     Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
+    // don't check parent folders because they are already checked by the recursive call
+    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), false);
 
     // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
     // less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
@@ -330,7 +392,6 @@ private Dataset<String> listedFileDS() {
     Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
     ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
     JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);
-
     JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
     return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
   }
@@ -589,21 +650,42 @@ private FileURI toFileURI(I input) {
 
   static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
 
     private final Set<String> hiddenPathPartitionNames;
+    private final boolean checkParents;
 
-    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
+    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames, boolean checkParents) {
       this.hiddenPathPartitionNames = hiddenPathPartitionNames;
+      this.checkParents = checkParents;
     }
 
     @Override
     public boolean accept(Path path) {
+      if (!checkParents) {
+        return doAccept(path);
+      }
+
+      // if any of the parent folders is not accepted, then return false
+      return doAccept(path) && !hasHiddenPttParentFolder(path);
+    }
+
+    private boolean doAccept(Path path) {
       return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
     }
 
+    /**
+     * Checks whether any of the parent folders of the given path is a hidden partition folder,
+     * iterating through the path's parents.
+     */
+    public boolean hasHiddenPttParentFolder(Path path) {
+      return Stream.iterate(path, Path::getParent)
+          .takeWhile(Objects::nonNull)
+          .anyMatch(parentPath -> !doAccept(parentPath));
+    }
+
     private boolean isHiddenPartitionPath(Path path) {
       return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
     }
 
-    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
+    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs, boolean checkParents) {
       if (specs == null) {
         return HiddenPathFilter.get();
       }
@@ -619,7 +701,7 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       if (partitionNames.isEmpty()) {
         return HiddenPathFilter.get();
       } else {
-        return new PartitionAwareHiddenPathFilter(partitionNames);
+        return new PartitionAwareHiddenPathFilter(partitionNames, checkParents);
       }
     }
   }
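The 16 hard-coded prefixes in `listWithPrefix()` above are simply the 4-bit binary renderings of 0–15, i.e. the first entropy directory level that `ObjectStoreLocationProvider` emits. A standalone sketch that derives them (class name is illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BinaryPrefixes {
  public static void main(String[] args) {
    // 4-bit binary strings 0000..1111, each prefixed with "/" so it can be appended
    // to dataLocationRoot() the same way listWithPrefix() does
    List<String> prefixes =
        IntStream.range(0, 16)
            .mapToObj(i -> "/" + String.format("%4s", Integer.toBinaryString(i)).replace(' ', '0'))
            .collect(Collectors.toList());
    System.out.println(prefixes); // [/0000, /0001, ..., /1110, /1111]
  }
}
```

Each prefix becomes an independent `listPrefix()` call against the data root, so the object-store layout can be listed as 16 narrower scans instead of one full-table scan.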
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index a0016a5e421a..b4c33f5eac12 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -539,9 +540,12 @@ public void testHiddenPartitionPaths() {
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by directly instantiating the action
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
 
-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
   }
@@ -575,9 +579,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by directly instantiating the action
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
 
-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
   }
@@ -605,17 +612,23 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOExcep
     Path dataPath = new Path(tableLocation + "/data");
     FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
     Path pathToFileInHiddenFolder = new Path(dataPath, "_c2_trunc/file.txt");
     fs.createNewFile(pathToFileInHiddenFolder);
+    Path pathToFileInHiddenFolder2 = new Path(dataPath, "_c2_trunc/subfolder/file.txt");
+    fs.createNewFile(pathToFileInHiddenFolder2);
 
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by directly instantiating the action
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
 
-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
     assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
+    assertThat(fs.exists(pathToFileInHiddenFolder2)).isTrue();
   }
 
   private List<String> snapshotFiles(long snapshotId) {
@@ -675,12 +688,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
     waitUntilAfter(System.currentTimeMillis());
 
     SparkActions actions = SparkActions.get();
-    DeleteOrphanFiles.Result result =
-        actions
-            .deleteOrphanFiles(table)
-            .olderThan(System.currentTimeMillis())
-            .deleteWith(s -> {})
-            .execute();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {});
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFiles);
@@ -713,8 +724,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
 
     table.refresh();
 
-    DeleteOrphanFiles.Result result =
-        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFilesSparkAction action =
+        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result result = action.execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
@@ -830,37 +844,41 @@ public void testCompareToFileList() throws IOException {
             .withColumnRenamed("filePath", "file_path")
             .withColumnRenamed("lastModified", "last_modified");
 
-    DeleteOrphanFiles.Result result1 =
+    DeleteOrphanFiles.Result deletedOrphanFiles1 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
             .deleteWith(s -> {})
             .execute();
-    assertThat(result1.orphanFileLocations())
+    assertThat(deletedOrphanFiles1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();
 
-    DeleteOrphanFiles.Result result2 =
+    DeleteOrphanFiles.Result deletedOrphanFiles2 =
         actions
             .deleteOrphanFiles(table)
            .compareToFileList(compareToFileList)
            .olderThan(System.currentTimeMillis())
            .deleteWith(s -> {})
            .execute();
-    assertThat(result2.orphanFileLocations())
+    assertThat(deletedOrphanFiles2.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFilePaths);
 
     assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
         .as("Invalid file should be present")
         .isTrue();
 
-    DeleteOrphanFiles.Result result3 =
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction3 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
-            .olderThan(System.currentTimeMillis())
-            .execute();
-    assertThat(result3.orphanFileLocations())
+            .olderThan(System.currentTimeMillis());
+    assertThatDatasetsAreEqualIgnoringOrder(
+        deleteOrphanFilesSparkAction3.listWithPrefix(),
+        deleteOrphanFilesSparkAction3.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result deletedOrphanFiles3 = deleteOrphanFilesSparkAction3.execute();
+    assertThat(deletedOrphanFiles3.orphanFileLocations())
         .as("Action should delete 1 file")
         .isEqualTo(invalidFilePaths);
 
     assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
@@ -885,13 +903,19 @@ public void testCompareToFileList() throws IOException {
             .withColumnRenamed("filePath", "file_path")
             .withColumnRenamed("lastModified", "last_modified");
 
-    DeleteOrphanFiles.Result result4 =
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction4 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileListWithOutsideLocation)
-            .deleteWith(s -> {})
-            .execute();
-    assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
+            .deleteWith(s -> {});
+    assertThatDatasetsAreEqualIgnoringOrder(
+        deleteOrphanFilesSparkAction4.listWithPrefix(),
+        deleteOrphanFilesSparkAction4.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result deletedOrphanFiles4 = deleteOrphanFilesSparkAction4.execute();
+    assertThat(deletedOrphanFiles4.orphanFileLocations())
+        .as("Action should find nothing")
+        .isEmpty();
   }
 
   protected long waitUntilAfter(long timestampMillis) {
@@ -1067,6 +1091,33 @@ public void testRemoveOrphanFileActionWithDeleteMode() {
         DeleteOrphanFiles.PrefixMismatchMode.DELETE);
   }
 
+  @TestTemplate
+  public void testLocationProvider() {
+    ImmutableMap<String, String> props =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            String.valueOf(formatVersion),
+            TableProperties.OBJECT_STORE_ENABLED,
+            "true");
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).truncate("c2", 2).identity("c3").build();
+    Table table = TABLES.create(SCHEMA, spec, props, tableLocation);
+
+    StructType structType =
+        new StructType()
+            .add("c1", DataTypes.IntegerType)
+            .add("c2", DataTypes.StringType)
+            .add("c3", DataTypes.StringType);
+
+    List<Row> records = Lists.newArrayList(RowFactory.create(54321, "bbbb", "cccc"));
+    Dataset<Row> df = spark.createDataFrame(records, structType).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    DeleteOrphanFilesSparkAction action =
+        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by directly instantiating the action
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+  }
+
   protected String randomName(String prefix) {
     return prefix + UUID.randomUUID().toString().replace("-", "");
   }
@@ -1100,4 +1151,11 @@ private void executeTest(
             spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
     assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
   }
+
+  private void assertThatDatasetsAreEqualIgnoringOrder(
+      Dataset<String> actual, Dataset<String> expected) {
+    assertThat(actual.collectAsList())
+        .as("Datasets should contain the same rows, ignoring order")
+        .containsExactlyInAnyOrderElementsOf(expected.collectAsList());
+  }
 }
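Note that `dataLocationRoot()` is added to the `LocationProvider` interface without a default, so custom providers, like the dynamically loaded ones in `TestLocationProvider`, must implement it. A minimal hypothetical provider under that contract (the class name and path layout are illustrative, not from the patch):

```java
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.LocationProvider;

public class PartitionedLocationProvider implements LocationProvider {
  private final String dataLocation;

  // custom providers are loaded reflectively via this (String, Map) constructor shape
  public PartitionedLocationProvider(String tableLocation, Map<String, String> properties) {
    this.dataLocation = tableLocation + "/data";
  }

  @Override
  public String newDataLocation(String filename) {
    return dataLocation + "/" + filename;
  }

  @Override
  public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
    return dataLocation + "/" + spec.partitionToPath(partitionData) + "/" + filename;
  }

  @Override
  public String dataLocationRoot() {
    // every location produced by the methods above starts with this prefix,
    // which is what DeleteOrphanFilesSparkAction.listWithPrefix() relies on
    return dataLocation;
  }
}
```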