Use SupportsPrefixOperations for Remove OrphanFile Procedure on Spark 3.5, improve naming

Co-authored-by: Rahil Chertara <[email protected]>
Ismail Simsek and Rahil Chertara committed Jan 8, 2025
1 parent 42d3885 commit 2212f2a
Showing 2 changed files with 108 additions and 44 deletions.
@@ -37,6 +37,7 @@
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -293,33 +294,49 @@ private Dataset<FileURI> validFileIdentDS() {

   private Dataset<FileURI> actualFileIdentDS() {
     StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
+    Dataset<String> dataList;
     if (compareToFileList == null) {
-      return toFileURI.apply(listedFileDS());
+      dataList =
+          table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix();
     } else {
-      return toFileURI.apply(filteredCompareToFileList());
+      dataList = filteredCompareToFileList();
     }
+
+    return toFileURI.apply(dataList);
   }

-  private Dataset<String> listWithPrefix() {
+  @VisibleForTesting
+  Dataset<String> listWithPrefix() {
     List<String> matchingFiles = Lists.newArrayList();
+    // listPrefix only returns files, so we additionally need to check the parent folders of each
+    // file. In the following example the file itself is not filtered out, but it should be
+    // excluded because of its parent folder `_c2_trunc`:
+    // "/data/_c2_trunc/file.txt"
+    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);

     Iterator<org.apache.iceberg.io.FileInfo> iterator =
         ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
     while (iterator.hasNext()) {
       org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
-      if (fileInfo.createdAtMillis() < olderThanTimestamp) {
+      // NOTE: check the path relative to the table location to avoid checking unnecessary root
+      // folders
+      Path relativeFilePath = new Path(fileInfo.location().replace(location, ""));
+      if (fileInfo.createdAtMillis() < olderThanTimestamp && pathFilter.accept(relativeFilePath)) {
         matchingFiles.add(fileInfo.location());
       }
     }
     JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
     return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
   }
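The parent check matters because listPrefix returns a flat stream of files: a file whose own name is perfectly visible can still sit under a hidden folder. A minimal, self-contained sketch of the ancestor walk, using java.nio.file.Path instead of Hadoop's Path (class and method names here are illustrative, and the real filter additionally exempts hidden partition folders):

import java.nio.file.Path;
import java.util.Objects;
import java.util.stream.Stream;

public class HiddenAncestorSketch {
  // mirrors HiddenPathFilter's convention: names starting with '_' or '.' are hidden
  static boolean isHidden(Path p) {
    String name = p.getFileName() == null ? "" : p.getFileName().toString();
    return name.startsWith("_") || name.startsWith(".");
  }

  // walk the parent chain up to the root; reject the file if any ancestor is hidden
  static boolean hasHiddenAncestor(Path file) {
    return Stream.iterate(file.getParent(), Objects::nonNull, Path::getParent)
        .anyMatch(HiddenAncestorSketch::isHidden);
  }

  public static void main(String[] args) {
    // the file name itself is not hidden, but its parent folder `_c2_trunc` is
    System.out.println(hasHiddenAncestor(Path.of("data", "_c2_trunc", "file.txt"))); // true
    System.out.println(hasHiddenAncestor(Path.of("data", "c2=1", "file.txt"))); // false
  }
}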

-  private Dataset<String> listWithoutPrefix() {
+  @VisibleForTesting
+  Dataset<String> listWithoutPrefix() {
     List<String> subDirs = Lists.newArrayList();
     List<String> matchingFiles = Lists.newArrayList();

     Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
+    // don't check parent folders because they are already checked by the recursive call
+    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), false);

     // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
     // less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
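The driver-side fallback is bounded in two directions, per the comment above: recursion depth and direct sub-directory fan-out. A hedged sketch of that strategy, assuming the Hadoop FileSystem API and the java.util imports already present in this file (the signature, names, and deferral policy are illustrative simplifications, not the action's exact listDirRecursively logic):

// Illustrative only: list on the driver while within maxDepth and while fan-out
// stays under maxDirectSubDirs; otherwise defer the directory to a later
// (e.g. distributed) listing step.
static void listBounded(
    FileSystem fs,
    Path dir,
    int depth,
    int maxDepth,
    int maxDirectSubDirs,
    List<Path> deferredDirs,
    List<Path> files)
    throws IOException {
  if (depth > maxDepth) {
    deferredDirs.add(dir); // too deep: hand off instead of recursing further
    return;
  }

  List<Path> subDirs = new ArrayList<>();
  for (FileStatus status : fs.listStatus(dir)) {
    if (status.isDirectory()) {
      subDirs.add(status.getPath());
    } else {
      files.add(status.getPath());
    }
  }

  if (subDirs.size() >= maxDirectSubDirs) {
    deferredDirs.addAll(subDirs); // too wide: defer this whole level
  } else {
    for (Path subDir : subDirs) {
      listBounded(fs, subDir, depth + 1, maxDepth, maxDirectSubDirs, deferredDirs, files);
    }
  }
}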
@@ -349,14 +366,6 @@ private Dataset<String> listWithoutPrefix() {
     return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
   }

-  private Dataset<String> listedFileDS() {
-    if (table.io() instanceof SupportsPrefixOperations) {
-      return listWithPrefix();
-    } else {
-      return listWithoutPrefix();
-    }
-  }
-
   private static void listDirRecursively(
       String dir,
       Predicate<FileStatus> predicate,
@@ -611,21 +620,46 @@ private FileURI toFileURI(I input) {
   static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {

     private final Set<String> hiddenPathPartitionNames;
+    private final boolean checkParents;

-    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
+    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames, boolean checkParents) {
       this.hiddenPathPartitionNames = hiddenPathPartitionNames;
+      this.checkParents = checkParents;
     }

     @Override
     public boolean accept(Path path) {
+      if (!checkParents) {
+        return doAccept(path);
+      }
+
+      // if any of the parent folders is not accepted, return false
+      if (hasHiddenPttParentFolder(path)) {
+        return false;
+      }
+
+      return doAccept(path);
+    }
+
+    private boolean doAccept(Path path) {
       return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
     }

+    /**
+     * Iterates through the parent folders to check whether any parent folder of the given path is
+     * a hidden partition folder.
+     */
+    public boolean hasHiddenPttParentFolder(Path path) {
+      return Stream.iterate(path, Path::getParent)
+          .takeWhile(Objects::nonNull)
+          .anyMatch(parentPath -> !doAccept(parentPath));
+    }
+
     private boolean isHiddenPartitionPath(Path path) {
       return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
     }

-    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
+    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs, boolean checkParents) {
       if (specs == null) {
         return HiddenPathFilter.get();
       }
@@ -641,7 +675,7 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       if (partitionNames.isEmpty()) {
         return HiddenPathFilter.get();
       } else {
-        return new PartitionAwareHiddenPathFilter(partitionNames);
+        return new PartitionAwareHiddenPathFilter(partitionNames, checkParents);
       }
     }
   }
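To make the two-argument filter concrete, here is a hedged usage sketch with Hadoop's Path; it assumes the table's specs yield a hidden partition name like `_c2_trunc=` (the name-collection code is elided above), so folders that look like hidden partition output are kept while ordinary underscore-prefixed folders are not:

PathFilter filter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true /* checkParents */);

// kept: `_c2_trunc=2` starts with the hidden partition name `_c2_trunc=`
filter.accept(new Path("/data/_c2_trunc=2/file.txt")); // true

// rejected: `_c2_trunc` is a plain hidden folder, so every file under it is filtered out
filter.accept(new Path("/data/_c2_trunc/file.txt")); // false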
@@ -539,9 +539,12 @@ public void testHiddenPartitionPaths() {
     waitUntilAfter(System.currentTimeMillis());

     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by instantiating the action directly
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();

     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
   }
@@ -575,9 +578,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
     waitUntilAfter(System.currentTimeMillis());

     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by instantiating the action directly
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();

     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
   }
@@ -605,17 +611,23 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOException {
     Path dataPath = new Path(tableLocation + "/data");
     FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
     Path pathToFileInHiddenFolder = new Path(dataPath, "_c2_trunc/file.txt");
-    fs.createNewFile(pathToFileInHiddenFolder);
+    fs.createNewFile(new Path(dataPath, "_c2_trunc/file.txt"));
+    Path pathToFileInHiddenFolder2 = new Path(dataPath, "_c2_trunc/subfolder/file.txt");
+    fs.createNewFile(pathToFileInHiddenFolder2);

     waitUntilAfter(System.currentTimeMillis());

     SparkActions actions = SparkActions.get();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    // test the list methods by instantiating the action directly
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

-    DeleteOrphanFiles.Result result =
-        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFiles.Result result = action.execute();

     assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
     assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
+    assertThat(fs.exists(pathToFileInHiddenFolder2)).isTrue();
   }

   private List<String> snapshotFiles(long snapshotId) {
@@ -675,12 +687,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
     waitUntilAfter(System.currentTimeMillis());

     SparkActions actions = SparkActions.get();
-    DeleteOrphanFiles.Result result =
-        actions
-            .deleteOrphanFiles(table)
-            .olderThan(System.currentTimeMillis())
-            .deleteWith(s -> {})
-            .execute();
+    DeleteOrphanFilesSparkAction action =
+        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {});
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+    DeleteOrphanFiles.Result result = action.execute();
     assertThat(result.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFiles);
@@ -713,8 +723,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException {

     table.refresh();

-    DeleteOrphanFiles.Result result =
-        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
+    DeleteOrphanFilesSparkAction action =
+        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
+    assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result result = action.execute();

     assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);

@@ -830,37 +843,41 @@ public void testCompareToFileList() throws IOException {
             .withColumnRenamed("filePath", "file_path")
             .withColumnRenamed("lastModified", "last_modified");

-    DeleteOrphanFiles.Result result1 =
+    DeleteOrphanFiles.Result deletedOrphanFiles1 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
             .deleteWith(s -> {})
             .execute();
-    assertThat(result1.orphanFileLocations())
+    assertThat(deletedOrphanFiles1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();

-    DeleteOrphanFiles.Result result2 =
+    DeleteOrphanFiles.Result deletedOrphanFiles2 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
             .olderThan(System.currentTimeMillis())
             .deleteWith(s -> {})
             .execute();
-    assertThat(result2.orphanFileLocations())
+    assertThat(deletedOrphanFiles2.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFilePaths);
     assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
         .as("Invalid file should be present")
         .isTrue();

-    DeleteOrphanFiles.Result result3 =
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction3 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
-            .olderThan(System.currentTimeMillis())
-            .execute();
-    assertThat(result3.orphanFileLocations())
+            .olderThan(System.currentTimeMillis());
+    assertThatDatasetsAreEqualIgnoringOrder(
+        deleteOrphanFilesSparkAction3.listWithPrefix(),
+        deleteOrphanFilesSparkAction3.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result deletedOrphanFiles3 = deleteOrphanFilesSparkAction3.execute();
+    assertThat(deletedOrphanFiles3.orphanFileLocations())
         .as("Action should delete 1 file")
         .isEqualTo(invalidFilePaths);
     assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
@@ -885,13 +902,19 @@ public void testCompareToFileList() throws IOException {
             .withColumnRenamed("filePath", "file_path")
             .withColumnRenamed("lastModified", "last_modified");

-    DeleteOrphanFiles.Result result4 =
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction4 =
         actions
             .deleteOrphanFiles(table)
             .compareToFileList(compareToFileListWithOutsideLocation)
-            .deleteWith(s -> {})
-            .execute();
-    assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
+            .deleteWith(s -> {});
+    assertThatDatasetsAreEqualIgnoringOrder(
+        deleteOrphanFilesSparkAction4.listWithPrefix(),
+        deleteOrphanFilesSparkAction4.listWithoutPrefix());
+
+    DeleteOrphanFiles.Result deletedOrphanFiles4 = deleteOrphanFilesSparkAction4.execute();
+    assertThat(deletedOrphanFiles4.orphanFileLocations())
+        .as("Action should find nothing")
+        .isEmpty();
   }

   protected long waitUntilAfter(long timestampMillis) {
@@ -1100,4 +1123,11 @@ private void executeTest(
         spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
     assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
   }
+
+  private void assertThatDatasetsAreEqualIgnoringOrder(
+      Dataset<String> actual, Dataset<String> expected) {
+    assertThat(actual.collectAsList())
+        .as("Datasets should contain the same rows, ignoring order")
+        .containsExactlyInAnyOrderElementsOf(expected.collectAsList());
+  }
 }
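One design note on the new assertion helper: collectAsList() pulls both listings onto the driver, which keeps the order-insensitive comparison trivial and is safe here because the test tables contain only a handful of files; it would not be appropriate for large datasets.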
