Skip to content

Commit

Permalink
Fixing tests and adding java docs
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 27, 2024
1 parent 3e842e4 commit f15241b
Showing 1 changed file with 82 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class TestRLIRecordGeneration extends HoodieClientTestBase {

public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBase {

/**
* Tests various methods used for RLI and SI record generation flows.
* We test below methods
* BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(...). This is used for RLI record generation.
* BaseFileRecordParsingUtils.getRecordKeyStatuses(...) // This is used in both RLI and SI flow.
*
* @throws IOException
*/
@Test
public void testRecordGenerationAPIsForCOW() throws IOException {
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
Expand Down Expand Up @@ -98,6 +106,7 @@ public void testRecordGenerationAPIsForCOW() throws IOException {
fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1));
}

// poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
Iterator<HoodieRecord> rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage());
while (rliRecordsItr.hasNext()) {
Expand Down Expand Up @@ -135,6 +144,7 @@ public void testRecordGenerationAPIsForCOW() throws IOException {
List<String> expectedDeletes = deletes2.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> actualInserts = new ArrayList<>();
List<String> actualDeletes = new ArrayList<>();
// only inserts and deletes will result in RLI records. lets validate that.
generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1, finalCommitTime2, writeConfig, actualInserts, actualDeletes);

assertListEquality(expectedInserts, actualInserts);
Expand All @@ -149,11 +159,24 @@ public void testRecordGenerationAPIsForCOW() throws IOException {
assertListEquality(expectedInserts, actualInserts);
assertListEquality(expectedDeletes, actualDeletes);
// We can't really assert equality for updates because, with COW, we might just rewrite an existing parquet file. So, more records will be deduced as updates.
// And so, we can only validate contains.
// And so, we are validating using contains.
expectedUpdates.forEach(entry -> assertTrue(actualUpdates.contains(entry)));
}
}

/**
* Tests various methods used for RLI and SI record generation flows with a MOR table. Here, the emphasis is on log files.
* We test below methods
* BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(...). This is used for RLI record generation.
* HoodieTableMetadataUtil.getRecordKeys() // This is used in both RLI and SI flow.
* HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated() for HoodieCommitMetadata.
* <p>
* We also test a few ad hoc scenarios.
* - If any log file contains inserts, RLI and SI record generation should throw an exception.
* - RLI does not generate any records for a compaction operation.
*
* @throws IOException
*/
@Test
public void testRecordGenerationAPIsForMOR() throws IOException {
HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
Expand Down Expand Up @@ -189,6 +212,7 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1));
}

// poll into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
Iterator<HoodieRecord> rliRecordsItr = BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage());
while (rliRecordsItr.hasNext()) {
Expand All @@ -210,7 +234,6 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
// lets update some records and assert RLI records.
commitTime = client.createNewInstantTime();
client.startCommitWithTime(commitTime);
String finalCommitTime2 = commitTime;
List<HoodieRecord> deletes2 = dataGen.generateUniqueDeleteRecords(commitTime, 30);
List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates(commitTime, 30);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime, 30);
Expand All @@ -221,8 +244,9 @@ public void testRecordGenerationAPIsForMOR() throws IOException {

List<WriteStatus> writeStatuses2 = client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
assertNoWriteErrors(writeStatuses2);
assertRLIandSIRecordGenerationAPIs(inserts2, updates2, deletes2, writeStatuses2, commitTime, writeConfig);

// trigger 2nd commit followed by compaction.
// trigger 2nd commit.
commitTime = client.createNewInstantTime();
client.startCommitWithTime(commitTime);
String finalCommitTime3 = commitTime;
Expand All @@ -236,53 +260,12 @@ public void testRecordGenerationAPIsForMOR() throws IOException {

List<WriteStatus> writeStatuses3 = client.upsert(jsc.parallelize(records3, 1), commitTime).collect();
assertNoWriteErrors(writeStatuses3);

List<String> expectedRLIInserts = inserts3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> expectedUpdates = updates3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
List<String> expectedUpatesAndDeletes = new ArrayList<>(expectedRLIDeletes);
expectedUpatesAndDeletes.addAll(expectedUpdates);

// lets also validate
List<String> actualInserts = new ArrayList<>();
List<String> actualDeletes = new ArrayList<>();
List<String> actualUpdatesAndDeletes = new ArrayList<>();
generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus -> !FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath())))
.collect(Collectors.toList()), Collections.emptyMap(), finalCommitTime3, writeConfig, actualInserts, actualDeletes);

// process log files.
String latestCommitTimestamp = metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime();
Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(metaClient);
writeStatuses3.stream().filter(writeStatus -> FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath())))
.forEach(writeStatus -> {
try {
actualDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp, false, true));

actualUpdatesAndDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp, true, true));
} catch (IOException e) {
throw new HoodieIOException("Failed w/ IOException ", e);
}
});

assertListEquality(expectedRLIInserts, actualInserts);
assertListEquality(expectedRLIDeletes, actualDeletes);
assertListEquality(expectedUpatesAndDeletes, actualUpdatesAndDeletes);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

// validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated which is used in SI code path.
List<String> updatedOrDeletedKeys =
new ArrayList<>(HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3));
List<String> expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);
assertRLIandSIRecordGenerationAPIs(inserts3, updates3, deletes3, writeStatuses3, finalCommitTime3, writeConfig);

// lets validate that if any log file contains inserts, fetching keys will fail.
HoodieWriteStat writeStat = writeStatuses3.get(1).getStat();
writeStat.setNumInserts(5);
commitMetadata = CommitUtils.buildMetadata(Collections.singletonList(writeStat), Collections.emptyMap(),
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.singletonList(writeStat), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

try {
Expand All @@ -297,12 +280,62 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
assertTrue(compactionInstantOpt.isPresent());
HoodieWriteMetadata compactionWriteMetadata = client.compact(compactionInstantOpt.get());
HoodieCommitMetadata compactionCommitMetadata = (HoodieCommitMetadata) compactionWriteMetadata.getCommitMetadata().get();
// no RLI records should be generated for this.
// no RLI records should be generated for compaction operation.
assertTrue(HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords(context, compactionCommitMetadata, writeConfig.getMetadataConfig(),
metaClient, writeConfig.getWritesFileIdEncoding(), compactionInstantOpt.get()).isEmpty());
}
}

/**
 * Validates the RLI and SI record generation APIs against the outcome of a single write.
 * <p>
 * The following checks are performed:
 * - generateRliRecordsAndAssert(...) over the write statuses that do not point to log files, collecting inserted and deleted keys.
 * - HoodieTableMetadataUtil.getRecordKeys(...) over every log file touched by the write, once for deleted keys only (RLI)
 *   and once for both valid and deleted keys (SI).
 * - HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(...) over a HoodieCommitMetadata built from all the write stats (SI).
 *
 * @param inserts3         records inserted by the write; their record keys are the expected inserts.
 * @param updates3         records updated by the write; their record keys are the expected updates.
 * @param deletes3         records deleted by the write; their record keys are the expected deletes.
 * @param writeStatuses3   write statuses returned by the write being validated.
 * @param finalCommitTime3 instant time of the write being validated.
 * @param writeConfig      write config used to perform the write.
 */
private void assertRLIandSIRecordGenerationAPIs(List<HoodieRecord> inserts3, List<HoodieRecord> updates3, List<HoodieRecord> deletes3,
                                                List<WriteStatus> writeStatuses3, String finalCommitTime3, HoodieWriteConfig writeConfig) {
  List<String> expectedRLIInserts = inserts3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
  List<String> expectedUpdates = updates3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
  List<String> expectedRLIDeletes = deletes3.stream().map(record -> record.getKey().getRecordKey()).collect(Collectors.toList());
  // deleted keys first, followed by updated keys.
  List<String> expectedUpdatesAndDeletes = new ArrayList<>(expectedRLIDeletes);
  expectedUpdatesAndDeletes.addAll(expectedUpdates);

  // lets validate RLI record generation using only the write statuses that do not refer to log files.
  List<String> actualInserts = new ArrayList<>();
  List<String> actualDeletes = new ArrayList<>();
  List<String> actualUpdatesAndDeletes = new ArrayList<>();
  generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus -> !FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath())))
      .collect(Collectors.toList()), Collections.emptyMap(), finalCommitTime3, writeConfig, actualInserts, actualDeletes);

  // lets also test HoodieTableMetadataUtil.getRecordKeys() for each individual log file touched as part of HoodieCommitMetadata.
  // lets test only deletes and also test both valid and deleted keys for log files.
  // we have disabled small file handling. And so, updates and deletes will definitely go into log files.
  String latestCommitTimestamp = metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime();
  Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(metaClient);
  // actualDeletes is never reassigned, so the lambda below can capture it directly.
  writeStatuses3.stream().filter(writeStatus -> FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(), writeStatus.getPartitionPath())))
      .forEach(writeStatus -> {
        try {
          // used for RLI
          actualDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
              writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp, false, true));

          // used in SI flow
          actualUpdatesAndDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
              writeConfig.getMetadataConfig().getMaxReaderBufferSize(), latestCommitTimestamp, true, true));
        } catch (IOException e) {
          throw new HoodieIOException("Failed w/ IOException ", e);
        }
      });

  assertListEquality(expectedRLIInserts, actualInserts);
  assertListEquality(expectedRLIDeletes, actualDeletes);
  assertListEquality(expectedUpdatesAndDeletes, actualUpdatesAndDeletes);
  HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
      Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

  // validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated for entire CommitMetadata which is used in SI code path.
  List<String> updatedOrDeletedKeys =
      new ArrayList<>(HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3));
  assertListEquality(expectedUpdatesAndDeletes, updatedOrDeletedKeys);
}

@Test
public void testReducedByKeysForRLIRecords() throws IOException {
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
Expand Down

0 comments on commit f15241b

Please sign in to comment.