From 69ff9a25cc43d6919d4fb4e82c3a7f0dccdae2a1 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sat, 16 Nov 2024 12:59:59 -0800 Subject: [PATCH 1/6] Updating how RLI records are generated for MDT updates --- .../org/apache/hudi/io/HoodieMergeHandle.java | 1 + .../HoodieBackedTableMetadataWriter.java | 56 +----- .../client/functional/TestBaseFileUtils.java | 164 ++++++++++++++++++ .../hudi/common/model/HoodieWriteStat.java | 17 +- .../log/AbstractHoodieLogRecordScanner.java | 4 + .../log/HoodieUnMergedLogRecordScanner.java | 11 +- .../apache/hudi/metadata/BaseFileUtils.java | 95 ++++++++++ .../metadata/HoodieTableMetadataUtil.java | 105 +++++++++++ .../common/model/TestHoodieWriteStat.java | 6 + .../functional/RecordLevelIndexTestBase.scala | 4 +- .../functional/TestRecordLevelIndex.scala | 27 ++- 11 files changed, 431 insertions(+), 59 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 1bf6f6b01389..57bc6a9ecd61 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo try { String latestValidFilePath = baseFileToMerge.getFileName(); writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime()); + writeStatus.getStat().setPrevBaseFile(latestValidFilePath); HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime, new StoragePath(config.getBasePath()), diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 256b509690be..ce9505a068f3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1056,14 +1056,14 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), - dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig()); + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getWritesFileIdEncoding(), + dataWriteConfig.getMetadataConfig()); // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code // to the HoodieTableMetadataUtil class in hudi-common. 
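// Note (grounded in this patch): the base RLI records are now produced by HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords, invoked through convertMetadataToRecords above; only the additional upserts are derived from the commit metadata below.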
if (dataWriteConfig.isRecordIndexEnabled()) { - HoodieData updatesFromWriteStatuses = getRecordIndexUpserts(writeStatus); - HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata); - partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), updatesFromWriteStatuses.union(additionalUpdates)); + HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata); + partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates)); } updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus); @@ -1080,7 +1080,8 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), - dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig()); + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), + dataWriteConfig.getWritesFileIdEncoding(), dataWriteConfig.getMetadataConfig()); HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata); partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates)); updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); @@ -1667,51 +1668,6 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map getRecordIndexUpserts(HoodieData writeStatuses) { - return writeStatuses.flatMap(writeStatus -> { - List recordList = new LinkedList<>(); - for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) { - if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) { - if (recordDelegate.getIgnoreIndexUpdate()) { - continue; - } - HoodieRecord hoodieRecord; - Option newLocation = recordDelegate.getNewLocation(); - if (newLocation.isPresent()) { - if (recordDelegate.getCurrentLocation().isPresent()) { - // This is an update, no need to update index if the location has not changed - // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's - // instantTime refers to the current commit which was completed. - if (!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) { - final String msg = String.format("Detected update in location of record with key %s from %s to %s. 
The fileID should not change.", - recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get()); - LOG.error(msg); - throw new HoodieMetadataException(msg); - } - // for updates, we can skip updating RLI partition in MDT - } else { - // Insert new record case - hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate( - recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(), - newLocation.get().getFileId(), newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding()); - recordList.add(hoodieRecord); - } - } else { - // Delete existing index for a deleted record - hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey()); - recordList.add(hoodieRecord); - } - } - } - return recordList.iterator(); - }); - } - private HoodieData getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) { try (HoodieMetadataFileSystemView fsView = getMetadataView()) { List> partitionBaseFilePairs = replaceCommitMetadata diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java new file mode 100644 index 000000000000..bba81829ade2 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.client.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metadata.BaseFileUtils; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestBaseFileUtils extends HoodieClientTestBase { + + @Test + public void testGeneratingRLIRecordsFromBaseFile() throws IOException { + + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + HoodieWriteConfig writeConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build(); + + // metadata enabled with only FILES partition + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + // Insert + String commitTime = client.createNewInstantTime(); + List records1 = dataGen.generateInserts(commitTime, 100); + client.startCommitWithTime(commitTime); + List writeStatuses1 = client.insert(jsc.parallelize(records1, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses1); + + // assert RLI records for a base file from 1st commit + String finalCommitTime = commitTime; + Map recordKeyToPartitionMapping1 = new HashMap<>(); + Map recordKeyToFileIdMapping1 = new HashMap<>(); + Map> partitionToFileIdMapping = new HashMap<>(); + Map fileIdToFileNameMapping1 = new HashMap<>(); + writeStatuses1.forEach(writeStatus -> { + assertEquals(writeStatus.getStat().getNumDeletes(), 0); + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) { + fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); + } + + Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), + writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath(); + String fileId = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getFileId(); + recordKeyToPartitionMapping1.put(key, partition); + recordKeyToFileIdMapping1.put(key, fileId); + partitionToFileIdMapping.computeIfAbsent(partition, new Function>() { + @Override + public Set apply(String s) { + Set fileIds = new HashSet<>(); + fileIds.add(s); + return fileIds; + } + 
}).add(fileId); + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + Map expectedRecordToPartitionMapping1 = new HashMap<>(); + records1.forEach(record -> expectedRecordToPartitionMapping1.put(record.getRecordKey(), record.getPartitionPath())); + + assertEquals(expectedRecordToPartitionMapping1, recordKeyToPartitionMapping1); + + // lets update some records and assert RLI records. + commitTime = client.createNewInstantTime(); + client.startCommitWithTime(commitTime); + String finalCommitTime2 = commitTime; + List deletes2 = dataGen.generateUniqueDeleteRecords(commitTime, 30); + List updates2 = dataGen.generateUniqueUpdates(commitTime, 30); + List inserts2 = dataGen.generateInserts(commitTime, 30); + List records2 = new ArrayList<>(); + records2.addAll(inserts2); + records2.addAll(updates2); + records2.addAll(deletes2); + + List writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses2); + + List expectedRLIInserts = inserts2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List expectedRLIDeletes = deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List actualRLIInserts = new ArrayList<>(); + List actualRLIDeletes = new ArrayList<>(); + + writeStatuses2.forEach(writeStatus -> { + assertTrue(writeStatus.getStat().getNumDeletes() != 0); + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping1.get(writeStatFileId)); + + Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), + writeConfig.getWritesFileIdEncoding(), finalCommitTime2, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + actualRLIDeletes.add(key); + } else { + actualRLIInserts.add(key); + } + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + Collections.sort(expectedRLIInserts); + Collections.sort(actualRLIInserts); + Collections.sort(expectedRLIDeletes); + Collections.sort(actualRLIDeletes); + assertEquals(expectedRLIInserts, actualRLIInserts); + assertEquals(expectedRLIDeletes, actualRLIDeletes); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 3c98a510317d..6d7ca6d51828 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -159,6 +159,9 @@ public class HoodieWriteStat implements Serializable { @Nullable private Long maxEventTime; + @Nullable + private String prevBaseFile; + @Nullable private RuntimeStats runtimeStats; @@ -327,6 +330,14 @@ public void setFileSizeInBytes(long fileSizeInBytes) { this.fileSizeInBytes = fileSizeInBytes; } + public String getPrevBaseFile() { + return prevBaseFile; + } + + public void setPrevBaseFile(String prevBaseFile) { + this.prevBaseFile = prevBaseFile; + } + public Long getMinEventTime() { return minEventTime; } @@ -370,9 +381,9 @@ public void setPath(StoragePath basePath, StoragePath path) { @Override public String toString() { return 
"HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit - + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites - + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath - + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats) + + '\'' + ", prevBaseFile=" + prevBaseFile + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + + ", numUpdateWrites=" + numUpdateWrites + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + + totalWriteErrors + ", tempPath='" + tempPath + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats) + '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords + ", totalLogFilesCompacted=" + totalLogFilesCompacted + ", totalLogSizeCompacted=" + totalLogSizeCompacted + ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + ", totalLogBlocks=" + totalLogBlocks diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 10aa50efe3e1..33fa3ebe2da5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -659,6 +659,10 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option keySpec */ protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord); + public Set getDeletedRecordKeys() { + throw new HoodieException("Inherited class needs to override to provide a concrete implementation"); + } + /** * Process the set of log blocks belonging to the last instant which is read fully. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8b8d43449c67..661ef6cdaf85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -33,7 +33,10 @@ import org.apache.avro.Schema; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -42,6 +45,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { private final LogRecordScannerCallback callback; + private final Set deletedRecordKeys = new HashSet<>(); private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, @@ -83,7 +87,12 @@ protected void processNextRecord(HoodieRecord hoodieRecord) throws Except @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { - // no - op + deletedRecordKeys.add(deleteRecord.getRecordKey()); + } + + @Override + public Set getDeletedRecordKeys() { + return deletedRecordKeys; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java new file mode 100644 index 000000000000..6361133750aa --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.FileFormatUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import static java.util.stream.Collectors.toList; + +public class BaseFileUtils { + + /** + * Generates RLI Metadata records for base files. + * If base file is a added to a new file group, all record keys are treated as inserts. + * If a base file is added to an existing file group, we read previous base file in addition to the latest base file of interest. 
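+ * (The previous base file name is taken from {@link HoodieWriteStat#getPrevBaseFile()}, which HoodieMergeHandle populates in this change.)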
Find deleted records and generate RLI Metadata records + * for the same in addition to new insert records. + * @param basePath base path of the table. + * @param writeStat {@link HoodieWriteStat} of interest. + * @param writesFileIdEncoding fileID encoding for the table. + * @param instantTime instant time of interest. + * @param storage instance of {@link HoodieStorage}. + * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition. + * @throws IOException + */ + public static Iterator generateRLIMetadataHoodieRecordsForBaseFile(String basePath, + HoodieWriteStat writeStat, + Integer writesFileIdEncoding, + String instantTime, + HoodieStorage storage) throws IOException { + String partition = writeStat.getPartitionPath(); + String latestFileName = writeStat.getPath().substring(writeStat.getPath().lastIndexOf("/") + 1); + String previousFileName = writeStat.getPrevBaseFile(); + String fileId = FSUtils.getFileId(latestFileName); + Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); + if (writeStat.getNumDeletes() == 0) { // if there are no deletes, reading only the file added as part of current commit metadata would suffice. + return new ArrayList(recordKeysFromLatestBaseFile).stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate((String) recordKey, partition, fileId, + instantTime, writesFileIdEncoding)).iterator(); + } else { + // read from previous base file and find difference to also generate delete records. + // we will return new inserts and deletes from this code block + Set recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName); + List toReturn = recordKeysFromPreviousBaseFile.stream() + .filter(recordKey -> { + // deleted record + return !recordKeysFromLatestBaseFile.contains(recordKey); + }).map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()); + + toReturn.addAll(recordKeysFromLatestBaseFile.stream() + .filter(recordKey -> { + // new inserts + return !recordKeysFromPreviousBaseFile.contains(recordKey); + }).map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, + instantTime, writesFileIdEncoding)).collect(toList())); + return toReturn.iterator(); + } + } + + private static Set getRecordKeysFromBaseFile(HoodieStorage storage, String basePath, String partition, String fileName) throws IOException { + StoragePath dataFilePath = new StoragePath(basePath, StringUtils.isNullOrEmpty(partition) ? 
fileName : (partition + Path.SEPARATOR) + fileName); + FileFormatUtils fileFormatUtils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET); + return fileFormatUtils.readRowKeys(storage, dataFilePath); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index fab10c288376..53c4978bb645 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -46,6 +46,9 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.function.SerializableBiFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -54,6 +57,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -135,6 +139,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.stream.Collectors.toList; import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema; import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema; @@ -366,6 +371,7 @@ public static Map> convertMetadataToRecords(Hoo boolean isColumnStatsIndexEnabled, int columnStatsIndexParallelism, List targetColumnsForColumnStatsIndex, + Integer writesFileIdEncoding, HoodieMetadataConfig metadataConfig) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = context.parallelize( @@ -387,6 +393,10 @@ public static Map> convertMetadataToRecords(Hoo final HoodieData partitionStatsRDD = convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, metadataConfig); partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), partitionStatsRDD); } + if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) { + partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig, + dataMetaClient, writesFileIdEncoding, instantTime)); + } return partitionToRecordsMap; } @@ -759,6 +769,78 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi }); } + static HoodieData convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext, + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + int writesFileIdEncoding, + String instantTime) { + + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + + if (allWriteStats.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + try { + int parallelism = 
Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); + String basePath = dataTableMetaClient.getBasePath().toString(); + // we might need to set some additional variables if we need to process log files. + boolean anyLogFilesWithDeleteBlocks = allWriteStats.stream().anyMatch(writeStat -> { + String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); + return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0; + }); + Option latestCommitTimestamp = Option.empty(); + Option writerSchemaOpt = Option.empty(); + if (anyLogFilesWithDeleteBlocks) { + // if we have a log file w/ pure deletes. + latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getCompletionTime()); + writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); + } + int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); + StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); + Option finalWriterSchemaOpt = writerSchemaOpt; + Option finalLatestCommitTimestamp = latestCommitTimestamp; + HoodieData recordIndexRecords = engineContext.parallelize(allWriteStats, parallelism) + .flatMap(writeStat -> { + HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); + // handle base files + if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); + } else { + // for logs, we only need to process delete blocks for RLI + if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) { + Set deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient, + finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get()); + return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator(); + } + // ignore log file data blocks. + return new ArrayList().iterator(); + } + }); + + // there are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) + // lets do reduce by key to ignore the deleted entry. 
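+ // e.g. when a record moves from an older partition to a newer one within the same commit, the old file group emits a delete and the new file group emits an insert for the same key; the reduce below keeps the insert.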
+ return recordIndexRecords.mapToPair( + (SerializablePairFunction) t -> Pair.of(t.getKey(), t)) + .reduceByKey((SerializableBiFunction) (record1, record2) -> { + boolean isRecord1Deleted = record1.getData() instanceof EmptyHoodieRecordPayload; + boolean isRecord2Deleted = record2.getData() instanceof EmptyHoodieRecordPayload; + if (isRecord1Deleted && !isRecord2Deleted) { + return record2; + } else if (!isRecord1Deleted && isRecord2Deleted) { + return record1; + } else { + throw new HoodieIOException("Two HoodieRecord updates to RLI are seen for the same record key " + record2.getRecordKey() + ", record 1 : " + + record1.getData().toString() + ", record 2 : " + record2.getData().toString()); + } + }, parallelism).values(); + } catch (Exception e) { + throw new HoodieException("Failed to generate record index records for metadata table", e); + } + } + private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, Map> partitionToFilesMap) { InstantGenerator factory = dataTableMetaClient.getInstantGenerator(); @@ -1311,6 +1393,29 @@ public static List> getLogFileColumnRangeM return Collections.emptyList(); } + private static Set getDeletedRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, int maxBufferSize, + String latestCommitTimestamp) throws IOException { + if (writerSchemaOpt.isPresent()) { + // read log file records without merging + List records = new ArrayList<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTimestamp) + .withReaderSchema(writerSchemaOpt.get()) + .withTableMetaClient(datasetMetaClient) + .withLogRecordScannerCallback(records::add) + .build(); + scanner.scan(); + // HoodieUnMergedLogRecordScanner will expose deleted record keys + return scanner.getDeletedRecordKeys(); + } + return Collections.emptySet(); + } + /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by * the {@link LogicalTypes.Decimal} Avro logical type diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java index a99e48482990..54d6b7a7ab79 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java @@ -53,6 +53,12 @@ public void testSetPaths() { writeStat.setPath(basePath, finalizeFilePath); assertEquals(finalizeFilePath, new StoragePath(basePath, writeStat.getPath())); + // test prev base file + StoragePath prevBaseFilePath = new StoragePath(partitionPath, FSUtils.makeBaseFileName(instantTime, "2-0-3", fileName, + HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension())); + writeStat.setPrevBaseFile(prevBaseFilePath.toString()); + assertEquals(prevBaseFilePath.toString(), writeStat.getPrevBaseFile()); + // test for null tempFilePath writeStat = new HoodieWriteStat(); writeStat.setPath(basePath, finalizeFilePath); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index
0254ff590e0b..596689a62497 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -321,9 +321,9 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase { writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount, writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes) assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig)) - val prevDf = mergedDfList.last.drop("tip_history") + val prevDf = mergedDfList.last.drop("tip_history", "_hoodie_is_deleted") val nonMatchingRecords = readDf.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", - "_hoodie_partition_path", "_hoodie_file_name", "tip_history") + "_hoodie_partition_path", "_hoodie_file_name", "tip_history", "_hoodie_is_deleted") .join(prevDf, prevDf.columns, "leftanti") assertEquals(0, nonMatchingRecords.count()) assertEquals(readDf.count(), prevDf.count()) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index f0333c515a12..f032eebb1cfb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -72,7 +72,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition() val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition() - // batch1 inserts + // batch1 inserts (5 records) val instantTime1 = getNewInstantTime() val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 5)).asScala.toSeq var operation = INSERT_OPERATION_OPT_VAL @@ -112,7 +112,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true") val instantTime3 = getNewInstantTime() - // batch3. updates to partition2 + // batch3. 
update 2 records from newly inserted records from commit 2 to partition2 val latestBatch3 = recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq val latestBatchDf3 = spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1)) latestBatchDf3.cache() @@ -241,7 +241,28 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { .save(basePath) val prevDf = mergedDfList.last mergedDfList = mergedDfList :+ prevDf.except(deleteDf) - validateDataAndRecordIndices(hudiOpts) + validateDataAndRecordIndices(hudiOpts, deleteDf) + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): Unit = { + val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) + val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + insertDf.cache() + + val deleteDf = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(dataGen.generateUniqueDeleteRecords(getNewInstantTime, 1)).asScala, 1)) + deleteDf.cache() + val recordKeyToDelete = deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String] + deleteDf.write.format("org.apache.hudi") + .options(hudiOpts) + .mode(SaveMode.Append) + .save(basePath) + val prevDf = mergedDfList.last + mergedDfList = mergedDfList :+ prevDf.filter(row => row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete) + validateDataAndRecordIndices(hudiOpts, deleteDf) } @ParameterizedTest From 6e281a3f53a941751ed1758245b5ffdef76967c8 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sun, 17 Nov 2024 10:37:12 -0800 Subject: [PATCH 2/6] Fixing tests --- .../HoodieBackedTableMetadataWriter.java | 1 - .../client/functional/TestBaseFileUtils.java | 164 -------------- .../functional/TestRLIRecordGeneration.java | 214 ++++++++++++++++++ .../log/HoodieUnMergedLogRecordScanner.java | 1 - .../apache/hudi/metadata/BaseFileUtils.java | 8 +- .../metadata/HoodieTableMetadataUtil.java | 12 +- 6 files changed, 224 insertions(+), 176 deletions(-) delete mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index ce9505a068f3..07313150ead1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieIndexMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordDelegate; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java deleted file mode 100644 index 
bba81829ade2..000000000000 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestBaseFileUtils.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.client.functional; - -import org.apache.hudi.client.SparkRDDWriteClient; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.EmptyHoodieRecordPayload; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metadata.BaseFileUtils; -import org.apache.hudi.metadata.HoodieMetadataPayload; -import org.apache.hudi.testutils.HoodieClientTestBase; - -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestBaseFileUtils extends HoodieClientTestBase { - - @Test - public void testGeneratingRLIRecordsFromBaseFile() throws IOException { - - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - HoodieWriteConfig writeConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build(); - - // metadata enabled with only FILES partition - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { - // Insert - String commitTime = client.createNewInstantTime(); - List records1 = dataGen.generateInserts(commitTime, 100); - client.startCommitWithTime(commitTime); - List writeStatuses1 = client.insert(jsc.parallelize(records1, 1), commitTime).collect(); - assertNoWriteErrors(writeStatuses1); - - // assert RLI records for a base file from 1st commit - String finalCommitTime = commitTime; - Map recordKeyToPartitionMapping1 = new HashMap<>(); - Map recordKeyToFileIdMapping1 = new HashMap<>(); - Map> partitionToFileIdMapping = new HashMap<>(); - Map fileIdToFileNameMapping1 = new HashMap<>(); - writeStatuses1.forEach(writeStatus -> { - assertEquals(writeStatus.getStat().getNumDeletes(), 0); - // Fetch record keys for all - try { - String writeStatFileId = writeStatus.getFileId(); - if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) { - 
fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); - } - - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), - writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); - while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); - String key = rliRecord.getRecordKey(); - String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath(); - String fileId = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getFileId(); - recordKeyToPartitionMapping1.put(key, partition); - recordKeyToFileIdMapping1.put(key, fileId); - partitionToFileIdMapping.computeIfAbsent(partition, new Function>() { - @Override - public Set apply(String s) { - Set fileIds = new HashSet<>(); - fileIds.add(s); - return fileIds; - } - }).add(fileId); - } - } catch (IOException e) { - throw new HoodieException("Should not have failed ", e); - } - }); - - Map expectedRecordToPartitionMapping1 = new HashMap<>(); - records1.forEach(record -> expectedRecordToPartitionMapping1.put(record.getRecordKey(), record.getPartitionPath())); - - assertEquals(expectedRecordToPartitionMapping1, recordKeyToPartitionMapping1); - - // lets update some records and assert RLI records. - commitTime = client.createNewInstantTime(); - client.startCommitWithTime(commitTime); - String finalCommitTime2 = commitTime; - List deletes2 = dataGen.generateUniqueDeleteRecords(commitTime, 30); - List updates2 = dataGen.generateUniqueUpdates(commitTime, 30); - List inserts2 = dataGen.generateInserts(commitTime, 30); - List records2 = new ArrayList<>(); - records2.addAll(inserts2); - records2.addAll(updates2); - records2.addAll(deletes2); - - List writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect(); - assertNoWriteErrors(writeStatuses2); - - List expectedRLIInserts = inserts2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); - List expectedRLIDeletes = deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); - List actualRLIInserts = new ArrayList<>(); - List actualRLIDeletes = new ArrayList<>(); - - writeStatuses2.forEach(writeStatus -> { - assertTrue(writeStatus.getStat().getNumDeletes() != 0); - // Fetch record keys for all - try { - String writeStatFileId = writeStatus.getFileId(); - assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping1.get(writeStatFileId)); - - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), - writeConfig.getWritesFileIdEncoding(), finalCommitTime2, metaClient.getStorage()); - while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); - String key = rliRecord.getRecordKey(); - if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { - actualRLIDeletes.add(key); - } else { - actualRLIInserts.add(key); - } - } - } catch (IOException e) { - throw new HoodieException("Should not have failed ", e); - } - }); - - Collections.sort(expectedRLIInserts); - Collections.sort(actualRLIInserts); - Collections.sort(expectedRLIDeletes); - Collections.sort(actualRLIDeletes); - assertEquals(expectedRLIInserts, actualRLIInserts); - assertEquals(expectedRLIDeletes, actualRLIDeletes); - } - } -} diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java new file mode 100644 index 000000000000..9ad4003525b7 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metadata.BaseFileUtils; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestRLIRecordGeneration extends HoodieClientTestBase { + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) throws IOException { + cleanupClients(); + initMetaClient(tableType); + cleanupTimelineService(); + initTimelineService(); + + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + HoodieWriteConfig writeConfig = tableType == HoodieTableType.COPY_ON_WRITE ? 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build() + : getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2) + .withInlineCompaction(true) + .compactionSmallFileSize(0).build()).build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + // Insert + String commitTime = client.createNewInstantTime(); + List records1 = dataGen.generateInserts(commitTime, 100); + client.startCommitWithTime(commitTime); + List writeStatuses1 = client.insert(jsc.parallelize(records1, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses1); + + // assert RLI records for a base file from 1st commit + String finalCommitTime = commitTime; + Map recordKeyToPartitionMapping1 = new HashMap<>(); + Map fileIdToFileNameMapping1 = new HashMap<>(); + writeStatuses1.forEach(writeStatus -> { + assertEquals(writeStatus.getStat().getNumDeletes(), 0); + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) { + fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); + } + + Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), + writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath(); + recordKeyToPartitionMapping1.put(key, partition); + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + Map expectedRecordToPartitionMapping1 = new HashMap<>(); + records1.forEach(record -> expectedRecordToPartitionMapping1.put(record.getRecordKey(), record.getPartitionPath())); + + assertEquals(expectedRecordToPartitionMapping1, recordKeyToPartitionMapping1); + + // lets update some records and assert RLI records. + commitTime = client.createNewInstantTime(); + client.startCommitWithTime(commitTime); + String finalCommitTime2 = commitTime; + List deletes2 = dataGen.generateUniqueDeleteRecords(commitTime, 30); + List updates2 = dataGen.generateUniqueUpdates(commitTime, 30); + List inserts2 = dataGen.generateInserts(commitTime, 30); + List records2 = new ArrayList<>(); + records2.addAll(inserts2); + records2.addAll(updates2); + records2.addAll(deletes2); + + List writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses2); + + if (tableType == HoodieTableType.COPY_ON_WRITE) { + List expectedRLIInserts = inserts2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List expectedRLIDeletes = deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, finalCommitTime2, writeConfig, expectedRLIInserts, expectedRLIDeletes); + } else { + // trigger 2nd commit followed by compaction. 
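+ // in MOR these updates and deletes first land in log files; the assertions below derive RLI records from the compacted base files, so deletes from both upsert commits are expected.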
+ commitTime = client.createNewInstantTime(); + client.startCommitWithTime(commitTime); + String finalCommitTime3 = commitTime; + List deletes3 = dataGen.generateUniqueDeleteRecords(commitTime, 30); + List updates3 = dataGen.generateUniqueUpdates(commitTime, 30); + List inserts3 = dataGen.generateInserts(commitTime, 30); + List records3 = new ArrayList<>(); + records3.addAll(inserts3); + records3.addAll(updates3); + records3.addAll(deletes3); + + List writeStatuses3 = client.upsert(jsc.parallelize(records3, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses3); + + // trigger compaction + Option compactionInstantOpt = client.scheduleCompaction(Option.empty()); + assertTrue(compactionInstantOpt.isPresent()); + List compactionWriteStats = (List) client.compact(compactionInstantOpt.get()).getWriteStats().get(); + + List expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + // since deletes are lazily realized when compaction kicks in, we need to account for deletes from 2nd commit as well. + expectedRLIDeletes.addAll(deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList())); + List actualRLIDeletes = new ArrayList<>(); + + compactionWriteStats.forEach(writeStat -> { + try { + Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat, + writeConfig.getWritesFileIdEncoding(), finalCommitTime3, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + actualRLIDeletes.add(key); + } + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + // it may not be easy to assert inserts to RLI, because if there are no deletes, both inserts and updates to the data table result in RLI records. + // but if there are deletes, we only ingest inserts and deletes from the data table to the RLI partition.
+ Collections.sort(expectedRLIDeletes); + Collections.sort(actualRLIDeletes); + assertEquals(expectedRLIDeletes, actualRLIDeletes); + } + } + } + + private void generateRliRecordsAndAssert(List writeStatuses, Map fileIdToFileNameMapping, String commitTime, + HoodieWriteConfig writeConfig, List expectedRLIInserts, List expectedRLIDeletes) { + List actualRLIDeletes = new ArrayList<>(); + List actualRLIInserts = new ArrayList<>(); + writeStatuses.forEach(writeStatus -> { + assertTrue(writeStatus.getStat().getNumDeletes() != 0); + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId)); + + Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), + writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + actualRLIDeletes.add(key); + } else { + actualRLIInserts.add(key); + } + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + Collections.sort(expectedRLIInserts); + Collections.sort(actualRLIInserts); + Collections.sort(expectedRLIDeletes); + Collections.sort(actualRLIDeletes); + assertEquals(expectedRLIInserts, actualRLIInserts); + assertEquals(expectedRLIDeletes, actualRLIDeletes); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 661ef6cdaf85..09b767f04a88 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -33,7 +33,6 @@ import org.apache.avro.Schema; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java index 6361133750aa..20038fdf840d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java @@ -60,11 +60,12 @@ public static Iterator generateRLIMetadataHoodieRecordsForBaseFile String instantTime, HoodieStorage storage) throws IOException { String partition = writeStat.getPartitionPath(); - String latestFileName = writeStat.getPath().substring(writeStat.getPath().lastIndexOf("/") + 1); + String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); String previousFileName = writeStat.getPrevBaseFile(); String fileId = FSUtils.getFileId(latestFileName); Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); if (writeStat.getNumDeletes() == 0) { // if there are no deletes, reading only the file added as part of current commit metadata would suffice. + // this means that both inserts and updates from latest base file might result in RLI records. 
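+ // for an updated key this simply re-emits the existing RLI entry with the same fileId and the new instant time.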
return new ArrayList(recordKeysFromLatestBaseFile).stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate((String) recordKey, partition, fileId, instantTime, writesFileIdEncoding)).iterator(); } else { @@ -81,8 +82,9 @@ public static Iterator generateRLIMetadataHoodieRecordsForBaseFile .filter(recordKey -> { // new inserts return !recordKeysFromPreviousBaseFile.contains(recordKey); - }).map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, - instantTime, writesFileIdEncoding)).collect(toList())); + }).map(recordKey -> + HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, + instantTime, writesFileIdEncoding)).collect(toList())); return toReturn.iterator(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 53c4978bb645..c1f561a705a9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -793,10 +793,9 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine }); Option latestCommitTimestamp = Option.empty(); Option writerSchemaOpt = Option.empty(); - if (anyLogFilesWithDeleteBlocks) { - // if we have a log file w/ pure deletes. - latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getCompletionTime()); - writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); + if (anyLogFilesWithDeleteBlocks) { // if we have a log file w/ pure deletes. + latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()); + writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); } int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); @@ -807,8 +806,8 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); // handle base files if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); - } else { + return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); + } else { // for logs, we only need to process delete blocks for RLI if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) { Set deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient, @@ -1407,7 +1406,6 @@ private static Set getDeletedRecordKeys(String filePath, HoodieTableMeta .withLatestInstantTime(latestCommitTimestamp) .withReaderSchema(writerSchemaOpt.get()) .withTableMetaClient(datasetMetaClient) - .withLogRecordScannerCallback(records::add) .build(); scanner.scan(); // HoodieUnMergedLogRecordScanner will expose deleted record keys From a8f2488058711c4eff1421e2dbdda4965a0cfeea Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 19 Nov 2024 16:19:50 -0800 Subject: [PATCH 3/6] addressing comments --- .../log/AbstractHoodieLogRecordScanner.java | 4 --- .../log/HoodieUnMergedLogRecordScanner.java 
| 34 +++++++++++++------ .../metadata/HoodieTableMetadataUtil.java | 19 ++++++----- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 33fa3ebe2da5..10aa50efe3e1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -659,10 +659,6 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option keySpec */ protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord); - public Set getDeletedRecordKeys() { - throw new HoodieException("Inherited class needs to override to provide a concrete implementation"); - } - /** * Process the set of log blocks belonging to the last instant which is read fully. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 09b767f04a88..e807be2bcee1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; @@ -33,9 +34,7 @@ import org.apache.avro.Schema; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; /** @@ -44,17 +43,19 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { private final LogRecordScannerCallback callback; - private final Set deletedRecordKeys = new HashSet<>(); + private final CallbackForDeletedKeys callbackForDeletedKeys; private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback, Option instantRange, InternalSchema internalSchema, + LogRecordScannerCallback callback, CallbackForDeletedKeys callbackForDeletedKeys, + Option instantRange, InternalSchema internalSchema, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); this.callback = callback; + this.callbackForDeletedKeys = callbackForDeletedKeys; } /** @@ -86,12 +87,9 @@ protected void processNextRecord(HoodieRecord hoodieRecord) throws Except @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { - deletedRecordKeys.add(deleteRecord.getRecordKey()); - } - - @Override - public Set getDeletedRecordKeys() { - return deletedRecordKeys; + if (callbackForDeletedKeys != null) { + callbackForDeletedKeys.apply(deleteRecord.getHoodieKey()); + } } /** @@ -103,6 +101,14 @@ public interface LogRecordScannerCallback { 
void apply(HoodieRecord record) throws Exception; } + /** + * A callback for log record scanner to consume deleted HoodieKeys. + */ + @FunctionalInterface + public interface CallbackForDeletedKeys { + void apply(HoodieKey deletedKey); + } + /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ @@ -118,6 +124,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder { private Option instantRange = Option.empty(); // specific configurations private LogRecordScannerCallback callback; + private CallbackForDeletedKeys callbackForDeletedKeys; private boolean enableOptimizedLogBlocksScan; private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE; private HoodieTableMetaClient hoodieTableMetaClient; @@ -181,6 +188,11 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { return this; } + public Builder withLogRecordScannerCallbackForDeletedKeys(CallbackForDeletedKeys callbackForDeletedKeys) { + this.callbackForDeletedKeys = callbackForDeletedKeys; + return this; + } + @Override public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; @@ -205,7 +217,7 @@ public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema, - latestInstantTime, reverseReader, bufferSize, callback, instantRange, + latestInstantTime, reverseReader, bufferSize, callback, callbackForDeletedKeys, instantRange, internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index c1f561a705a9..5626e88b0fe6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -787,13 +787,13 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); String basePath = dataTableMetaClient.getBasePath().toString(); // we might need to set some additional variables if we need to process log files. - boolean anyLogFilesWithDeleteBlocks = allWriteStats.stream().anyMatch(writeStat -> { + boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> { String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); - return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0; + return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0; }); Option latestCommitTimestamp = Option.empty(); Option writerSchemaOpt = Option.empty(); - if (anyLogFilesWithDeleteBlocks) { // if we have a log file w/ pure deletes. + if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes. 
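Note (illustrative, not part of the patch): a minimal usage sketch of the new deleted-keys callback introduced above, mirroring the getDeletedRecordKeys hunk further down in this patch. The helper name deletedKeysOf and its parameters are hypothetical; the caller is assumed to already hold the meta client, log file path, reader schema, buffer size and latest commit time.

import org.apache.avro.Schema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class DeletedKeysCallbackSketch {

  // Sketch only: collect the record keys removed by the delete blocks of a single log file.
  static Set<String> deletedKeysOf(HoodieTableMetaClient metaClient, String logFilePath,
                                   Schema readerSchema, int bufferSize, String latestCommitTime) {
    Set<String> deletedKeys = new HashSet<>();
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withStorage(metaClient.getStorage())
        .withBasePath(metaClient.getBasePath())
        .withLogFilePaths(Collections.singletonList(logFilePath))
        .withBufferSize(bufferSize)
        .withLatestInstantTime(latestCommitTime)
        .withReaderSchema(readerSchema)
        .withTableMetaClient(metaClient)
        // only deletions are of interest here, so no regular record callback is registered
        .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey()))
        .build();
    scanner.scan();
    return deletedKeys;
  }
}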
latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()); writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); } @@ -808,9 +808,10 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); } else { - // for logs, we only need to process delete blocks for RLI - if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) { - Set deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient, + // for logs, we only need to process log files containing deletes + if (writeStat.getNumDeletes() > 0) { + StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()); + Set deletedRecordKeys = getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get()); return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator(); } @@ -1397,7 +1398,7 @@ private static Set getDeletedRecordKeys(String filePath, HoodieTableMeta String latestCommitTimestamp) throws IOException { if (writerSchemaOpt.isPresent()) { // read log file records without merging - List records = new ArrayList<>(); + List deletedKeys = new ArrayList<>(); HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() .withStorage(datasetMetaClient.getStorage()) .withBasePath(datasetMetaClient.getBasePath()) @@ -1406,10 +1407,10 @@ private static Set getDeletedRecordKeys(String filePath, HoodieTableMeta .withLatestInstantTime(latestCommitTimestamp) .withReaderSchema(writerSchemaOpt.get()) .withTableMetaClient(datasetMetaClient) + .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey())) .build(); scanner.scan(); - // HoodieUnMergedLogRecordScanner will expose deleted record keys - return scanner.getDeletedRecordKeys(); + return deletedKeys.stream().collect(Collectors.toSet()); } return Collections.emptySet(); } From c21af1c76cc152f30201a5db02916b18d5f9b5ba Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 20 Nov 2024 09:24:22 -0800 Subject: [PATCH 4/6] Addressing pending feedback from danny --- .../functional/TestRLIRecordGeneration.java | 114 +++++++++++++----- .../log/HoodieUnMergedLogRecordScanner.java | 4 +- ...s.java => BaseFileRecordParsingUtils.java} | 10 +- .../metadata/HoodieTableMetadataUtil.java | 10 +- ...va => TestBaseFileRecordParsingUtils.java} | 2 +- 5 files changed, 94 insertions(+), 46 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/metadata/{BaseFileUtils.java => BaseFileRecordParsingUtils.java} (89%) rename hudi-common/src/test/java/org/apache/hudi/common/util/{TestBaseFileUtils.java => TestBaseFileRecordParsingUtils.java} (98%) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java index 9ad4003525b7..b85ec1bddc82 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java @@ -22,19 +22,25 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metadata.BaseFileUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metadata.BaseFileRecordParsingUtils; import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.avro.Schema; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -89,7 +95,7 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); } - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), + Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); while (rliRecordsItr.hasNext()) { HoodieRecord rliRecord = rliRecordsItr.next(); @@ -125,7 +131,11 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro if (tableType == HoodieTableType.COPY_ON_WRITE) { List expectedRLIInserts = inserts2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); List expectedRLIDeletes = deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); - generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, finalCommitTime2, writeConfig, expectedRLIInserts, expectedRLIDeletes); + List actualInserts = new ArrayList<>(); + List actualDeletes = new ArrayList<>(); + generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, finalCommitTime2, writeConfig, actualInserts, actualDeletes); + + assertListEquality(expectedRLIInserts, expectedRLIDeletes, actualInserts, actualDeletes); } else { // trigger 2nd commit followed by compaction. 
commitTime = client.createNewInstantTime(); @@ -142,19 +152,41 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro List writeStatuses3 = client.upsert(jsc.parallelize(records3, 1), commitTime).collect(); assertNoWriteErrors(writeStatuses3); + List expectedRLIInserts = inserts3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List actualInserts = new ArrayList<>(); + List actualDeletes = new ArrayList<>(); + generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus -> !FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) + .collect(Collectors.toList()), Collections.emptyMap(), finalCommitTime3, writeConfig, actualInserts, actualDeletes); + + // process log files. + String latestCommitTimestamp = metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime(); + Option writerSchemaOpt = tryResolveSchemaForTable(metaClient); + writeStatuses3.stream().filter(writeStatus -> FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) + .forEach(writeStatus -> { + try { + actualDeletes.addAll(HoodieTableMetadataUtil.getDeletedRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt, + writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp)); + } catch (IOException e) { + throw new HoodieIOException("Failed w/ IOException ", e); + } + }); + + assertListEquality(expectedRLIInserts, expectedRLIDeletes, actualInserts, actualDeletes); + // trigger compaction Option compactionInstantOpt = client.scheduleCompaction(Option.empty()); assertTrue(compactionInstantOpt.isPresent()); List compactionWriteStats = (List) client.compact(compactionInstantOpt.get()).getWriteStats().get(); - List expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); // since deletes are lazily realized when compaction kicks in, we need to account for deletes from 2nd commit as well. 
expectedRLIDeletes.addAll(deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList())); List actualRLIDeletes = new ArrayList<>(); compactionWriteStats.forEach(writeStat -> { try { - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat, + Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat, writeConfig.getWritesFileIdEncoding(), finalCommitTime3, metaClient.getStorage()); while (rliRecordsItr.hasNext()) { HoodieRecord rliRecord = rliRecordsItr.next(); @@ -177,38 +209,56 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro } } + private void assertListEquality(List expectedRLIInserts, List expectedRLIDeletes, List actualInserts, + List actualDeletes) { + Collections.sort(expectedRLIInserts); + Collections.sort(actualInserts); + Collections.sort(expectedRLIDeletes); + Collections.sort(actualDeletes); + assertEquals(expectedRLIInserts, actualInserts); + assertEquals(expectedRLIDeletes, actualDeletes); + } + + private static Option tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) { + if (dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) { + return Option.empty(); + } + + try { + TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient); + return Option.of(schemaResolver.getTableAvroSchema()); + } catch (Exception e) { + throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e); + } + } + private void generateRliRecordsAndAssert(List writeStatuses, Map fileIdToFileNameMapping, String commitTime, - HoodieWriteConfig writeConfig, List expectedRLIInserts, List expectedRLIDeletes) { - List actualRLIDeletes = new ArrayList<>(); - List actualRLIInserts = new ArrayList<>(); + HoodieWriteConfig writeConfig, List actualInserts, + List actualDeletes) { writeStatuses.forEach(writeStatus -> { - assertTrue(writeStatus.getStat().getNumDeletes() != 0); - // Fetch record keys for all - try { - String writeStatFileId = writeStatus.getFileId(); - assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId)); - - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), - writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage()); - while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); - String key = rliRecord.getRecordKey(); - if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { - actualRLIDeletes.add(key); - } else { - actualRLIInserts.add(key); + if (!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) { + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + if (!fileIdToFileNameMapping.isEmpty()) { + assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId)); } + + Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), + writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + if (rliRecord.getData() 
instanceof EmptyHoodieRecordPayload) { + actualDeletes.add(key); + } else { + actualInserts.add(key); + } + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); } - } catch (IOException e) { - throw new HoodieException("Should not have failed ", e); } }); - - Collections.sort(expectedRLIInserts); - Collections.sort(actualRLIInserts); - Collections.sort(expectedRLIDeletes); - Collections.sort(actualRLIDeletes); - assertEquals(expectedRLIInserts, actualRLIInserts); - assertEquals(expectedRLIDeletes, actualRLIDeletes); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index e807be2bcee1..9b02a83ba6ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -82,7 +82,9 @@ protected void processNextRecord(HoodieRecord hoodieRecord) throws Except // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into queue of BoundedInMemoryExecutor. // Just call callback without merging - callback.apply(hoodieRecord.copy()); + if (callback != null) { + callback.apply(hoodieRecord.copy()); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java similarity index 89% rename from hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java rename to hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java index 20038fdf840d..6986754bc951 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java @@ -32,14 +32,13 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; import static java.util.stream.Collectors.toList; -public class BaseFileUtils { +public class BaseFileRecordParsingUtils { /** * Generates RLI Metadata records for base files. @@ -64,10 +63,9 @@ public static Iterator generateRLIMetadataHoodieRecordsForBaseFile String previousFileName = writeStat.getPrevBaseFile(); String fileId = FSUtils.getFileId(latestFileName); Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); - if (writeStat.getNumDeletes() == 0) { // if there are no deletes, reading only the file added as part of current commit metadata would suffice. - // this means that both inserts and updates from latest base file might result in RLI records. - return new ArrayList(recordKeysFromLatestBaseFile).stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate((String) recordKey, partition, fileId, - instantTime, writesFileIdEncoding)).iterator(); + if (previousFileName == null) { + return recordKeysFromLatestBaseFile.stream().map(recordKey -> (HoodieRecord)HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, + instantTime, writesFileIdEncoding)).collect(toList()).iterator(); } else { // read from previous base file and find difference to also generate delete records. 
// we will return new inserts and deletes from this code block diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 5626e88b0fe6..0af90fee90df 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -791,28 +791,25 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0; }); - Option latestCommitTimestamp = Option.empty(); Option writerSchemaOpt = Option.empty(); if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes. - latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()); writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); } int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); Option finalWriterSchemaOpt = writerSchemaOpt; - Option finalLatestCommitTimestamp = latestCommitTimestamp; HoodieData recordIndexRecords = engineContext.parallelize(allWriteStats, parallelism) .flatMap(writeStat -> { HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); // handle base files if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); + return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); } else { // for logs, we only need to process log files containing deletes if (writeStat.getNumDeletes() > 0) { StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()); Set deletedRecordKeys = getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient, - finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get()); + finalWriterSchemaOpt, maxBufferSize, instantTime); return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator(); } // ignore log file data blocks. 
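Note (illustrative, not part of the patch): a rough sketch of the log-file condition implied by the hunk above. The helper name is hypothetical; the comment records how the condition evolved earlier in this series.

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;

final class RliLogFilePredicateSketch {

  // A log file contributes RLI delete records when it carries deletes. Earlier revisions in
  // this series additionally required numInserts == 0 and numUpdateWrites == 0 (delete-only
  // log files); that was later relaxed to any log file with deletes.
  static boolean contributesRliDeletes(HoodieWriteStat stat) {
    String fileName = FSUtils.getFileName(stat.getPath(), stat.getPartitionPath());
    return FSUtils.isLogFile(fileName) && stat.getNumDeletes() > 0;
  }
}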
@@ -1393,7 +1390,8 @@ public static List> getLogFileColumnRangeM return Collections.emptyList(); } - private static Set getDeletedRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, + @VisibleForTesting + public static Set getDeletedRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, Option writerSchemaOpt, int maxBufferSize, String latestCommitTimestamp) throws IOException { if (writerSchemaOpt.isPresent()) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileRecordParsingUtils.java similarity index 98% rename from hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileRecordParsingUtils.java index e73aaebc4edd..c0de706f9947 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileRecordParsingUtils.java @@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -public class TestBaseFileUtils { +public class TestBaseFileRecordParsingUtils { private static final String PARTITION_PATH = "partition"; private static final String COLUMN_NAME = "columnName"; From 9534f3339ef29d425e5f7c0f0625eaa9996b4ef8 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 21 Nov 2024 13:12:27 -0800 Subject: [PATCH 5/6] Fixing Secondary Index Record generation to not rely on WriteStatus --- .../HoodieBackedTableMetadataWriter.java | 21 +++--- .../metadata/BaseFileRecordParsingUtils.java | 32 +++++++++ .../metadata/HoodieTableMetadataUtil.java | 69 +++++++++++++++++++ .../TestSecondaryIndexPruning.scala | 7 +- 4 files changed, 111 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 07313150ead1..3697e4617a55 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1065,7 +1065,7 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates)); } updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); - updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus); + updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime); return partitionToRecordMap; }); closeInternal(); @@ -1127,7 +1127,8 @@ private HoodieData getFunctionalIndexUpdates(HoodieCommitMetadata return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime); } - private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, HoodieData writeStatus) { + private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, + String instantTime) { // If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update, 
// because these operations do not change the secondary key - record key mapping. if (commitMetadata.getOperationType() == WriteOperationType.COMPACT @@ -1141,7 +1142,7 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, .forEach(partition -> { HoodieData secondaryIndexRecords; try { - secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus); + secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, instantTime); } catch (Exception e) { throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e); } @@ -1149,20 +1150,14 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, }); } - private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData writeStatus) throws Exception { + private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception { List>>> partitionFilePairs = getPartitionFilePairs(commitMetadata); // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this // operation is going to be expensive (in CPU, memory and IO) - List keysToRemove = new ArrayList<>(); - writeStatus.collectAsList().forEach(status -> { - status.getWrittenRecordDelegates().forEach(recordDelegate -> { - // Consider those keys which were either updated or deleted in this commit - if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) { - keysToRemove.add(recordDelegate.getRecordKey()); - } - }); - }); + List keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), + dataMetaClient, instantTime); + HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition); // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to // This is obtained by scanning the entire secondary index partition in the metadata table diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java index 6986754bc951..cff6a79dc434 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -87,6 +88,37 @@ public static Iterator generateRLIMetadataHoodieRecordsForBaseFile } } + public static List getRecordKeysDeletedOrUpdated(String basePath, + HoodieWriteStat writeStat, + HoodieStorage storage) throws IOException { + String partition = writeStat.getPartitionPath(); + String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); + String previousFileName = writeStat.getPrevBaseFile(); + Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); + if (previousFileName == null) { + // if this is a new base file for a new file group, everything is an insert. 
+ return Collections.emptyList(); + } else { + // read from previous base file and find difference to also generate delete records. + // we will return updates and deletes from this code block + Set recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName); + List toReturn = recordKeysFromPreviousBaseFile.stream() + .filter(recordKey -> { + // deleted record + return !recordKeysFromLatestBaseFile.contains(recordKey); + }).collect(toList()); + + toReturn.addAll(recordKeysFromLatestBaseFile.stream() + .filter(recordKey -> { + // updates + return recordKeysFromPreviousBaseFile.contains(recordKey); + }).collect(toList())); + return toReturn; + } + } + + + private static Set getRecordKeysFromBaseFile(HoodieStorage storage, String basePath, String partition, String fileName) throws IOException { StoragePath dataFilePath = new StoragePath(basePath, StringUtils.isNullOrEmpty(partition) ? fileName : (partition + Path.SEPARATOR) + fileName); FileFormatUtils fileFormatUtils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 0af90fee90df..fe52519db2ea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -838,6 +838,51 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine } } + static List getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext, + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + String instantTime) { + + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + + if (allWriteStats.isEmpty()) { + return Collections.emptyList(); + } + + try { + int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); + String basePath = dataTableMetaClient.getBasePath().toString(); + // we might need to set some additional variables if we need to process log files. 
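Note (illustrative, not part of the patch): for the base-file path handled above, the deleted-or-updated computation reduces to two set operations. A plain-Java sketch with hypothetical names; a brand-new file group (no previous base file) contributes nothing, since everything in it is an insert.

import java.util.HashSet;
import java.util.Set;

final class DeletedOrUpdatedKeysSketch {

  // Keys in the previous base file but absent from the latest one were deleted;
  // keys present in both files were updated.
  static Set<String> deletedOrUpdated(Set<String> previousKeys, Set<String> latestKeys) {
    Set<String> deleted = new HashSet<>(previousKeys);
    deleted.removeAll(latestKeys);

    Set<String> updated = new HashSet<>(latestKeys);
    updated.retainAll(previousKeys);

    Set<String> result = new HashSet<>(deleted);
    result.addAll(updated);
    return result;
  }
}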
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> { + String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); + return FSUtils.isLogFile(fileName); + }); + Option writerSchemaOpt = Option.empty(); + if (anyLogFiles) { + writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); + } + int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); + StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); + Option finalWriterSchemaOpt = writerSchemaOpt; + return engineContext.parallelize(allWriteStats, parallelism) + .flatMap(writeStat -> { + HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); + // handle base files + if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator(); + } else { + // for logs, every entry is either an update or a delete + StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()); + return getRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime).iterator(); + } + }).collectAsList(); + } catch (Exception e) { + throw new HoodieException("Failed to generate column stats records for metadata table", e); + } + } + private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, Map> partitionToFilesMap) { InstantGenerator factory = dataTableMetaClient.getInstantGenerator(); @@ -1413,6 +1458,30 @@ public static Set getDeletedRecordKeys(String filePath, HoodieTableMetaC return Collections.emptySet(); } + @VisibleForTesting + public static Set getRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, int maxBufferSize, + String latestCommitTimestamp) throws IOException { + if (writerSchemaOpt.isPresent()) { + // read log file records without merging + Set allRecordKeys = new HashSet<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTimestamp) + .withReaderSchema(writerSchemaOpt.get()) + .withTableMetaClient(datasetMetaClient) + .withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey())) + .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey())) + .build(); + scanner.scan(); + return allRecordKeys; + } + return Collections.emptySet(); + } + /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by * the {@link LogicalTypes.Decimal} Avro logical type diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 606a5186faf6..1849f6fefd2c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -1080,12 +1080,11 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { ) 
verifyQueryPredicate(hudiOpts, "not_record_key_col") - // update the secondary key column by insert. - spark.sql(s"insert into $tableName values (5, 'row2', 'efg', 'p2')") + // update the secondary key column by update. + spark.sql(s"update $tableName set not_record_key_col = 'efg' where record_key_col = 'row2'") confirmLastCommitType(ActionType.replacecommit) // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( - Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), Seq(s"efg${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)) @@ -1096,7 +1095,6 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), - Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"fgh${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)) @@ -1105,7 +1103,6 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { confirmLastCommitType(ActionType.replacecommit) // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( - Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"fgh${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false) ) From fedc6a5898fc7df25e94ca5758734abbd04a06d7 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Fri, 22 Nov 2024 12:27:09 -0800 Subject: [PATCH 6/6] Addressing feedback --- .../log/HoodieUnMergedLogRecordScanner.java | 20 ++++---- .../metadata/HoodieTableMetadataUtil.java | 47 ++++++++++++------- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 9b02a83ba6ae..90bcbdc9b0f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -43,11 +43,11 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { private final LogRecordScannerCallback callback; - private final CallbackForDeletedKeys callbackForDeletedKeys; + private final DeletionCallback deletionCallback; private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback, CallbackForDeletedKeys callbackForDeletedKeys, + LogRecordScannerCallback callback, DeletionCallback deletionCallback, Option instantRange, InternalSchema internalSchema, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { @@ -55,7 +55,7 @@ private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, L false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, 
hoodieTableMetaClientOption); this.callback = callback; - this.callbackForDeletedKeys = callbackForDeletedKeys; + this.deletionCallback = deletionCallback; } /** @@ -89,8 +89,8 @@ protected void processNextRecord(HoodieRecord hoodieRecord) throws Except @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { - if (callbackForDeletedKeys != null) { - callbackForDeletedKeys.apply(deleteRecord.getHoodieKey()); + if (deletionCallback != null) { + deletionCallback.apply(deleteRecord.getHoodieKey()); } } @@ -107,7 +107,7 @@ public interface LogRecordScannerCallback { * A callback for log record scanner to consume deleted HoodieKeys. */ @FunctionalInterface - public interface CallbackForDeletedKeys { + public interface DeletionCallback { void apply(HoodieKey deletedKey); } @@ -126,7 +126,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder { private Option instantRange = Option.empty(); // specific configurations private LogRecordScannerCallback callback; - private CallbackForDeletedKeys callbackForDeletedKeys; + private DeletionCallback deletionCallback; private boolean enableOptimizedLogBlocksScan; private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE; private HoodieTableMetaClient hoodieTableMetaClient; @@ -190,8 +190,8 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { return this; } - public Builder withLogRecordScannerCallbackForDeletedKeys(CallbackForDeletedKeys callbackForDeletedKeys) { - this.callbackForDeletedKeys = callbackForDeletedKeys; + public Builder withLogRecordScannerCallbackForDeletedKeys(DeletionCallback deletionCallback) { + this.deletionCallback = deletionCallback; return this; } @@ -219,7 +219,7 @@ public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema, - latestInstantTime, reverseReader, bufferSize, callback, callbackForDeletedKeys, instantRange, + latestInstantTime, reverseReader, bufferSize, callback, deletionCallback, instantRange, internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index fe52519db2ea..bce187a49b05 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -819,25 +819,36 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine // there are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) // lets do reduce by key to ignore the deleted entry. 
- return recordIndexRecords.mapToPair( - (SerializablePairFunction) t -> Pair.of(t.getKey(), t)) - .reduceByKey((SerializableBiFunction) (record1, record2) -> { - boolean isRecord1Deleted = record1.getData() instanceof EmptyHoodieRecordPayload; - boolean isRecord2Deleted = record2.getData() instanceof EmptyHoodieRecordPayload; - if (isRecord1Deleted && !isRecord2Deleted) { - return record2; - } else if (!isRecord1Deleted && isRecord2Deleted) { - return record1; - } else { - throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : " - + record1.getData().toString() + ", record 2 : " + record2.getData().toString()); - } - }, parallelism).values(); + return reduceByKeys(recordIndexRecords, parallelism); } catch (Exception e) { - throw new HoodieException("Failed to generate column stats records for metadata table", e); + throw new HoodieException("Failed to generate RLI records for metadata table", e); } } + /** + * There are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) + * So, this method performs reduce by key to ignore the deleted entry. + * @param recordIndexRecords hoodie records after rli index lookup. + * @param parallelism parallelism to use. + * @return + */ + private static HoodieData reduceByKeys(HoodieData recordIndexRecords, int parallelism) { + return recordIndexRecords.mapToPair( + (SerializablePairFunction) t -> Pair.of(t.getKey(), t)) + .reduceByKey((SerializableBiFunction) (record1, record2) -> { + boolean isRecord1Deleted = record1.getData() instanceof EmptyHoodieRecordPayload; + boolean isRecord2Deleted = record2.getData() instanceof EmptyHoodieRecordPayload; + if (isRecord1Deleted && !isRecord2Deleted) { + return record2; + } else if (!isRecord1Deleted && isRecord2Deleted) { + return record1; + } else { + throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : " + + record1.getData().toString() + ", record 2 : " + record2.getData().toString()); + } + }, parallelism).values(); + } + static List getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext, HoodieCommitMetadata commitMetadata, HoodieMetadataConfig metadataConfig, @@ -879,7 +890,7 @@ static List getRecordKeysDeletedOrUpdated(HoodieEngineContext engineCont } }).collectAsList(); } catch (Exception e) { - throw new HoodieException("Failed to generate column stats records for metadata table", e); + throw new HoodieException("Failed to fetch deleted record keys while preparing MDT records", e); } } @@ -1441,7 +1452,7 @@ public static Set getDeletedRecordKeys(String filePath, HoodieTableMetaC String latestCommitTimestamp) throws IOException { if (writerSchemaOpt.isPresent()) { // read log file records without merging - List deletedKeys = new ArrayList<>(); + Set deletedKeys = new HashSet<>(); HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() .withStorage(datasetMetaClient.getStorage()) .withBasePath(datasetMetaClient.getBasePath()) @@ -1453,7 +1464,7 @@ public static Set getDeletedRecordKeys(String filePath, HoodieTableMetaC .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey())) .build(); scanner.scan(); - return deletedKeys.stream().collect(Collectors.toSet()); + return deletedKeys; } return Collections.emptySet(); }
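Note (illustrative, not part of the patch): the reduceByKeys helper factored out in the final hunk can be pictured as a map-merge over (record key, is-delete) pairs. The sketch below is a self-contained stand-in; the Entry class and method names are hypothetical, while the real helper performs the same merge with HoodieData.reduceByKey and detects deletes via EmptyHoodieRecordPayload.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReduceByKeysSketch {

  static final class Entry {
    final String recordKey;
    final boolean deleted;   // true when the payload is a delete

    Entry(String recordKey, boolean deleted) {
      this.recordKey = recordKey;
      this.deleted = deleted;
    }
  }

  // When the same key shows up twice (a delete from the old partition plus an insert into
  // the new partition), keep the non-deleted entry; any other duplicate combination is
  // treated as an upstream bug.
  static Map<String, Entry> reduceByKeys(List<Entry> entries) {
    Map<String, Entry> byKey = new HashMap<>();
    for (Entry entry : entries) {
      byKey.merge(entry.recordKey, entry, (first, second) -> {
        if (first.deleted && !second.deleted) {
          return second;
        }
        if (!first.deleted && second.deleted) {
          return first;
        }
        throw new IllegalStateException("Two RLI updates seen for record key " + entry.recordKey);
      });
    }
    return byKey;
  }
}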