Use SupportsPrefixOperations for Remove OrphanFile Procedure #11906

Open
wants to merge 3 commits into base: main
@@ -37,6 +37,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -50,6 +51,7 @@
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -292,19 +294,49 @@ private Dataset<FileURI> validFileIdentDS() {

private Dataset<FileURI> actualFileIdentDS() {
StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
Dataset<String> dataList;
if (compareToFileList == null) {
return toFileURI.apply(listedFileDS());
dataList =
table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : listWithoutPrefix();
} else {
return toFileURI.apply(filteredCompareToFileList());
dataList = filteredCompareToFileList();
}

return toFileURI.apply(dataList);
}

@VisibleForTesting
Dataset<String> listWithPrefix() {
Contributor comment:
We need a way to break up the key space, possibly by taking hints from what LocationProvider is configured for the table. A single listing is not scalable.
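A minimal sketch of one way that could look: derive a few prefixes (here hard-coded to the default data and metadata folders, purely as an assumption) and distribute the per-prefix listings across executors instead of issuing a single call from the driver. Nothing below is part of this change.

// Sketch only: split the key space into several prefixes and list them in parallel.
// Prefix derivation is an assumption; a real implementation might consult the table's
// LocationProvider or partition specs.
List<String> prefixes = Lists.newArrayList(location + "/data", location + "/metadata");
org.apache.iceberg.io.FileIO serializableIO = table.io(); // FileIO is Serializable
long olderThan = olderThanTimestamp; // local copy so the closure does not capture the action
JavaRDD<String> matchingFileRDD =
    sparkContext()
        .parallelize(prefixes, prefixes.size())
        .flatMap(
            prefix -> {
              List<String> files = Lists.newArrayList();
              ((SupportsPrefixOperations) serializableIO)
                  .listPrefix(prefix)
                  .forEach(
                      info -> {
                        if (info.createdAtMillis() < olderThan) {
                          files.add(info.location());
                        }
                      });
              return files.iterator();
            });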

List<String> matchingFiles = Lists.newArrayList();
// listPrefix only returns files, so we additionally need to check the parent folders of each file.
// In the following example the file itself is not filtered out,
// but it should be excluded because of its parent folder `_c2_trunc`:
// "/data/_c2_trunc/file.txt"
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);

Iterator<org.apache.iceberg.io.FileInfo> iterator =
((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
while (iterator.hasNext()) {
org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
// NOTE: check the path relative to the table location to avoid checking unnecessary root
// folders
Path relativeFilePath = new Path(fileInfo.location().replace(location, ""));
Contributor Author (comment on lines +321 to +323):
Creating a relative path here avoids checking the parent folders of the table location itself. However, this replace(location, "") might not be the best solution; open to any ideas.
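As one alternative, a small sketch: strip the table location only when it is a genuine prefix of the file location, so an accidental occurrence of the substring elsewhere in the path is left untouched. The variable names mirror the surrounding code and are not part of this change.

// Sketch only: relativize by stripping a verified prefix instead of replace(location, "").
String fileLocation = fileInfo.location();
String relativeLocation =
    fileLocation.startsWith(location)
        ? fileLocation.substring(location.length())
        : fileLocation; // fall back to the absolute location if it is outside the table
Path relativeFilePath = new Path(relativeLocation);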

if (fileInfo.createdAtMillis() < olderThanTimestamp && pathFilter.accept(relativeFilePath)) {
matchingFiles.add(fileInfo.location());
}
}
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
}

private Dataset<String> listedFileDS() {
@VisibleForTesting
Dataset<String> listWithoutPrefix() {
List<String> subDirs = Lists.newArrayList();
List<String> matchingFiles = Lists.newArrayList();

Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
// don't check parent folders because they are already checked by the recursive call
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), false);

// list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
// less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
@@ -330,7 +362,6 @@ private Dataset<String> listedFileDS() {
Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
}
@@ -589,21 +620,42 @@ private FileURI toFileURI(I input) {
static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {

private final Set<String> hiddenPathPartitionNames;
private final boolean checkParents;

PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames, boolean checkParents) {
this.hiddenPathPartitionNames = hiddenPathPartitionNames;
this.checkParents = checkParents;
}

@Override
public boolean accept(Path path) {
if (!checkParents) {
return doAccept(path);
}

// if any of the parent folders is not accepted then return false
return doAccept(path) && !hasHiddenPttParentFolder(path);
}

private boolean doAccept(Path path) {
return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
}

/**
 * Iterates through the parent folders of the given path and returns true if any of them is a
 * hidden folder that is not a hidden partition folder.
 */
public boolean hasHiddenPttParentFolder(Path path) {
return Stream.iterate(path, Path::getParent)
.takeWhile(Objects::nonNull)
.anyMatch(parentPath -> !doAccept(parentPath));
}
Contributor Author @ismailsimsek, Jan 8, 2025 (comment on lines +648 to +652):
Now it will check the parent folders for each file, to ensure that none of the parent folders is a hidden partition folder. This might be less performant for large listings, if performance is a concern.
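If this ever becomes a bottleneck, one option (not part of this PR) would be to memoize the verdict per parent directory so that files in the same folder do not re-walk the whole ancestor chain. The field and method names below are made up, and the sketch only inspects ancestors because accept() already checks the path itself.

// Sketch only: cache the hidden/visible verdict per directory.
private final Map<Path, Boolean> parentVerdictCache = new java.util.HashMap<>();

private boolean hasHiddenParent(Path path) {
  Path parent = path.getParent();
  if (parent == null) {
    return false; // reached the root without finding a hidden folder
  }
  Boolean cached = parentVerdictCache.get(parent);
  if (cached != null) {
    return cached;
  }
  // avoid computeIfAbsent here: the recursive call would modify the map while computing
  boolean hidden = !doAccept(parent) || hasHiddenParent(parent);
  parentVerdictCache.put(parent, hidden);
  return hidden;
}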


private boolean isHiddenPartitionPath(Path path) {
return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
}

static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
static PathFilter forSpecs(Map<Integer, PartitionSpec> specs, boolean checkParents) {
if (specs == null) {
return HiddenPathFilter.get();
}
@@ -619,7 +671,7 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
if (partitionNames.isEmpty()) {
return HiddenPathFilter.get();
} else {
return new PartitionAwareHiddenPathFilter(partitionNames);
return new PartitionAwareHiddenPathFilter(partitionNames, checkParents);
}
}
}
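For concreteness, an illustrative use of the filter above with a made-up hidden partition name and paths; assuming HiddenPathFilter rejects names starting with an underscore, a file under a plain hidden folder is rejected once parents are checked, while a file under an actual hidden partition folder is still accepted.

// Illustrative only; the partition name and paths are invented.
PathFilter filter =
    new PartitionAwareHiddenPathFilter(Set.of("_c2_trunc="), true /* checkParents */);
filter.accept(new Path("/table/data/_c2_trunc/file.txt"));      // false: parent folder is hidden
filter.accept(new Path("/table/data/_c2_trunc=2024/file.txt")); // true: parent is a partition folder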
@@ -539,9 +539,12 @@ public void testHiddenPartitionPaths() {
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();
DeleteOrphanFilesSparkAction action =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
// verify that both listing methods on the action return the same files
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
DeleteOrphanFiles.Result result = action.execute();

assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
}
@@ -575,9 +578,12 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();
DeleteOrphanFilesSparkAction action =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
// verify that both listing methods on the action return the same files
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
DeleteOrphanFiles.Result result = action.execute();

assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
}
@@ -605,17 +611,23 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOException {
Path dataPath = new Path(tableLocation + "/data");
FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
Path pathToFileInHiddenFolder = new Path(dataPath, "_c2_trunc/file.txt");
fs.createNewFile(pathToFileInHiddenFolder);
fs.createNewFile(new Path(dataPath, "_c2_trunc/file.txt"));
Path pathToFileInHiddenFolder2 = new Path(dataPath, "_c2_trunc/subfolder/file.txt");
fs.createNewFile(pathToFileInHiddenFolder2);

waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();
DeleteOrphanFilesSparkAction action =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
// verify that both listing methods on the action return the same files
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
DeleteOrphanFiles.Result result = action.execute();

assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
assertThat(fs.exists(pathToFileInHiddenFolder2)).isTrue();
}

private List<String> snapshotFiles(long snapshotId) {
@@ -675,12 +687,10 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();
DeleteOrphanFiles.Result result =
actions
.deleteOrphanFiles(table)
.olderThan(System.currentTimeMillis())
.deleteWith(s -> {})
.execute();
DeleteOrphanFilesSparkAction action =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).deleteWith(s -> {});
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());
DeleteOrphanFiles.Result result = action.execute();
assertThat(result.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
@@ -713,8 +723,11 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException

table.refresh();

DeleteOrphanFiles.Result result =
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
DeleteOrphanFilesSparkAction action =
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());
assertThatDatasetsAreEqualIgnoringOrder(action.listWithPrefix(), action.listWithoutPrefix());

DeleteOrphanFiles.Result result = action.execute();

assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);

@@ -830,37 +843,41 @@ public void testCompareToFileList() throws IOException {
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result1 =
DeleteOrphanFiles.Result deletedOrphanFiles1 =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.deleteWith(s -> {})
.execute();
assertThat(result1.orphanFileLocations())
assertThat(deletedOrphanFiles1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();

DeleteOrphanFiles.Result result2 =
DeleteOrphanFiles.Result deletedOrphanFiles2 =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.deleteWith(s -> {})
.execute();
assertThat(result2.orphanFileLocations())
assertThat(deletedOrphanFiles2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFilePaths);
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should be present")
.isTrue();

DeleteOrphanFiles.Result result3 =
DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction3 =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.execute();
assertThat(result3.orphanFileLocations())
.olderThan(System.currentTimeMillis());
assertThatDatasetsAreEqualIgnoringOrder(
deleteOrphanFilesSparkAction3.listWithPrefix(),
deleteOrphanFilesSparkAction3.listWithoutPrefix());

DeleteOrphanFiles.Result deletedOrphanFiles3 = deleteOrphanFilesSparkAction3.execute();
assertThat(deletedOrphanFiles3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
@@ -885,13 +902,19 @@ public void testCompareToFileList() throws IOException {
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result4 =
DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction4 =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> {})
.execute();
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
.deleteWith(s -> {});
assertThatDatasetsAreEqualIgnoringOrder(
deleteOrphanFilesSparkAction4.listWithPrefix(),
deleteOrphanFilesSparkAction4.listWithoutPrefix());

DeleteOrphanFiles.Result deletedOrphanFiles4 = deleteOrphanFilesSparkAction4.execute();
assertThat(deletedOrphanFiles4.orphanFileLocations())
.as("Action should find nothing")
.isEmpty();
}

protected long waitUntilAfter(long timestampMillis) {
@@ -1100,4 +1123,11 @@ private void executeTest(
spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
}

private void assertThatDatasetsAreEqualIgnoringOrder(
Dataset<String> actual, Dataset<String> expected) {
assertThat(actual.collectAsList())
.as("same as")
.containsExactlyInAnyOrderElementsOf(expected.collectAsList());
}
}