diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java index 1cd5d110de52..03b671d930f9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.storage.HoodieStorage; @@ -41,12 +42,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeSet; +import java.util.TreeMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -72,7 +73,6 @@ public class ConsistentBucketIndexUtils { * @param table Hoodie table * @param partition Table partition * @param numBuckets Default bucket number - * * @return Consistent hashing metadata */ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition, int numBuckets) { @@ -97,7 +97,7 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t } /** - * Loads hashing metadata of the given partition, if it does not exist, returns empty. + * Loads latest committed hashing metadata of the given partition, if it does not exist, returns empty. 
* * @param table Hoodie table * @param partition Table partition @@ -106,7 +106,6 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t public static Option loadMetadata(HoodieTable table, String partition) { HoodieTableMetaClient metaClient = table.getMetaClient(); StoragePath metadataPath = FSUtils.constructAbsolutePath(metaClient.getHashingMetadataPath(), partition); - StoragePath partitionPath = FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition); try { Predicate hashingMetaCommitFilePredicate = pathInfo -> { String filename = pathInfo.getPath().getName(); @@ -117,55 +116,74 @@ public static Option loadMetadata(HoodieTable t return filename.contains(HASHING_METADATA_FILE_SUFFIX); }; final List metaFiles = metaClient.getStorage().listDirectEntries(metadataPath); - final TreeSet commitMetaTss = metaFiles.stream().filter(hashingMetaCommitFilePredicate) - .map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName())) - .sorted() - .collect(Collectors.toCollection(TreeSet::new)); - final List hashingMetaFiles = metaFiles.stream().filter(hashingMetadataFilePredicate) - .sorted(Comparator.comparing(f -> f.getPath().getName())) + + final TreeMap> versionedHashMetadataFiles = metaFiles.stream() + .filter(hashingMetadataFilePredicate) + .map(metaFile -> { + String instantTime = HoodieConsistentHashingMetadata.getTimestampFromFile(metaFile.getPath().getName()); + return Pair.of(instantTime, Pair.of(metaFile, false)); + }) + .sorted(Collections.reverseOrder()) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (a, b) -> a, TreeMap::new)); + + metaFiles.stream().filter(hashingMetaCommitFilePredicate) + .forEach(commitFile -> { + String instantTime = HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()); + if (!versionedHashMetadataFiles.containsKey(instantTime)) { + // unexpected that the commit file exists but the corresponding metadata file does not 
+ LOG.error("Commit file {} exists but the corresponding metadata file does not", commitFile.getPath().getName()); + throw new HoodieIndexException("Commit file: " + commitFile.getPath().getName() + " exists but the corresponding metadata file does not"); + } + versionedHashMetadataFiles.computeIfPresent(instantTime, (k, v) -> Pair.of(v.getLeft(), true)); + }); + + Option> latestCommittedMetaFile = Option.fromJavaOptional(versionedHashMetadataFiles.entrySet() + .stream() + .filter(entry -> entry.getValue().getRight()) + .map(entry -> Pair.of(entry.getKey(), entry.getValue().getLeft())) + .findFirst()); + + final List> uncommittedMetaFilesAfterLatestCommited = latestCommittedMetaFile + .map(pair -> versionedHashMetadataFiles.tailMap(pair.getLeft())) + .orElse(versionedHashMetadataFiles.tailMap(HoodieTimeline.INIT_INSTANT_TS, true)) + .entrySet() + .stream() + .map(entry -> Pair.of(entry.getKey(), entry.getValue().getLeft())) + .sorted(Comparator.comparing(Pair::getLeft)) .collect(Collectors.toList()); - // max committed metadata file - final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : commitMetaTss.last(); - // max updated metadata file - StoragePathInfo maxMetadataFile = hashingMetaFiles.isEmpty() - ? 
null - : hashingMetaFiles.get(hashingMetaFiles.size() - 1); - // If single file present in metadata and if its default file return it - if (maxMetadataFile != null && HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS)) { - return loadMetadataFromGivenFile(table, maxMetadataFile); - } - // if max updated metadata file and committed metadata file are same then return - if (maxCommitMetaFileTs != null && maxMetadataFile != null - && maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) { - return loadMetadataFromGivenFile(table, maxMetadataFile); + + if (uncommittedMetaFilesAfterLatestCommited.isEmpty()) { + // all metadata files are committed, pick the latest committed file's hash metadata + return latestCommittedMetaFile.map(pair -> loadMetadataFromGivenFile(table, pair.getRight())).orElse(Option.empty()); } - HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); - // fix the in-consistency between un-committed and committed hashing metadata files. 
- List fixed = new ArrayList<>(); - hashingMetaFiles.forEach(hashingMetaFile -> { - StoragePath path = hashingMetaFile.getPath(); - String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName()); - if (maxCommitMetaFileTs != null && timestamp.compareTo(maxCommitMetaFileTs) <= 0) { - // only fix the metadata with greater timestamp than max committed timestamp - return; - } - boolean isRehashingCommitted = completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS); - if (isRehashingCommitted) { - if (!commitMetaTss.contains(timestamp)) { - try { - createCommitMarker(table, path, partitionPath); - } catch (IOException e) { - throw new HoodieIOException("Exception while creating marker file " + path.getName() + " for partition " + partition, e); - } + // find that there are uncommitted metadata files after the latest committed metadata file, we need to resolve the inconsistency + HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); + // fix from the latest committed metadata file (exclusive) to the latest uncommitted metadata file (inclusive) + for (Pair pair : uncommittedMetaFilesAfterLatestCommited) { + String instantTime = pair.getLeft(); + StoragePathInfo hashMetadataPath = pair.getRight(); + /** check if the metadata file can be committed + * 1. the action corresponding to the file has already been committed on the timeline + * 2. 
the file is the first metadata file of the partition whose instant equals {@link HoodieTimeline#INIT_INSTANT_TS} + */ + if (completedCommits.containsInstant(instantTime) || instantTime.equals(HoodieTimeline.INIT_INSTANT_TS)) { + try { + createCommitMarker(table, hashMetadataPath.getPath(), metadataPath); + } catch (IOException e) { + throw new HoodieIOException("Exception while creating marker file for hash metadata file: " + hashMetadataPath.getPath().getName() + " in partition " + partition, e); } - fixed.add(hashingMetaFile); - } else if (recommitMetadataFile(table, hashingMetaFile, partition)) { - fixed.add(hashingMetaFile); + // update the latest committed metadata file + latestCommittedMetaFile = Option.of(pair); + } else if (recommitMetadataFile(table, hashMetadataPath, partition)) { + // the un-initial hash metadata file exists but there is no corresponding commit file, and no corresponding completed commit on the active timeline + // recommit it to fix the inconsistency + // update the latest committed metadata file + latestCommittedMetaFile = Option.of(pair); } - }); - - return fixed.isEmpty() ? 
Option.empty() : loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1)); + } + // after fixing the inconsistency, return the latest committed metadata file + return latestCommittedMetaFile.map(pair -> loadMetadataFromGivenFile(table, pair.getRight())).orElse(Option.empty()); } catch (FileNotFoundException e) { return Option.empty(); } catch (IOException e) { @@ -203,17 +221,17 @@ public static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMet * * @param table Hoodie table * @param path File for which commit marker should be created - * @param partitionPath Partition path the file belongs to + * @param metadataPath Consistent-Bucket metadata path the file belongs to * @throws IOException */ - private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath partitionPath) throws IOException { + private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath metadataPath) throws IOException { HoodieStorage storage = table.getStorage(); - StoragePath fullPath = new StoragePath(partitionPath, + StoragePath fullPath = new StoragePath(metadataPath, getTimestampFromFile(path.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX); if (storage.exists(fullPath)) { return; } - //prevent exception from race condition. We are ok with the file being created in another thread, so we should + // prevent exception from race condition. 
We are ok with the file being created in another thread, so we should // check for the marker after catching the exception and we don't need to fail if the file exists try { FileIOUtils.createFileInPath(storage, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING))); @@ -264,7 +282,7 @@ private static Option loadMetadataFromGivenFile * @return true if hashing metadata file is latest else false */ private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo metaFile, String partition) { - StoragePath partitionPath = FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition); + StoragePath metadataPath = FSUtils.constructAbsolutePath(table.getMetaClient().getHashingMetadataPath(), partition); String timestamp = getTimestampFromFile(metaFile.getPath().getName()); if (table.getPendingCommitsTimeline().containsInstant(timestamp)) { return false; @@ -282,7 +300,7 @@ private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo m if (table.getBaseFileOnlyView().getLatestBaseFiles(partition) .map(fileIdPrefix -> FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate)) { try { - createCommitMarker(table, metaFile.getPath(), partitionPath); + createCommitMarker(table, metaFile.getPath(), metadataPath); return true; } catch (IOException e) { throw new HoodieIOException("Exception while creating marker file " + metaFile.getPath().getName() + " for partition " + partition, e);