Skip to content

Commit

Permalink
cleanup for PR
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Aug 30, 2024
1 parent 3933c56 commit ebddd32
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public void testJoin() throws IOException {
}

@Test
// SNAP: TODO: achieve this at the read state level, need to read the set back out for comparison
// SNAP: TODO: consider cleaning this test up
public void testJoinDifferentFieldWidths() throws IOException {
HollowListTypeReadState typeReadStateSmall = populateTypeStateWith(new int[][] {{1}});
assertEquals(1, typeReadStateSmall.numShards());
Expand Down Expand Up @@ -90,6 +88,6 @@ public void testJoinDifferentFieldWidths() throws IOException {

// @Test
// public void testLopsidedShards() {
// // TODO: implement when producer supports enabling type sharding for List types
// // TODO: implement when producer allows enabling type sharding for List types
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public void testSplitThenJoin() throws IOException {
}
}

prettyPrintArray(listContents);

// 1->2->1, 1->4->1, ...
for (int listRecord=0;listRecord<numListRecords;listRecord++) {
HollowListTypeReadState typeReadState = populateTypeStateWith(listContents);
Expand All @@ -44,8 +42,6 @@ public void testSplitThenJoin() throws IOException {
HollowListTypeReadStateShard joinedShard = new HollowListTypeReadStateShard();
joinedShard.setCurrentData(joinedElements);

// SNAP: TODO: hack: constructor created for test, passed read state engine has reference to pre-join type state.
// can replace with checking equality on on data elements
HollowListTypeReadState resultTypeReadState = new HollowListTypeReadState(MemoryMode.ON_HEAP, typeReadState.getSchema(), 1, new HollowListTypeReadStateShard[] {joinedShard});

assertDataUnchanged(resultTypeReadState, listContents);
Expand All @@ -54,31 +50,6 @@ public void testSplitThenJoin() throws IOException {
}
}

static void prettyPrintArray(int[][] array) {
for (int[] row : array) {
for (int element : row) {
System.out.print(element + "\t");
}
System.out.println();
}
}

private void assertChecksumUnchanged(HollowListTypeReadState newTypeState, HollowListTypeReadState origTypeState, BitSet populatedOrdinals) {
HollowChecksum origCksum = new HollowChecksum();
HollowChecksum newCksum = new HollowChecksum();

for(int i=0;i<origTypeState.numShards();i++) {
origTypeState.shards[i].applyToChecksum(origCksum, populatedOrdinals, i, origTypeState.numShards());
// SNAP: TODO: this will be shardsVolatile, and consider moving this method into base class
}

for(int i=0;i<newTypeState.numShards();i++) {
newTypeState.shards[i].applyToChecksum(newCksum, populatedOrdinals, i, newTypeState.numShards());
}

assertEquals(newCksum, origCksum);
}


@Test
public void testSplitThenJoinWithEmptyJoin() throws IOException {
Expand All @@ -99,15 +70,13 @@ public void testSplitThenJoinWithEmptyJoin() throws IOException {
}

// manually invoked
@Test
// @Test
public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {

// SNAP: TODO: cleanup this whole method
String blobPath = "/Users/ssingh/workspace/blob-cache/vms-daintree/prod/"; // null; // dir where snapshot blob exists for e.g. "/tmp/";
long v = 20230611133921525l; // 0l; // snapshot version for e.g. 20230915162636001l;
String[] listTypesWithOneShard = {"ListOfInteger", "ListOfMoviePersonCharacter", "ListOfLiveOutputGroup", "ListOfVideo", "ListOfVideoEpisode",
"ListOfPhase", "ListOfRolloutPhaseWindow", "ListOfStrings", "ListOfTimecodeAnnotation"}; // null; // type name corresponding to an Object type with single shard for e.g. "Movie";
int[] numSplitsArray = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; //
String blobPath = null; // dir where snapshot blob exists for e.g. "/tmp/";
long v = 0l; // snapshot version for e.g. 20230915162636001l;
String[] listTypesWithOneShard = null; // type name corresponding to an Object type with single shard for e.g. "Movie";
int[] numSplitsArray = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};

HollowFilesystemBlobRetriever hollowBlobRetriever = new HollowFilesystemBlobRetriever(Paths.get(blobPath));
HollowConsumer c = HollowConsumer.withBlobRetriever(hollowBlobRetriever).build();
Expand All @@ -131,7 +100,7 @@ public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {

HollowListTypeReadStateShard joinedShard = new HollowListTypeReadStateShard();
joinedShard.setCurrentData(joinedElements);
// SNAP: TODO: refactor when constructor changes (this one takes readStateEngine which doesnt correspond to joinedShard)

HollowListTypeReadState resultTypeState = new HollowListTypeReadState(MemoryMode.ON_HEAP, typeState.getSchema(), 1, new HollowListTypeReadStateShard[]{joinedShard});

assertChecksumUnchanged(resultTypeState, typeState, typeState.getPopulatedOrdinals());
Expand All @@ -140,4 +109,19 @@ public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {
}
}
}

private void assertChecksumUnchanged(HollowListTypeReadState newTypeState, HollowListTypeReadState origTypeState, BitSet populatedOrdinals) {
HollowChecksum origCksum = new HollowChecksum();
HollowChecksum newCksum = new HollowChecksum();

for(int i=0;i<origTypeState.numShards();i++) {
origTypeState.shards[i].applyToChecksum(origCksum, populatedOrdinals, i, origTypeState.numShards());
}

for(int i=0;i<newTypeState.numShards();i++) {
newTypeState.shards[i].applyToChecksum(newCksum, populatedOrdinals, i, newTypeState.numShards());
}

assertEquals(newCksum, origCksum);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.netflix.hollow.core.read.engine.list;

import static org.junit.Assert.assertEquals;

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.tools.checksum.HollowChecksum;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.BitSet;
import org.junit.Test;

public class VMSHollowListTypeDataElementsSplitJoinTest extends AbstractHollowListTypeDataElementsSplitJoinTest {

@Test
public void testSplitThenJoin() throws IOException {

int numListRecords = 100;
int[][] listContents = new int[numListRecords][];
for (int i=0;i<numListRecords;i++) {
listContents[i] = new int[i+1];
for (int j=0;j<i+1;j++) {
listContents[i][j] = j;
}
}

prettyPrintArray(listContents);

// 1->2->1, 1->4->1, ...
for (int listRecord=0;listRecord<numListRecords;listRecord++) {
HollowListTypeReadState typeReadState = populateTypeStateWith(listContents);
assertEquals(1, typeReadState.numShards());
assertEquals(numListRecords, typeReadState.getPopulatedOrdinals().cardinality());
assertDataUnchanged(typeReadState, listContents);

for (int numSplits : new int[]{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}) {
HollowListTypeDataElementsSplitter splitter = new HollowListTypeDataElementsSplitter(typeReadState.currentDataElements()[0], numSplits);
HollowListTypeDataElements[] splitElements = splitter.split();

HollowListTypeDataElementsJoiner joiner = new HollowListTypeDataElementsJoiner(splitElements);
HollowListTypeDataElements joinedElements = joiner.join();
HollowListTypeReadStateShard joinedShard = new HollowListTypeReadStateShard();
joinedShard.setCurrentData(joinedElements);

HollowListTypeReadState resultTypeReadState = new HollowListTypeReadState(MemoryMode.ON_HEAP, typeReadState.getSchema(), 1, new HollowListTypeReadStateShard[] {joinedShard});

assertDataUnchanged(resultTypeReadState, listContents);
assertChecksumUnchanged(resultTypeReadState, typeReadState, typeReadState.getPopulatedOrdinals());
}
}
}

static void prettyPrintArray(int[][] array) {
for (int[] row : array) {
for (int element : row) {
System.out.print(element + "\t");
}
System.out.println();
}
}

private void assertChecksumUnchanged(HollowListTypeReadState newTypeState, HollowListTypeReadState origTypeState, BitSet populatedOrdinals) {
HollowChecksum origCksum = new HollowChecksum();
HollowChecksum newCksum = new HollowChecksum();

for(int i=0;i<origTypeState.numShards();i++) {
origTypeState.shards[i].applyToChecksum(origCksum, populatedOrdinals, i, origTypeState.numShards());
}

for(int i=0;i<newTypeState.numShards();i++) {
newTypeState.shards[i].applyToChecksum(newCksum, populatedOrdinals, i, newTypeState.numShards());
}

assertEquals(newCksum, origCksum);
}


@Test
public void testSplitThenJoinWithEmptyJoin() throws IOException {
int numListRecords = 1;
int[][] listContents = {{1}};
HollowListTypeReadState typeReadState = populateTypeStateWith(listContents);
assertEquals(1, typeReadState.numShards());

HollowListTypeDataElementsSplitter splitter = new HollowListTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 4);
HollowListTypeDataElements[] splitBy4 = splitter.split();
assertEquals(-1, splitBy4[1].maxOrdinal);
assertEquals(-1, splitBy4[3].maxOrdinal);

HollowListTypeDataElementsJoiner joiner = new HollowListTypeDataElementsJoiner(new HollowListTypeDataElements[]{splitBy4[1], splitBy4[3]});
HollowListTypeDataElements joined = joiner.join();

assertEquals(-1, joined.maxOrdinal);
}

// manually invoked
@Test
public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {

String blobPath = "/Users/ssingh/workspace/blob-cache/vms-daintree/prod/"; // null; // dir where snapshot blob exists for e.g. "/tmp/";
long v = 20230611133921525l; // 0l; // snapshot version for e.g. 20230915162636001l;
String[] listTypesWithOneShard = {"ListOfInteger", "ListOfMoviePersonCharacter", "ListOfLiveOutputGroup", "ListOfVideo", "ListOfVideoEpisode",
"ListOfPhase", "ListOfRolloutPhaseWindow", "ListOfStrings", "ListOfTimecodeAnnotation"}; // null; // type name corresponding to an Object type with single shard for e.g. "Movie";
int[] numSplitsArray = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; //

HollowFilesystemBlobRetriever hollowBlobRetriever = new HollowFilesystemBlobRetriever(Paths.get(blobPath));
HollowConsumer c = HollowConsumer.withBlobRetriever(hollowBlobRetriever).build();
c.triggerRefreshTo(v);
HollowReadStateEngine readStateEngine = c.getStateEngine();

for (String listTypeWithOneShard : listTypesWithOneShard) {
for (int numSplits : numSplitsArray) {
if (blobPath==null || v==0l || listTypeWithOneShard==null) {
throw new IllegalArgumentException("These arguments need to be specified");
}
HollowListTypeReadState typeState = (HollowListTypeReadState) readStateEngine.getTypeState(listTypeWithOneShard);

assertEquals(1, typeState.numShards());

HollowListTypeDataElementsSplitter splitter = new HollowListTypeDataElementsSplitter(typeState.currentDataElements()[0], numSplits);
HollowListTypeDataElements[] splitElements = splitter.split();

HollowListTypeDataElementsJoiner joiner = new HollowListTypeDataElementsJoiner(splitElements);
HollowListTypeDataElements joinedElements = joiner.join();

HollowListTypeReadStateShard joinedShard = new HollowListTypeReadStateShard();
joinedShard.setCurrentData(joinedElements);

HollowListTypeReadState resultTypeState = new HollowListTypeReadState(MemoryMode.ON_HEAP, typeState.getSchema(), 1, new HollowListTypeReadStateShard[]{joinedShard});

assertChecksumUnchanged(resultTypeState, typeState, typeState.getPopulatedOrdinals());

System.out.println("Processed type " + listTypeWithOneShard + " with " + numSplits + " splits");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@ protected void assertDataUnchanged(HollowMapTypeReadState typeState, int[][][] m
Map<Integer, Integer> expected = convertToMap(maps[i]);
Map<Integer, Integer> actual = readMap(typeState, i);
assertEquals(expected, actual);

// System.out.println(obj.toString()); // SNAP: TODO: cleanup
// assertEquals(i, obj.get("longField"));
// assertEquals("Value"+i, obj.getString("stringField"));
// assertEquals((double)i, obj.getDouble("doubleField"), 0);
// if (typeState.getSchema().numFields() == 4) { // filtered
// assertEquals(i, obj.getInt("intField"));
// }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public void testJoin() throws IOException {
}

@Test
// SNAP: TODO: also do this at the read state level
public void testJoinDifferentFieldWidths() throws IOException {
HollowMapTypeReadState typeReadStateSmall = populateTypeStateWith(new int[][][] {{{1,2}}});
assertEquals(1, typeReadStateSmall.numShards());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public void testSplitThenJoin() throws IOException {
HollowMapTypeReadStateShard joinedShard = new HollowMapTypeReadStateShard();
joinedShard.setCurrentData(joinedElements);

// SNAP: TODO: hack: constructor created for test, passed read state engine has reference to pre-join type state.
// can replace with checking equality on on data elements
HollowMapTypeReadState resultTypeReadState = new HollowMapTypeReadState(MemoryMode.ON_HEAP, typeReadState.getSchema(), 1, new HollowMapTypeReadStateShard[] {joinedShard});

assertDataUnchanged(resultTypeReadState, maps);
Expand Down Expand Up @@ -69,22 +67,12 @@ public void testSplitThenJoinWithEmptyJoin() throws IOException {
}

// manually invoked
@Test
// @Test
public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {

// SNAP: TODO: cleanup this whole method
String blobPath = "/Users/ssingh/workspace/blob-cache/vms-daintree/prod/"; // null; // dir where snapshot blob exists for e.g. "/tmp/";
long v = 20230611133921525l; // 0l; // snapshot version for e.g. 20230915162636001l;
String[] mapTypesWithOneShard = {
"MapOfDateWindowToListOfInteger",
"MapOfISOCountryToListOfRolloutPhaseWindow",
"MapOfIntegerToWindowPackageContractInfo",
"MapOfStringsToLanguageRestrictions",
"MapOfTimecodeAnnotationTagKeyToTimecodeAnnotationTagValue",
"MapOfTrickPlayTypeToTrickPlayItem",
"MapOfVRoleToListOfVPerson",
"MapOfISOCountryToAvailabilityWindow"
}; // null; // type name corresponding to an Object type with single shard for e.g. "Movie";
String blobPath = null; // dir where snapshot blob exists for e.g. "/tmp/";
long v = 0l; // snapshot version for e.g. 20230915162636001l;
String[] mapTypesWithOneShard = null; // type name corresponding to an Object type with single shard for e.g. "Movie";
int[] numSplitsArray = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};

HollowFilesystemBlobRetriever hollowBlobRetriever = new HollowFilesystemBlobRetriever(Paths.get(blobPath));
Expand All @@ -109,7 +97,7 @@ public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {

HollowMapTypeReadStateShard joinedShard = new HollowMapTypeReadStateShard();
joinedShard.setCurrentData(joinedElements);
// SNAP: TODO: refactor when constructor changes (this one takes readStateEngine which doesnt correspond to joinedShard)

HollowMapTypeReadState resultTypeState = new HollowMapTypeReadState(MemoryMode.ON_HEAP, typeState.getSchema(), 1, new HollowMapTypeReadStateShard[]{joinedShard});

assertChecksumUnchanged(resultTypeState, typeState, typeState.getPopulatedOrdinals());
Expand All @@ -125,7 +113,6 @@ private void assertChecksumUnchanged(HollowMapTypeReadState newTypeState, Hollow

for(int i=0;i<origTypeState.numShards();i++) {
origTypeState.shards[i].applyToChecksum(origCksum, populatedOrdinals, i, origTypeState.numShards());
// SNAP: TODO: this will be shardsVolatile, and consider moving this method into base class
}

for(int i=0;i<newTypeState.numShards();i++) {
Expand Down
Loading

0 comments on commit ebddd32

Please sign in to comment.