Skip to content

Commit

Permalink
Adding more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 27, 2024
1 parent 6a2c820 commit 66775ee
Showing 1 changed file with 97 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
Expand Down Expand Up @@ -55,11 +56,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class TestRLIRecordGeneration extends HoodieClientTestBase {

Expand Down Expand Up @@ -276,6 +279,19 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);

// lets validate that if any log file contains inserts, fetching keys will fail.
HoodieWriteStat writeStat = writeStatuses3.get(1).getStat();
writeStat.setNumInserts(5);
commitMetadata = CommitUtils.buildMetadata(Collections.singletonList(writeStat), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

try {
HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
fail("Should not have reached here");
} catch (Exception e) {
// no op
}

// trigger compaction
Option<String> compactionInstantOpt = client.scheduleCompaction(Option.empty());
assertTrue(compactionInstantOpt.isPresent());
Expand All @@ -287,6 +303,87 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
}
}

@Test
public void testReducedByKeysForRLIRecords() throws IOException {
  HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
  cleanupClients();
  initMetaClient(tableType);
  cleanupTimelineService();
  initTimelineService();

  HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
  HoodieWriteConfig writeConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build();
  try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
    String commitTime = client.createNewInstantTime();
    // 100 inserts; 20 of them are also deleted, so `adjustedInserts` keeps the 80 non-deleted keys.
    List<HoodieRecord> inserts = dataGen.generateInserts(commitTime, 100);
    List<HoodieRecord> deletes = dataGen.generateUniqueDeleteRecords(commitTime, 20);
    String randomFileId = UUID.randomUUID().toString() + "-0";
    List<String> deletedRecordKeys = deletes.stream().map(record -> record.getRecordKey()).collect(Collectors.toList());
    List<HoodieRecord> adjustedInserts = inserts.stream().filter(record -> !deletedRecordKeys.contains(record.getRecordKey())).collect(Collectors.toList());

    // RLI "update/insert" records for all 100 keys.
    List<HoodieRecord> insertRecords =
        inserts.stream().map(record -> HoodieMetadataPayload.createRecordIndexUpdate(record.getRecordKey(), "abc", randomFileId, commitTime, 0))
            .collect(Collectors.toList());
    // Fix: build RLI delete records from `deletes` (20 keys, disjoint from adjustedInserts), not from
    // `inserts` (100 keys overlapping every adjustedInserts key). The "no dups" scenario below depends
    // on the insert-side and delete-side key sets being disjoint; deriving deletes from `inserts` would
    // make reduceByKeys collapse the 180 inputs to 100 outputs and fail the size assertion.
    List<HoodieRecord> deleteRecords = deletes.stream().map(record -> HoodieMetadataPayload.createRecordIndexDelete(record.getRecordKey()))
        .collect(Collectors.toList());

    List<HoodieRecord> recordsToTest = new ArrayList<>();
    recordsToTest.addAll(adjustedInserts);
    recordsToTest.addAll(deleteRecords);
    // happy path: 80 inserts + 20 deletes over 100 distinct keys. No dups, so in and out are the same.
    List<HoodieRecord> actualRecords = HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
    assertHoodieRecordListEquality(actualRecords, recordsToTest);

    // a key with both an insert and a delete reduces to the insert.
    recordsToTest = new ArrayList<>();
    recordsToTest.addAll(insertRecords);
    recordsToTest.addAll(deleteRecords);
    actualRecords = HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
    List<HoodieRecord> expectedList = new ArrayList<>();
    expectedList.addAll(insertRecords);
    assertHoodieRecordListEquality(actualRecords, expectedList);

    // duplicate deletes are tolerated: they reduce to a single delete per key.
    recordsToTest = new ArrayList<>();
    recordsToTest.addAll(adjustedInserts);
    recordsToTest.addAll(deleteRecords);
    recordsToTest.addAll(deleteRecords.subList(0, 10));
    actualRecords = HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
    expectedList = new ArrayList<>();
    expectedList.addAll(adjustedInserts);
    expectedList.addAll(deleteRecords);
    assertHoodieRecordListEquality(actualRecords, expectedList);

    // failure case: the same key carrying 2 inserts must be rejected.
    recordsToTest = new ArrayList<>();
    recordsToTest.addAll(adjustedInserts);
    recordsToTest.addAll(adjustedInserts.subList(0, 5));
    try {
      HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2), 2).collectAsList();
      fail("Should not have reached here");
    } catch (Exception e) {
      // expected. no-op
      assertTrue(e.getCause() instanceof HoodieIOException);
    }
  }
}

/**
 * Asserts that two lists of metadata records match, comparing the insert-record keys and the
 * delete-record keys (payload is {@code EmptyHoodieRecordPayload}) independently of ordering.
 */
private void assertHoodieRecordListEquality(List<HoodieRecord> actualList, List<HoodieRecord> expectedList) {
  assertEquals(expectedList.size(), actualList.size());

  // Partition each list's record keys by whether the record is a delete (true) or an insert (false).
  Map<Boolean, List<String>> expectedKeysByDeleteFlag = expectedList.stream()
      .collect(Collectors.partitioningBy(record -> record.getData() instanceof EmptyHoodieRecordPayload,
          Collectors.mapping(HoodieRecord::getRecordKey, Collectors.toList())));
  Map<Boolean, List<String>> actualKeysByDeleteFlag = actualList.stream()
      .collect(Collectors.partitioningBy(record -> record.getData() instanceof EmptyHoodieRecordPayload,
          Collectors.mapping(HoodieRecord::getRecordKey, Collectors.toList())));

  assertListEquality(expectedKeysByDeleteFlag.get(false), actualKeysByDeleteFlag.get(false));
  assertListEquality(expectedKeysByDeleteFlag.get(true), actualKeysByDeleteFlag.get(true));
}

private void assertListEquality(List<String> list1, List<String> list2) {
Collections.sort(list1);
Collections.sort(list2);
Expand Down

0 comments on commit 66775ee

Please sign in to comment.