Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8543] Fixing Secondary Index Record generation to not rely on WriteStatus #12313

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
try {
String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
writeStatus.getStat().setPrevBaseFile(latestValidFilePath);

HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
new StoragePath(config.getBasePath()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
Expand Down Expand Up @@ -1056,17 +1055,17 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient,
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig());
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getWritesFileIdEncoding(),
dataWriteConfig.getMetadataConfig());

// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
if (dataWriteConfig.isRecordIndexEnabled()) {
HoodieData<HoodieRecord> updatesFromWriteStatuses = getRecordIndexUpserts(writeStatus);
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), updatesFromWriteStatuses.union(additionalUpdates));
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
}
updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come we still have the updateFromWriteStatuses method..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still rename writeStatus to writeStatuses. plural

Copy link
Contributor Author

@nsivabalan nsivabalan Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I have punted the changes below to the next patch:
a. Remove HoodieWriteDelegate from WriteStatus
b. Remove the update API in HoodieMetadataWriter entirely.

The current patch ensures that none of the MDT record generation uses the RDD<WriteStatus>.

I can incorporate (b) into this patch if you prefer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. we can follow up. but we should fix this for good.

return partitionToRecordMap;
});
closeInternal();
Expand All @@ -1080,7 +1079,8 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord>
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient,
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig());
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getWritesFileIdEncoding(), dataWriteConfig.getMetadataConfig());
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
Expand Down Expand Up @@ -1127,7 +1127,8 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime);
}

private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap,
String instantTime) {
// If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update,
// because these operations do not change the secondary key - record key mapping.
if (commitMetadata.getOperationType() == WriteOperationType.COMPACT
Expand All @@ -1141,28 +1142,22 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus);
secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, instantTime);
} catch (Exception e) {
throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
}
partitionToRecordMap.put(partition, secondaryIndexRecords);
});
}

private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = getPartitionFilePairs(commitMetadata);
// Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
// the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
// operation is going to be expensive (in CPU, memory and IO)
List<String> keysToRemove = new ArrayList<>();
writeStatus.collectAsList().forEach(status -> {
status.getWrittenRecordDelegates().forEach(recordDelegate -> {
// Consider those keys which were either updated or deleted in this commit
if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) {
keysToRemove.add(recordDelegate.getRecordKey());
}
});
});
List<String> keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(),
dataMetaClient, instantTime);

HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition);
// Fetch the secondary keys that each of the record keys ('keysToRemove') maps to
// This is obtained by scanning the entire secondary index partition in the metadata table
Expand Down Expand Up @@ -1667,51 +1662,6 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
}
}

/**
 * Builds the upsert records for the record index (RLI) partition of the metadata table,
 * derived from the outcome of a write operation on the dataset.
 *
 * <p>For each written record delegate:
 * <ul>
 *   <li>errored records and records flagged to skip index maintenance produce nothing;</li>
 *   <li>deletes (no new location) produce an index delete record;</li>
 *   <li>inserts (new location, no current location) produce an index update record;</li>
 *   <li>updates (both locations present) produce nothing, after validating that the fileId
 *       did not change — a changed fileId indicates a corrupt index update and is fatal.</li>
 * </ul>
 *
 * @param writeStatuses {@code WriteStatus} from the write operation
 */
private HoodieData<HoodieRecord> getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
  return writeStatuses.flatMap(writeStatus -> {
    List<HoodieRecord> indexUpdates = new LinkedList<>();
    for (HoodieRecordDelegate delegate : writeStatus.getWrittenRecordDelegates()) {
      // Skip records that errored during the write, and records that opted out of index updates.
      if (writeStatus.isErrored(delegate.getHoodieKey()) || delegate.getIgnoreIndexUpdate()) {
        continue;
      }
      Option<HoodieRecordLocation> newLocation = delegate.getNewLocation();
      if (!newLocation.isPresent()) {
        // No new location: the record was deleted, so remove its entry from the index.
        indexUpdates.add(HoodieMetadataPayload.createRecordIndexDelete(delegate.getRecordKey()));
        continue;
      }
      if (delegate.getCurrentLocation().isPresent()) {
        // Update case: newLocation must keep the same fileId as currentLocation (only the
        // instantTime differs, referring to the just-completed commit). If the fileId changed,
        // the index would be corrupted — fail fast. Otherwise no RLI update is needed.
        if (!delegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) {
          final String msg = String.format("Detected update in location of record with key %s from %s to %s. The fileID should not change.",
              delegate, delegate.getCurrentLocation().get(), newLocation.get());
          LOG.error(msg);
          throw new HoodieMetadataException(msg);
        }
        continue;
      }
      // Insert case: register the new record key -> file location mapping.
      indexUpdates.add(HoodieMetadataPayload.createRecordIndexUpdate(
          delegate.getRecordKey(), delegate.getPartitionPath(),
          newLocation.get().getFileId(), newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding()));
    }
    return indexUpdates.iterator();
  });
}

private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
try (HoodieMetadataFileSystemView fsView = getMetadataView()) {
List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
Expand Down
Loading