Skip to content

Commit

Permalink
fix: fix unable to load latest committed consistent-bucket-hash-metadata
Browse files Browse the repository at this point in the history
1. fix unable to load latest committed consistent-bucket-hash-metadata

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un committed Nov 27, 2024
1 parent 9140bd3 commit df0fcce
Showing 1 changed file with 71 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.storage.HoodieStorage;
Expand All @@ -42,10 +43,13 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -72,7 +76,6 @@ public class ConsistentBucketIndexUtils {
* @param table Hoodie table
* @param partition Table partition
* @param numBuckets Default bucket number
*
* @return Consistent hashing metadata
*/
public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition, int numBuckets) {
Expand All @@ -97,7 +100,7 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t
}

/**
* Loads hashing metadata of the given partition, if it does not exist, returns empty.
* Loads latest committed hashing metadata of the given partition, if it does not exist, returns empty.
*
* @param table Hoodie table
* @param partition Table partition
Expand All @@ -116,55 +119,81 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t
return filename.contains(HASHING_METADATA_FILE_SUFFIX);
};
final List<StoragePathInfo> metaFiles = metaClient.getStorage().listDirectEntries(metadataPath);

final TreeMap<String/*instantTime*/, Pair<StoragePathInfo/*hash metadata file path*/, Boolean/*commited*/>> versionedHashMetadataFiles = metaFiles.stream()
.filter(hashingMetadataFilePredicate)
.map(metaFile -> {
String instantTime = HoodieConsistentHashingMetadata.getTimestampFromFile(metaFile.getPath().getName());
return Pair.of(instantTime, Pair.of(metaFile, false));
})
.sorted(Collections.reverseOrder())
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (a, b) -> a, TreeMap::new));

metaFiles.stream().filter(hashingMetaCommitFilePredicate)
.forEach(commitFile -> {
String instantTime = HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName());
if (!versionedHashMetadataFiles.containsKey(instantTime)) {
// unexpect that the commit file exists but the corresponding metadata file does not
LOG.error("Commit file {} exists but the corresponding metadata file does not", commitFile.getPath().getName());
throw new HoodieIndexException("Commit file: " + commitFile.getPath().getName() + " exists but the corresponding metadata file does not");
}
versionedHashMetadataFiles.computeIfPresent(instantTime, (k, v) -> Pair.of(v.getLeft(), true));
});
final TreeSet<String> commitMetaTss = metaFiles.stream().filter(hashingMetaCommitFilePredicate)
.map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
.sorted()
.sorted(Comparator.reverseOrder())
.collect(Collectors.toCollection(TreeSet::new));
final List<StoragePathInfo> hashingMetaFiles = metaFiles.stream().filter(hashingMetadataFilePredicate)
.sorted(Comparator.comparing(f -> f.getPath().getName()))
.collect(Collectors.toList());
// max committed metadata file
final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : commitMetaTss.last();
// max updated metadata file
StoragePathInfo maxMetadataFile = hashingMetaFiles.isEmpty()
? null
: hashingMetaFiles.get(hashingMetaFiles.size() - 1);
// If single file present in metadata and if its default file return it
if (maxMetadataFile != null && HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS)) {
return loadMetadataFromGivenFile(table, maxMetadataFile);
}
// if max updated metadata file and committed metadata file are same then return
if (maxCommitMetaFileTs != null && maxMetadataFile != null
&& maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) {
return loadMetadataFromGivenFile(table, maxMetadataFile);

Option<Pair<String/*instant*/, StoragePathInfo/*hash metadata file path*/>> latestCommittedMetaFile = Option.fromJavaOptional(versionedHashMetadataFiles.entrySet()
.stream()
.filter(entry -> entry.getValue().getRight())
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getLeft()))
.findFirst());

final List<Pair<String/*instant*/, StoragePathInfo/*hash metadata file path*/>> uncommittedMetaFilesAfterLatestCommited = latestCommittedMetaFile
.map(pair -> versionedHashMetadataFiles.tailMap(pair.getLeft()))
.orElse(versionedHashMetadataFiles.tailMap(HoodieTimeline.INIT_INSTANT_TS, true))
.entrySet()
.stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getLeft()))
.sorted(Comparator.comparing(Pair::getLeft))
.collect(Collectors.toList());

if (uncommittedMetaFilesAfterLatestCommited.isEmpty()) {
// all metadata files are committed, pick the latest committed file's hash metadata
return latestCommittedMetaFile.map(pair -> loadMetadataFromGivenFile(table, pair.getRight())).orElse(Option.empty());
}
HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();

// fix the in-consistency between un-committed and committed hashing metadata files.
List<StoragePathInfo> fixed = new ArrayList<>();
hashingMetaFiles.forEach(hashingMetaFile -> {
StoragePath path = hashingMetaFile.getPath();
String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
if (maxCommitMetaFileTs != null && timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
// only fix the metadata with greater timestamp than max committed timestamp
return;
}
boolean isRehashingCommitted = completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
if (isRehashingCommitted) {
if (!commitMetaTss.contains(timestamp)) {
try {
createCommitMarker(table, path, metadataPath);
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + path.getName() + " for partition " + partition, e);
}
// find that there are uncommitted metadata files after the latest committed metadata file, we need to resolve the inconsistency
HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
// fix from the latest committed metadata file (exclusive) to the latest uncommitted metadata file (inclusive)
for (Pair<String, StoragePathInfo> pair : uncommittedMetaFilesAfterLatestCommited) {
String instantTime = pair.getLeft();
StoragePathInfo hashMetadataPath = pair.getRight();
/** check if the metadata file can be committed
* 1. the action corresponding to the file has already been committed on the timeline
* 2. the file is the first metadata file of the partition whose instant equals to {@link HoodieTimeline#INIT_INSTANT_TS}
*/
if (completedCommits.containsInstant(instantTime) || instantTime.equals(HoodieTimeline.INIT_INSTANT_TS)) {
try {
createCommitMarker(table, hashMetadataPath.getPath(), metadataPath);
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file for hash metadata file: " + hashMetadataPath.getPath().getName() + " in partition " + partition, e);
}
fixed.add(hashingMetaFile);
} else if (recommitMetadataFile(table, hashingMetaFile, partition)) {
fixed.add(hashingMetaFile);
// update the latest committed metadata file
latestCommittedMetaFile = Option.of(pair);
} else if (recommitMetadataFile(table, hashMetadataPath, partition)) {
// the un-initial hash metadata file exist but there is no corresponding commit file, and no corresponding completed commit on the active timeline
// recommit it to fix the inconsistency
// update the latest committed metadata file
latestCommittedMetaFile = Option.of(pair);
}
});

return fixed.isEmpty() ? Option.empty() : loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
}
// after fixing the inconsistency, return the latest committed metadata file
return latestCommittedMetaFile.map(pair -> loadMetadataFromGivenFile(table, pair.getRight())).orElse(Option.empty());
} catch (FileNotFoundException e) {
return Option.empty();
} catch (IOException e) {
Expand Down Expand Up @@ -212,7 +241,7 @@ private static void createCommitMarker(HoodieTable table, StoragePath path, Stor
if (storage.exists(fullPath)) {
return;
}
//prevent exception from race condition. We are ok with the file being created in another thread, so we should
// prevent exception from race condition. We are ok with the file being created in another thread, so we should
// check for the marker after catching the exception and we don't need to fail if the file exists
try {
FileIOUtils.createFileInPath(storage, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
Expand Down

0 comments on commit df0fcce

Please sign in to comment.