diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 1bf6f6b01389..57bc6a9ecd61 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo try { String latestValidFilePath = baseFileToMerge.getFileName(); writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime()); + writeStatus.getStat().setPrevBaseFile(latestValidFilePath); HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime, new StoragePath(config.getBasePath()), diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 256b509690be..3697e4617a55 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieIndexMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordDelegate; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; @@ -1056,17 +1055,17 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), - dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig()); + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getWritesFileIdEncoding(), + dataWriteConfig.getMetadataConfig()); // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code // to the HoodieTableMetadataUtil class in hudi-common. 
if (dataWriteConfig.isRecordIndexEnabled()) { - HoodieData updatesFromWriteStatuses = getRecordIndexUpserts(writeStatus); - HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata); - partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), updatesFromWriteStatuses.union(additionalUpdates)); + HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata); + partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates)); } updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); - updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus); + updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime); return partitionToRecordMap; }); closeInternal(); @@ -1080,7 +1079,8 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), - dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig()); + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), + dataWriteConfig.getWritesFileIdEncoding(), dataWriteConfig.getMetadataConfig()); HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata); partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates)); updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); @@ -1127,7 +1127,8 @@ private HoodieData getFunctionalIndexUpdates(HoodieCommitMetadata return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime); } - private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, HoodieData writeStatus) { + private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, + String instantTime) { // If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update, // because these operations do not change the secondary key - record key mapping. 
if (commitMetadata.getOperationType() == WriteOperationType.COMPACT @@ -1141,7 +1142,7 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, .forEach(partition -> { HoodieData secondaryIndexRecords; try { - secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus); + secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, instantTime); } catch (Exception e) { throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e); } @@ -1149,20 +1150,14 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, }); } - private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData writeStatus) throws Exception { + private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception { List>>> partitionFilePairs = getPartitionFilePairs(commitMetadata); // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this // operation is going to be expensive (in CPU, memory and IO) - List keysToRemove = new ArrayList<>(); - writeStatus.collectAsList().forEach(status -> { - status.getWrittenRecordDelegates().forEach(recordDelegate -> { - // Consider those keys which were either updated or deleted in this commit - if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) { - keysToRemove.add(recordDelegate.getRecordKey()); - } - }); - }); + List keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), + dataMetaClient, instantTime); + HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition); // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to // This is obtained by scanning the entire secondary index partition in the metadata table @@ -1667,51 +1662,6 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map getRecordIndexUpserts(HoodieData writeStatuses) { - return writeStatuses.flatMap(writeStatus -> { - List recordList = new LinkedList<>(); - for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) { - if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) { - if (recordDelegate.getIgnoreIndexUpdate()) { - continue; - } - HoodieRecord hoodieRecord; - Option newLocation = recordDelegate.getNewLocation(); - if (newLocation.isPresent()) { - if (recordDelegate.getCurrentLocation().isPresent()) { - // This is an update, no need to update index if the location has not changed - // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's - // instantTime refers to the current commit which was completed. - if (!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) { - final String msg = String.format("Detected update in location of record with key %s from %s to %s. 
The fileID should not change.", - recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get()); - LOG.error(msg); - throw new HoodieMetadataException(msg); - } - // for updates, we can skip updating RLI partition in MDT - } else { - // Insert new record case - hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate( - recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(), - newLocation.get().getFileId(), newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding()); - recordList.add(hoodieRecord); - } - } else { - // Delete existing index for a deleted record - hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey()); - recordList.add(hoodieRecord); - } - } - } - return recordList.iterator(); - }); - } - private HoodieData getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) { try (HoodieMetadataFileSystemView fsView = getMetadataView()) { List> partitionBaseFilePairs = replaceCommitMetadata diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java new file mode 100644 index 000000000000..b85ec1bddc82 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.client.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metadata.BaseFileRecordParsingUtils; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.avro.Schema; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestRLIRecordGeneration extends HoodieClientTestBase { + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) throws IOException { + cleanupClients(); + initMetaClient(tableType); + cleanupTimelineService(); + initTimelineService(); + + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + HoodieWriteConfig writeConfig = tableType == HoodieTableType.COPY_ON_WRITE ? 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build() + : getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2) + .withInlineCompaction(true) + .compactionSmallFileSize(0).build()).build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + // Insert + String commitTime = client.createNewInstantTime(); + List records1 = dataGen.generateInserts(commitTime, 100); + client.startCommitWithTime(commitTime); + List writeStatuses1 = client.insert(jsc.parallelize(records1, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses1); + + // assert RLI records for a base file from 1st commit + String finalCommitTime = commitTime; + Map recordKeyToPartitionMapping1 = new HashMap<>(); + Map fileIdToFileNameMapping1 = new HashMap<>(); + writeStatuses1.forEach(writeStatus -> { + assertEquals(writeStatus.getStat().getNumDeletes(), 0); + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) { + fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); + } + + Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), + writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath(); + recordKeyToPartitionMapping1.put(key, partition); + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + Map expectedRecordToPartitionMapping1 = new HashMap<>(); + records1.forEach(record -> expectedRecordToPartitionMapping1.put(record.getRecordKey(), record.getPartitionPath())); + + assertEquals(expectedRecordToPartitionMapping1, recordKeyToPartitionMapping1); + + // lets update some records and assert RLI records. + commitTime = client.createNewInstantTime(); + client.startCommitWithTime(commitTime); + String finalCommitTime2 = commitTime; + List deletes2 = dataGen.generateUniqueDeleteRecords(commitTime, 30); + List updates2 = dataGen.generateUniqueUpdates(commitTime, 30); + List inserts2 = dataGen.generateInserts(commitTime, 30); + List records2 = new ArrayList<>(); + records2.addAll(inserts2); + records2.addAll(updates2); + records2.addAll(deletes2); + + List writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses2); + + if (tableType == HoodieTableType.COPY_ON_WRITE) { + List expectedRLIInserts = inserts2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List expectedRLIDeletes = deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List actualInserts = new ArrayList<>(); + List actualDeletes = new ArrayList<>(); + generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, finalCommitTime2, writeConfig, actualInserts, actualDeletes); + + assertListEquality(expectedRLIInserts, expectedRLIDeletes, actualInserts, actualDeletes); + } else { + // trigger 2nd commit followed by compaction. 
+ commitTime = client.createNewInstantTime(); + client.startCommitWithTime(commitTime); + String finalCommitTime3 = commitTime; + List deletes3 = dataGen.generateUniqueDeleteRecords(commitTime, 30); + List updates3 = dataGen.generateUniqueUpdates(commitTime, 30); + List inserts3 = dataGen.generateInserts(commitTime, 30); + List records3 = new ArrayList<>(); + records3.addAll(inserts3); + records3.addAll(updates3); + records3.addAll(deletes3); + + List writeStatuses3 = client.upsert(jsc.parallelize(records3, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses3); + + List expectedRLIInserts = inserts3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + List actualInserts = new ArrayList<>(); + List actualDeletes = new ArrayList<>(); + generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus -> !FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) + .collect(Collectors.toList()), Collections.emptyMap(), finalCommitTime3, writeConfig, actualInserts, actualDeletes); + + // process log files. + String latestCommitTimestamp = metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime(); + Option writerSchemaOpt = tryResolveSchemaForTable(metaClient); + writeStatuses3.stream().filter(writeStatus -> FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) + .forEach(writeStatus -> { + try { + actualDeletes.addAll(HoodieTableMetadataUtil.getDeletedRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt, + writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp)); + } catch (IOException e) { + throw new HoodieIOException("Failed w/ IOException ", e); + } + }); + + assertListEquality(expectedRLIInserts, expectedRLIDeletes, actualInserts, actualDeletes); + + // trigger compaction + Option compactionInstantOpt = client.scheduleCompaction(Option.empty()); + assertTrue(compactionInstantOpt.isPresent()); + List compactionWriteStats = (List) client.compact(compactionInstantOpt.get()).getWriteStats().get(); + + expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()); + // since deletes are lazily realized when compaction kicks in, we need to account for deletes from 2nd commit as well. + expectedRLIDeletes.addAll(deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList())); + List actualRLIDeletes = new ArrayList<>(); + + compactionWriteStats.forEach(writeStat -> { + try { + Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat, + writeConfig.getWritesFileIdEncoding(), finalCommitTime3, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + actualRLIDeletes.add(key); + } + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + }); + + // it may not be easy to assert inserts to RLI. bcoz, if there are no deletes, both inserts and updates to data table result in RLI records. 
+ // but if there are deleted, we only ingest inserts and deletes from data table to RLI partition. + Collections.sort(expectedRLIDeletes); + Collections.sort(actualRLIDeletes); + assertEquals(expectedRLIDeletes, actualRLIDeletes); + } + } + } + + private void assertListEquality(List expectedRLIInserts, List expectedRLIDeletes, List actualInserts, + List actualDeletes) { + Collections.sort(expectedRLIInserts); + Collections.sort(actualInserts); + Collections.sort(expectedRLIDeletes); + Collections.sort(actualDeletes); + assertEquals(expectedRLIInserts, actualInserts); + assertEquals(expectedRLIDeletes, actualDeletes); + } + + private static Option tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) { + if (dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) { + return Option.empty(); + } + + try { + TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient); + return Option.of(schemaResolver.getTableAvroSchema()); + } catch (Exception e) { + throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e); + } + } + + private void generateRliRecordsAndAssert(List writeStatuses, Map fileIdToFileNameMapping, String commitTime, + HoodieWriteConfig writeConfig, List actualInserts, + List actualDeletes) { + writeStatuses.forEach(writeStatus -> { + if (!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath()))) { + // Fetch record keys for all + try { + String writeStatFileId = writeStatus.getFileId(); + if (!fileIdToFileNameMapping.isEmpty()) { + assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId)); + } + + Iterator rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), + writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage()); + while (rliRecordsItr.hasNext()) { + HoodieRecord rliRecord = rliRecordsItr.next(); + String key = rliRecord.getRecordKey(); + if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + actualDeletes.add(key); + } else { + actualInserts.add(key); + } + } + } catch (IOException e) { + throw new HoodieException("Should not have failed ", e); + } + } + }); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 3c98a510317d..6d7ca6d51828 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -159,6 +159,9 @@ public class HoodieWriteStat implements Serializable { @Nullable private Long maxEventTime; + @Nullable + private String prevBaseFile; + @Nullable private RuntimeStats runtimeStats; @@ -327,6 +330,14 @@ public void setFileSizeInBytes(long fileSizeInBytes) { this.fileSizeInBytes = fileSizeInBytes; } + public String getPrevBaseFile() { + return prevBaseFile; + } + + public void setPrevBaseFile(String prevBaseFile) { + this.prevBaseFile = prevBaseFile; + } + public Long getMinEventTime() { return minEventTime; } @@ -370,9 +381,9 @@ public void setPath(StoragePath basePath, StoragePath path) { @Override public String toString() { return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit - + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", 
numUpdateWrites=" + numUpdateWrites - + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath - + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats) + + '\'' + ", prevBaseFile=" + prevBaseFile + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + + ", numUpdateWrites=" + numUpdateWrites + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + + totalWriteErrors + ", tempPath='" + tempPath + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats) + '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords + ", totalLogFilesCompacted=" + totalLogFilesCompacted + ", totalLogSizeCompacted=" + totalLogSizeCompacted + ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + ", totalLogBlocks=" + totalLogBlocks diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8b8d43449c67..90bcbdc9b0f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; @@ -42,16 +43,19 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { private final LogRecordScannerCallback callback; + private final DeletionCallback deletionCallback; private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback, Option instantRange, InternalSchema internalSchema, + LogRecordScannerCallback callback, DeletionCallback deletionCallback, + Option instantRange, InternalSchema internalSchema, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); this.callback = callback; + this.deletionCallback = deletionCallback; } /** @@ -78,12 +82,16 @@ protected void processNextRecord(HoodieRecord hoodieRecord) throws Except // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into queue of BoundedInMemoryExecutor. // Just call callback without merging - callback.apply(hoodieRecord.copy()); + if (callback != null) { + callback.apply(hoodieRecord.copy()); + } } @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { - // no - op + if (deletionCallback != null) { + deletionCallback.apply(deleteRecord.getHoodieKey()); + } } /** @@ -95,6 +103,14 @@ public interface LogRecordScannerCallback { void apply(HoodieRecord record) throws Exception; } + /** + * A callback for log record scanner to consume deleted HoodieKeys. 
+ */ + @FunctionalInterface + public interface DeletionCallback { + void apply(HoodieKey deletedKey); + } + /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ @@ -110,6 +126,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder { private Option instantRange = Option.empty(); // specific configurations private LogRecordScannerCallback callback; + private DeletionCallback deletionCallback; private boolean enableOptimizedLogBlocksScan; private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE; private HoodieTableMetaClient hoodieTableMetaClient; @@ -173,6 +190,11 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { return this; } + public Builder withLogRecordScannerCallbackForDeletedKeys(DeletionCallback deletionCallback) { + this.deletionCallback = deletionCallback; + return this; + } + @Override public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; @@ -197,7 +219,7 @@ public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema, - latestInstantTime, reverseReader, bufferSize, callback, instantRange, + latestInstantTime, reverseReader, bufferSize, callback, deletionCallback, instantRange, internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java new file mode 100644 index 000000000000..cff6a79dc434 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.FileFormatUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import static java.util.stream.Collectors.toList; + +public class BaseFileRecordParsingUtils { + + /** + * Generates RLI Metadata records for base files. 
+ * If base file is a added to a new file group, all record keys are treated as inserts. + * If a base file is added to an existing file group, we read previous base file in addition to the latest base file of interest. Find deleted records and generate RLI Metadata records + * for the same in addition to new insert records. + * @param basePath base path of the table. + * @param writeStat {@link HoodieWriteStat} of interest. + * @param writesFileIdEncoding fileID encoding for the table. + * @param instantTime instant time of interest. + * @param storage instance of {@link HoodieStorage}. + * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition. + * @throws IOException + */ + public static Iterator generateRLIMetadataHoodieRecordsForBaseFile(String basePath, + HoodieWriteStat writeStat, + Integer writesFileIdEncoding, + String instantTime, + HoodieStorage storage) throws IOException { + String partition = writeStat.getPartitionPath(); + String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); + String previousFileName = writeStat.getPrevBaseFile(); + String fileId = FSUtils.getFileId(latestFileName); + Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); + if (previousFileName == null) { + return recordKeysFromLatestBaseFile.stream().map(recordKey -> (HoodieRecord)HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, + instantTime, writesFileIdEncoding)).collect(toList()).iterator(); + } else { + // read from previous base file and find difference to also generate delete records. + // we will return new inserts and deletes from this code block + Set recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName); + List toReturn = recordKeysFromPreviousBaseFile.stream() + .filter(recordKey -> { + // deleted record + return !recordKeysFromLatestBaseFile.contains(recordKey); + }).map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()); + + toReturn.addAll(recordKeysFromLatestBaseFile.stream() + .filter(recordKey -> { + // new inserts + return !recordKeysFromPreviousBaseFile.contains(recordKey); + }).map(recordKey -> + HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, + instantTime, writesFileIdEncoding)).collect(toList())); + return toReturn.iterator(); + } + } + + public static List getRecordKeysDeletedOrUpdated(String basePath, + HoodieWriteStat writeStat, + HoodieStorage storage) throws IOException { + String partition = writeStat.getPartitionPath(); + String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); + String previousFileName = writeStat.getPrevBaseFile(); + Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); + if (previousFileName == null) { + // if this is a new base file for a new file group, everything is an insert. + return Collections.emptyList(); + } else { + // read from previous base file and find difference to also generate delete records. 
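To make the key diffing used by these two helpers concrete, here is a minimal, self-contained sketch of the classification rule, using plain Java sets and hypothetical record keys (an illustration only, not part of the patch): keys present only in the previous base file become RLI deletes, keys present only in the latest base file become RLI inserts, and keys present in both are updates.

import java.util.HashSet;
import java.util.Set;

public class RecordKeyDiffSketch {
  public static void main(String[] args) {
    // Hypothetical record keys; in the patch these come from reading row keys out of the two base files.
    Set<String> previousBaseFileKeys = new HashSet<>(Set.of("k1", "k2", "k3"));
    Set<String> latestBaseFileKeys = new HashSet<>(Set.of("k2", "k3", "k4"));

    // Keys only in the previous base file were deleted in this commit -> RLI delete records.
    Set<String> deletes = new HashSet<>(previousBaseFileKeys);
    deletes.removeAll(latestBaseFileKeys);

    // Keys only in the latest base file are new inserts -> RLI insert records.
    Set<String> inserts = new HashSet<>(latestBaseFileKeys);
    inserts.removeAll(previousBaseFileKeys);

    // Keys present in both files are updates: getRecordKeysDeletedOrUpdated reports them alongside
    // the deletes, while RLI record generation skips them since the file id has not changed.
    Set<String> updates = new HashSet<>(latestBaseFileKeys);
    updates.retainAll(previousBaseFileKeys);

    System.out.println("deletes=" + deletes + ", inserts=" + inserts + ", updates=" + updates);
  }
}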
+ // we will return updates and deletes from this code block + Set recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName); + List toReturn = recordKeysFromPreviousBaseFile.stream() + .filter(recordKey -> { + // deleted record + return !recordKeysFromLatestBaseFile.contains(recordKey); + }).collect(toList()); + + toReturn.addAll(recordKeysFromLatestBaseFile.stream() + .filter(recordKey -> { + // updates + return recordKeysFromPreviousBaseFile.contains(recordKey); + }).collect(toList())); + return toReturn; + } + } + + + + private static Set getRecordKeysFromBaseFile(HoodieStorage storage, String basePath, String partition, String fileName) throws IOException { + StoragePath dataFilePath = new StoragePath(basePath, StringUtils.isNullOrEmpty(partition) ? fileName : (partition + Path.SEPARATOR) + fileName); + FileFormatUtils fileFormatUtils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET); + return fileFormatUtils.readRowKeys(storage, dataFilePath); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index fab10c288376..bce187a49b05 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -46,6 +46,9 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.function.SerializableBiFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -54,6 +57,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -135,6 +139,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.stream.Collectors.toList; import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema; import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema; @@ -366,6 +371,7 @@ public static Map> convertMetadataToRecords(Hoo boolean isColumnStatsIndexEnabled, int columnStatsIndexParallelism, List targetColumnsForColumnStatsIndex, + Integer writesFileIdEncoding, HoodieMetadataConfig metadataConfig) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = context.parallelize( @@ -387,6 +393,10 @@ public static Map> convertMetadataToRecords(Hoo final HoodieData partitionStatsRDD = convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, metadataConfig); partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), partitionStatsRDD); } + if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) { + 
partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig, + dataMetaClient, writesFileIdEncoding, instantTime)); + } return partitionToRecordsMap; } @@ -759,6 +769,131 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi }); } + static HoodieData convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext, + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + int writesFileIdEncoding, + String instantTime) { + + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + + if (allWriteStats.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + try { + int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); + String basePath = dataTableMetaClient.getBasePath().toString(); + // we might need to set some additional variables if we need to process log files. + boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> { + String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); + return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0; + }); + Option writerSchemaOpt = Option.empty(); + if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes. + writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); + } + int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); + StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); + Option finalWriterSchemaOpt = writerSchemaOpt; + HoodieData recordIndexRecords = engineContext.parallelize(allWriteStats, parallelism) + .flatMap(writeStat -> { + HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); + // handle base files + if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); + } else { + // for logs, we only need to process log files containing deletes + if (writeStat.getNumDeletes() > 0) { + StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()); + Set deletedRecordKeys = getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient, + finalWriterSchemaOpt, maxBufferSize, instantTime); + return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator(); + } + // ignore log file data blocks. + return new ArrayList().iterator(); + } + }); + + // there are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) + // lets do reduce by key to ignore the deleted entry. + return reduceByKeys(recordIndexRecords, parallelism); + } catch (Exception e) { + throw new HoodieException("Failed to generate RLI records for metadata table", e); + } + } + + /** + * There are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) + * So, this method performs reduce by key to ignore the deleted entry. + * @param recordIndexRecords hoodie records after rli index lookup. + * @param parallelism parallelism to use. 
+ * @return + */ + private static HoodieData reduceByKeys(HoodieData recordIndexRecords, int parallelism) { + return recordIndexRecords.mapToPair( + (SerializablePairFunction) t -> Pair.of(t.getKey(), t)) + .reduceByKey((SerializableBiFunction) (record1, record2) -> { + boolean isRecord1Deleted = record1.getData() instanceof EmptyHoodieRecordPayload; + boolean isRecord2Deleted = record2.getData() instanceof EmptyHoodieRecordPayload; + if (isRecord1Deleted && !isRecord2Deleted) { + return record2; + } else if (!isRecord1Deleted && isRecord2Deleted) { + return record1; + } else { + throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : " + + record1.getData().toString() + ", record 2 : " + record2.getData().toString()); + } + }, parallelism).values(); + } + + static List getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext, + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + String instantTime) { + + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + + if (allWriteStats.isEmpty()) { + return Collections.emptyList(); + } + + try { + int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1); + String basePath = dataTableMetaClient.getBasePath().toString(); + // we might need to set some additional variables if we need to process log files. + boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> { + String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); + return FSUtils.isLogFile(fileName); + }); + Option writerSchemaOpt = Option.empty(); + if (anyLogFiles) { + writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); + } + int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); + StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); + Option finalWriterSchemaOpt = writerSchemaOpt; + return engineContext.parallelize(allWriteStats, parallelism) + .flatMap(writeStat -> { + HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); + // handle base files + if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator(); + } else { + // for logs, every entry is either an update or a delete + StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()); + return getRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime).iterator(); + } + }).collectAsList(); + } catch (Exception e) { + throw new HoodieException("Failed to fetch deleted record keys while preparing MDT records", e); + } + } + private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, Map> partitionToFilesMap) { InstantGenerator factory = dataTableMetaClient.getInstantGenerator(); @@ -1311,6 +1446,53 @@ public static List> getLogFileColumnRangeM return Collections.emptyList(); } + @VisibleForTesting + public static Set getDeletedRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, int maxBufferSize, + String latestCommitTimestamp) throws IOException { + if (writerSchemaOpt.isPresent()) { + // 
read log file records without merging + Set deletedKeys = new HashSet<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTimestamp) + .withReaderSchema(writerSchemaOpt.get()) + .withTableMetaClient(datasetMetaClient) + .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> deletedKeys.add(deletedKey.getRecordKey())) + .build(); + scanner.scan(); + return deletedKeys; + } + return Collections.emptySet(); + } + + @VisibleForTesting + public static Set getRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, int maxBufferSize, + String latestCommitTimestamp) throws IOException { + if (writerSchemaOpt.isPresent()) { + // read log file records without merging + Set allRecordKeys = new HashSet<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTimestamp) + .withReaderSchema(writerSchemaOpt.get()) + .withTableMetaClient(datasetMetaClient) + .withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey())) + .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey())) + .build(); + scanner.scan(); + return allRecordKeys; + } + return Collections.emptySet(); + } + /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by * the {@link LogicalTypes.Decimal} Avro logical type diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java index a99e48482990..54d6b7a7ab79 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java @@ -53,6 +53,12 @@ public void testSetPaths() { writeStat.setPath(basePath, finalizeFilePath); assertEquals(finalizeFilePath, new StoragePath(basePath, writeStat.getPath())); + // test prev base file + StoragePath prevBaseFilePath = new StoragePath(partitionPath, FSUtils.makeBaseFileName(instantTime, "2-0-3", fileName, + HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension())); + writeStat.setPrevBaseFile(prevBaseFilePath.toString()); + assertEquals(prevBaseFilePath.toString(), writeStat.getPrevBaseFile()); + // test for null tempFilePath writeStat = new HoodieWriteStat(); writeStat.setPath(basePath, finalizeFilePath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileRecordParsingUtils.java similarity index 98% rename from hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileRecordParsingUtils.java index e73aaebc4edd..c0de706f9947 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileRecordParsingUtils.java @@ -29,7 +29,7 @@ import static 
org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -public class TestBaseFileUtils { +public class TestBaseFileRecordParsingUtils { private static final String PARTITION_PATH = "partition"; private static final String COLUMN_NAME = "columnName"; diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 0254ff590e0b..596689a62497 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -321,9 +321,9 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase { writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount, writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes) assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig)) - val prevDf = mergedDfList.last.drop("tip_history") + val prevDf = mergedDfList.last.drop("tip_history", "_hoodie_is_deleted") val nonMatchingRecords = readDf.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", - "_hoodie_partition_path", "_hoodie_file_name", "tip_history") + "_hoodie_partition_path", "_hoodie_file_name", "tip_history", "_hoodie_is_deleted") .join(prevDf, prevDf.columns, "leftanti") assertEquals(0, nonMatchingRecords.count()) assertEquals(readDf.count(), prevDf.count()) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index f0333c515a12..f032eebb1cfb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -72,7 +72,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition() val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition() - // batch1 inserts + // batch1 inserts (5 records) val instantTime1 = getNewInstantTime() val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 5)).asScala.toSeq var operation = INSERT_OPERATION_OPT_VAL @@ -112,7 +112,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true") val instantTime3 = getNewInstantTime() - // batch3. updates to partition2 + // batch3. 
update 2 records from newly inserted records from commit 2 to partition2 val latestBatch3 = recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq val latestBatchDf3 = spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1)) latestBatchDf3.cache() @@ -241,7 +241,28 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { .save(basePath) val prevDf = mergedDfList.last mergedDfList = mergedDfList :+ prevDf.except(deleteDf) - validateDataAndRecordIndices(hudiOpts) + validateDataAndRecordIndices(hudiOpts, deleteDf) + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): Unit = { + val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) + val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + insertDf.cache() + + val deleteDf = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(dataGen.generateUniqueDeleteRecords(getNewInstantTime, 1)).asScala, 1)) + deleteDf.cache() + val recordKeyToDelete = deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String] + deleteDf.write.format("org.apache.hudi") + .options(hudiOpts) + .mode(SaveMode.Append) + .save(basePath) + val prevDf = mergedDfList.last + mergedDfList = mergedDfList :+ prevDf.filter(row => row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete) + validateDataAndRecordIndices(hudiOpts, deleteDf) } @ParameterizedTest diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 606a5186faf6..1849f6fefd2c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -1080,12 +1080,11 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { ) verifyQueryPredicate(hudiOpts, "not_record_key_col") - // update the secondary key column by insert. - spark.sql(s"insert into $tableName values (5, 'row2', 'efg', 'p2')") + // update the secondary key column by update. 
+ spark.sql(s"update $tableName set not_record_key_col = 'efg' where record_key_col = 'row2'") confirmLastCommitType(ActionType.replacecommit) // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( - Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), Seq(s"efg${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)) @@ -1096,7 +1095,6 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), - Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"fgh${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)) @@ -1105,7 +1103,6 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { confirmLastCommitType(ActionType.replacecommit) // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( - Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"fgh${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false) )