Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8590] fix: wrong file path for consistent-bucket-commit-marker-file #12344

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.storage.HoodieStorage;
Expand All @@ -41,12 +42,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -72,7 +73,6 @@ public class ConsistentBucketIndexUtils {
* @param table Hoodie table
* @param partition Table partition
* @param numBuckets Default bucket number
*
* @return Consistent hashing metadata
*/
public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition, int numBuckets) {
Expand All @@ -97,7 +97,7 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t
}

/**
* Loads hashing metadata of the given partition, if it does not exist, returns empty.
* Loads latest committed hashing metadata of the given partition, if it does not exist, returns empty.
*
* @param table Hoodie table
* @param partition Table partition
Expand All @@ -106,7 +106,6 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t
public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
HoodieTableMetaClient metaClient = table.getMetaClient();
StoragePath metadataPath = FSUtils.constructAbsolutePath(metaClient.getHashingMetadataPath(), partition);
StoragePath partitionPath = FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition);
try {
Predicate<StoragePathInfo> hashingMetaCommitFilePredicate = pathInfo -> {
String filename = pathInfo.getPath().getName();
Expand All @@ -117,55 +116,74 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t
return filename.contains(HASHING_METADATA_FILE_SUFFIX);
};
final List<StoragePathInfo> metaFiles = metaClient.getStorage().listDirectEntries(metadataPath);
final TreeSet<String> commitMetaTss = metaFiles.stream().filter(hashingMetaCommitFilePredicate)
.map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
.sorted()
.collect(Collectors.toCollection(TreeSet::new));
final List<StoragePathInfo> hashingMetaFiles = metaFiles.stream().filter(hashingMetadataFilePredicate)
.sorted(Comparator.comparing(f -> f.getPath().getName()))

final TreeMap<String/*instantTime*/, Pair<StoragePathInfo/*hash metadata file path*/, Boolean/*commited*/>> versionedHashMetadataFiles = metaFiles.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beyond1920 can you review, and add test cases please.

Copy link
Member Author

@TheR1sing3un TheR1sing3un Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beyond1920 can you review, and add test cases please.

Thanks! I will add more test cases to verify metadata correctness.

.filter(hashingMetadataFilePredicate)
.map(metaFile -> {
String instantTime = HoodieConsistentHashingMetadata.getTimestampFromFile(metaFile.getPath().getName());
return Pair.of(instantTime, Pair.of(metaFile, false));
})
.sorted(Collections.reverseOrder())
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (a, b) -> a, TreeMap::new));

metaFiles.stream().filter(hashingMetaCommitFilePredicate)
.forEach(commitFile -> {
String instantTime = HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName());
if (!versionedHashMetadataFiles.containsKey(instantTime)) {
// unexpect that the commit file exists but the corresponding metadata file does not
LOG.error("Commit file {} exists but the corresponding metadata file does not", commitFile.getPath().getName());
throw new HoodieIndexException("Commit file: " + commitFile.getPath().getName() + " exists but the corresponding metadata file does not");
}
versionedHashMetadataFiles.computeIfPresent(instantTime, (k, v) -> Pair.of(v.getLeft(), true));
});

Option<Pair<String/*instant*/, StoragePathInfo/*hash metadata file path*/>> latestCommittedMetaFile = Option.fromJavaOptional(versionedHashMetadataFiles.entrySet()
.stream()
.filter(entry -> entry.getValue().getRight())
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getLeft()))
.findFirst());

final List<Pair<String/*instant*/, StoragePathInfo/*hash metadata file path*/>> uncommittedMetaFilesAfterLatestCommited = latestCommittedMetaFile
.map(pair -> versionedHashMetadataFiles.tailMap(pair.getLeft()))
.orElse(versionedHashMetadataFiles.tailMap(HoodieTimeline.INIT_INSTANT_TS, true))
.entrySet()
.stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getLeft()))
.sorted(Comparator.comparing(Pair::getLeft))
.collect(Collectors.toList());
// max committed metadata file
final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : commitMetaTss.last();
// max updated metadata file
StoragePathInfo maxMetadataFile = hashingMetaFiles.isEmpty()
? null
: hashingMetaFiles.get(hashingMetaFiles.size() - 1);
// If single file present in metadata and if its default file return it
if (maxMetadataFile != null && HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS)) {
return loadMetadataFromGivenFile(table, maxMetadataFile);
}
// if max updated metadata file and committed metadata file are same then return
if (maxCommitMetaFileTs != null && maxMetadataFile != null
&& maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) {
return loadMetadataFromGivenFile(table, maxMetadataFile);

if (uncommittedMetaFilesAfterLatestCommited.isEmpty()) {
// all metadata files are committed, pick the latest committed file's hash metadata
return latestCommittedMetaFile.map(pair -> loadMetadataFromGivenFile(table, pair.getRight())).orElse(Option.empty());
}
HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();

// fix the in-consistency between un-committed and committed hashing metadata files.
List<StoragePathInfo> fixed = new ArrayList<>();
hashingMetaFiles.forEach(hashingMetaFile -> {
StoragePath path = hashingMetaFile.getPath();
String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
if (maxCommitMetaFileTs != null && timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
// only fix the metadata with greater timestamp than max committed timestamp
return;
}
boolean isRehashingCommitted = completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
if (isRehashingCommitted) {
if (!commitMetaTss.contains(timestamp)) {
try {
createCommitMarker(table, path, partitionPath);
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + path.getName() + " for partition " + partition, e);
}
// find that there are uncommitted metadata files after the latest committed metadata file, we need to resolve the inconsistency
HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
// fix from the latest committed metadata file (exclusive) to the latest uncommitted metadata file (inclusive)
for (Pair<String, StoragePathInfo> pair : uncommittedMetaFilesAfterLatestCommited) {
String instantTime = pair.getLeft();
StoragePathInfo hashMetadataPath = pair.getRight();
/** check if the metadata file can be committed
* 1. the action corresponding to the file has already been committed on the timeline
* 2. the file is the first metadata file of the partition whose instant equals to {@link HoodieTimeline#INIT_INSTANT_TS}
*/
if (completedCommits.containsInstant(instantTime) || instantTime.equals(HoodieTimeline.INIT_INSTANT_TS)) {
try {
createCommitMarker(table, hashMetadataPath.getPath(), metadataPath);
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file for hash metadata file: " + hashMetadataPath.getPath().getName() + " in partition " + partition, e);
}
fixed.add(hashingMetaFile);
} else if (recommitMetadataFile(table, hashingMetaFile, partition)) {
fixed.add(hashingMetaFile);
// update the latest committed metadata file
latestCommittedMetaFile = Option.of(pair);
} else if (recommitMetadataFile(table, hashMetadataPath, partition)) {
// the un-initial hash metadata file exist but there is no corresponding commit file, and no corresponding completed commit on the active timeline
// recommit it to fix the inconsistency
// update the latest committed metadata file
latestCommittedMetaFile = Option.of(pair);
}
});

return fixed.isEmpty() ? Option.empty() : loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
}
// after fixing the inconsistency, return the latest committed metadata file
return latestCommittedMetaFile.map(pair -> loadMetadataFromGivenFile(table, pair.getRight())).orElse(Option.empty());
} catch (FileNotFoundException e) {
return Option.empty();
} catch (IOException e) {
Expand Down Expand Up @@ -203,17 +221,17 @@ public static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMet
*
* @param table Hoodie table
* @param path File for which commit marker should be created
* @param partitionPath Partition path the file belongs to
* @param metadataPath Consistent-Bucket metadata path the file belongs to
* @throws IOException
*/
private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath partitionPath) throws IOException {
private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath metadataPath) throws IOException {
HoodieStorage storage = table.getStorage();
StoragePath fullPath = new StoragePath(partitionPath,
StoragePath fullPath = new StoragePath(metadataPath,
getTimestampFromFile(path.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
if (storage.exists(fullPath)) {
return;
}
//prevent exception from race condition. We are ok with the file being created in another thread, so we should
// prevent exception from race condition. We are ok with the file being created in another thread, so we should
// check for the marker after catching the exception and we don't need to fail if the file exists
try {
FileIOUtils.createFileInPath(storage, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
Expand Down Expand Up @@ -264,7 +282,7 @@ private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile
* @return true if hashing metadata file is latest else false
*/
private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo metaFile, String partition) {
StoragePath partitionPath = FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
StoragePath metadataPath = FSUtils.constructAbsolutePath(table.getMetaClient().getHashingMetadataPath(), partition);
String timestamp = getTimestampFromFile(metaFile.getPath().getName());
if (table.getPendingCommitsTimeline().containsInstant(timestamp)) {
return false;
Expand All @@ -282,7 +300,7 @@ private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo m
if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
.map(fileIdPrefix -> FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate)) {
try {
createCommitMarker(table, metaFile.getPath(), partitionPath);
createCommitMarker(table, metaFile.getPath(), metadataPath);
return true;
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + metaFile.getPath().getName() + " for partition " + partition, e);
Expand Down
Loading