Skip to content

Commit

Permalink
adding more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 23, 2024
1 parent 0ed2635 commit 048c2a8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -38,6 +41,7 @@
import org.apache.hudi.metadata.BaseFileRecordParsingUtils;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -135,7 +139,8 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro
List<String> actualDeletes = new ArrayList<>();
generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, finalCommitTime2, writeConfig, actualInserts, actualDeletes);

assertListEquality(expectedRLIInserts, expectedRLIDeletes, actualInserts, actualDeletes);
assertListEquality(expectedRLIInserts, actualInserts);
assertListEquality(expectedRLIDeletes, actualDeletes);
} else {
// trigger 2nd commit followed by compaction.
commitTime = client.createNewInstantTime();
Expand All @@ -153,11 +158,17 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro
assertNoWriteErrors(writeStatuses3);

List<String> expectedRLIInserts = inserts3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> expectedUpdates = updates3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> expectedUpatesAndDeletes = new ArrayList<>(expectedRLIDeletes);
expectedUpatesAndDeletes.addAll(expectedUpdates);

// lets also validate
List<String> actualInserts = new ArrayList<>();
List<String> actualDeletes = new ArrayList<>();
List<String> actualUpdatesAndDeletes = new ArrayList<>();
generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus -> !FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath())))
.collect(Collectors.toList()), Collections.emptyMap(), finalCommitTime3, writeConfig, actualInserts, actualDeletes);
.collect(Collectors.toList()), Collections.emptyMap(), finalCommitTime3, writeConfig, actualInserts, actualDeletes);

// process log files.
String latestCommitTimestamp = metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime();
Expand All @@ -167,21 +178,33 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro
try {
actualDeletes.addAll(HoodieTableMetadataUtil.getDeletedRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp));

actualUpdatesAndDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp));
} catch (IOException e) {
throw new HoodieIOException("Failed w/ IOException ", e);
}
});

assertListEquality(expectedRLIInserts, expectedRLIDeletes, actualInserts, actualDeletes);
assertListEquality(expectedRLIInserts, actualInserts);
assertListEquality(expectedRLIDeletes, actualDeletes);
assertListEquality(expectedUpatesAndDeletes, actualUpdatesAndDeletes);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

List<String> updatedOrDeletedKeys =
new ArrayList<>(HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3));
List<String> expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);

// trigger compaction
Option<String> compactionInstantOpt = client.scheduleCompaction(Option.empty());
assertTrue(compactionInstantOpt.isPresent());
List<HoodieWriteStat> compactionWriteStats = (List<HoodieWriteStat>) client.compact(compactionInstantOpt.get()).getWriteStats().get();
HoodieWriteMetadata compactionWriteMetadata = client.compact(compactionInstantOpt.get());
List<HoodieWriteStat> compactionWriteStats = (List<HoodieWriteStat>) compactionWriteMetadata.getWriteStats().get();

expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
// since deletes are lazily realized when compaction kicks in, we need to account for deletes from 2nd commit as well.
expectedRLIDeletes.addAll(deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList()));
List<String> actualRLIDeletes = new ArrayList<>();

compactionWriteStats.forEach(writeStat -> {
Expand All @@ -202,21 +225,15 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro

// it may not be easy to assert inserts to RLI. bcoz, if there are no deletes, both inserts and updates to data table result in RLI records.
// but if there are deleted, we only ingest inserts and deletes from data table to RLI partition.
Collections.sort(expectedRLIDeletes);
Collections.sort(actualRLIDeletes);
assertEquals(expectedRLIDeletes, actualRLIDeletes);
assertListEquality(expectedRLIDeletes, actualDeletes);
}
}
}

private void assertListEquality(List<String> expectedRLIInserts, List<String> expectedRLIDeletes, List<String> actualInserts,
List<String> actualDeletes) {
Collections.sort(expectedRLIInserts);
Collections.sort(actualInserts);
Collections.sort(expectedRLIDeletes);
Collections.sort(actualDeletes);
assertEquals(expectedRLIInserts, actualInserts);
assertEquals(expectedRLIDeletes, actualDeletes);
private void assertListEquality(List<String> list1, List<String> list2) {
Collections.sort(list1);
Collections.sort(list2);
assertEquals(list1, list2);
}

private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,8 @@ private static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> re
}, parallelism).values();
}

static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext,
@VisibleForTesting
public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext,
HoodieCommitMetadata commitMetadata,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestBaseFileRecordParsingUtils {
public class TestBaseFileUtils {
private static final String PARTITION_PATH = "partition";
private static final String COLUMN_NAME = "columnName";

Expand Down

0 comments on commit 048c2a8

Please sign in to comment.