diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java index ed656a07d..df85de1b4 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java @@ -43,7 +43,7 @@ public void populateStats() { startElement = 0; endElement = from[fromIndex].listPointerData.getElementValue(0, from[fromIndex].bitsPerListPointer); } else { - long endFixedLengthOffset = (long)fromOrdinal * from[fromIndex].bitsPerListPointer; + long endFixedLengthOffset = (long) fromOrdinal * from[fromIndex].bitsPerListPointer; long startFixedLengthOffset = endFixedLengthOffset - from[fromIndex].bitsPerListPointer; startElement = from[fromIndex].listPointerData.getElementValue(startFixedLengthOffset, from[fromIndex].bitsPerListPointer); endElement = from[fromIndex].listPointerData.getElementValue(endFixedLengthOffset, from[fromIndex].bitsPerListPointer); @@ -86,7 +86,7 @@ public void copyRecords() { // by not writing anything to elementData, and writing the cached value of elementCounter to listPointerData // SNAP: TODO: write a test for lopsided list shards. Theres one in object joiner tests. - to.listPointerData.setElementValue(to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter); + to.listPointerData.setElementValue((long) to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter); } } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java index df097eeee..bb2a20f06 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java @@ -39,7 +39,7 @@ public void populateStats() { startElement = 0; endElement = from.listPointerData.getElementValue(0, from.bitsPerListPointer); } else { - long endFixedLengthOffset = (long)ordinal * from.bitsPerListPointer; + long endFixedLengthOffset = (long) ordinal * from.bitsPerListPointer; long startFixedLengthOffset = endFixedLengthOffset - from.bitsPerListPointer; startElement = from.listPointerData.getElementValue(startFixedLengthOffset, from.bitsPerListPointer); endElement = from.listPointerData.getElementValue(endFixedLengthOffset, from.bitsPerListPointer); @@ -61,7 +61,7 @@ public void populateStats() { target.bitsPerListPointer = maxShardTotalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(maxShardTotalOfListSizes); target.listPointerData = FixedLengthDataFactory.get((long)target.bitsPerListPointer * (target.maxOrdinal + 1), target.memoryMode, target.memoryRecycler); - target.elementData = FixedLengthDataFactory.get((long)target.bitsPerElement * totalOfListSizes[toIndex], target.memoryMode, target.memoryRecycler); + target.elementData = FixedLengthDataFactory.get(target.bitsPerElement * totalOfListSizes[toIndex], target.memoryMode, target.memoryRecycler); target.totalNumberOfElements = totalOfListSizes[toIndex]; // useful for heap usage stats } @@ -96,7 +96,7 @@ public void copyRecords() { target.elementData.setElementValue(elementCounter[toIndex] * target.bitsPerElement, target.bitsPerElement, elementOrdinal); elementCounter[toIndex]++; } - target.listPointerData.setElementValue(target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]); + target.listPointerData.setElementValue((long) target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]); } } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java index a919442e0..43770979e 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java @@ -1,5 +1,8 @@ package com.netflix.hollow.core.read.engine.map; +import static com.netflix.hollow.core.read.engine.map.HollowMapTypeReadStateShard.getAbsoluteBucketStart; + +import com.netflix.hollow.core.memory.FixedLengthDataFactory; import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsJoiner; @@ -22,9 +25,89 @@ public void init() { @Override public void populateStats() { + for(int fromIndex=0;fromIndex to.bitsPerKeyElement) { + to.bitsPerKeyElement = source.bitsPerKeyElement; + } + if (source.bitsPerValueElement > to.bitsPerValueElement) { + to.bitsPerValueElement = source.bitsPerValueElement; + } + if (source.bitsPerMapSizeValue > to.bitsPerMapSizeValue) { + to.bitsPerMapSizeValue = source.bitsPerMapSizeValue; + } + } + to.emptyBucketKeyValue = (1 << to.bitsPerKeyElement) - 1; + + long totalOfMapBuckets = 0; + for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) { + int fromIndex = ordinal & fromMask; + int fromOrdinal = ordinal >> fromOrdinalShift; + + HollowMapTypeDataElements source = from[fromIndex]; + + long startBucket = getAbsoluteBucketStart(source, fromOrdinal); + long endBucket = source.mapPointerAndSizeData.getElementValue((long)fromOrdinal * source.bitsPerFixedLengthMapPortion, source.bitsPerMapPointer); + long numBuckets = endBucket - startBucket; + + totalOfMapBuckets += numBuckets; + } + + to.totalNumberOfBuckets = totalOfMapBuckets; + to.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(to.totalNumberOfBuckets); + to.bitsPerFixedLengthMapPortion = to.bitsPerMapSizeValue + to.bitsPerMapPointer; + to.bitsPerMapEntry = to.bitsPerKeyElement + to.bitsPerValueElement; + + to.mapPointerAndSizeData = FixedLengthDataFactory.get((long)to.bitsPerFixedLengthMapPortion * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler); + to.entryData = FixedLengthDataFactory.get((long)to.bitsPerMapEntry * to.totalNumberOfBuckets, to.memoryMode, to.memoryRecycler); } @Override public void copyRecords() { + long bucketCounter = 0; + for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) { + int fromIndex = ordinal & fromMask; + int fromOrdinal = ordinal >> fromOrdinalShift; + + HollowMapTypeDataElements source = from[fromIndex]; + + if (fromOrdinal <= from[fromIndex].maxOrdinal) { + long startBucket = getAbsoluteBucketStart(source, fromOrdinal); + long endBucket = source.mapPointerAndSizeData.getElementValue((long)fromOrdinal * source.bitsPerFixedLengthMapPortion, source.bitsPerMapPointer); + + long numBuckets = endBucket - startBucket; + if (false) { + // if (to.bitsPerKeyElement == source.bitsPerKeyElement && to.bitsPerValueElement == source.bitsPerValueElement) { // SNAP: TODO: test with this TRUE AND FALSE, for both fast and slow paths + // emptyBucketKeyValue will also be uniform + long bitsPerMapEntry = to.bitsPerMapEntry; + long targetBucketOffset = bucketCounter * bitsPerMapEntry; + // can only do one map record at a time at most, unlike delta + // SNAP: TODO: bulk copy can also be applied to splitter + to.entryData.copyBits(source.entryData, startBucket * bitsPerMapEntry, targetBucketOffset, numBuckets * bitsPerMapEntry); + bucketCounter += numBuckets; + } + // SNAP: TODO: this is slow copy, but we could also do fast copy for equal width joins + for (long bucket=startBucket;bucket> toOrdinalShift; + to[toIndex].maxOrdinal = toOrdinal; + + long startBucket = getAbsoluteBucketStart(from, ordinal); + long endBucket = from.mapPointerAndSizeData.getElementValue((long)ordinal * from.bitsPerFixedLengthMapPortion, from.bitsPerMapPointer); + long numBuckets = endBucket - startBucket; + + shardTotalOfMapBuckets[toIndex] += numBuckets; + if(shardTotalOfMapBuckets[toIndex] > maxShardTotalOfMapBuckets) { + maxShardTotalOfMapBuckets = shardTotalOfMapBuckets[toIndex]; + } + } + + for(int toIndex=0;toIndex> toOrdinalShift; + + long startBucket = getAbsoluteBucketStart(from, ordinal); + long endBucket =from.mapPointerAndSizeData.getElementValue((long) ordinal * from.bitsPerFixedLengthMapPortion, from.bitsPerMapPointer); + HollowMapTypeDataElements target = to[toIndex]; + for (long bucket=startBucket;bucket> toOrdinalShift; long startBucket = getAbsoluteBucketStart(from, ordinal); - long endBucket = from.setPointerAndSizeData.getElementValue((long)ordinal * from.bitsPerFixedLengthSetPortion, from.bitsPerSetPointer); + long endBucket = from.setPointerAndSizeData.getElementValue((long) ordinal * from.bitsPerFixedLengthSetPortion, from.bitsPerSetPointer); HollowSetTypeDataElements target = to[toIndex]; for (long bucket=startBucket;bucket new IllegalArgumentException("Array is empty")); + // populate write state with that many ordinals + super.populateWriteStateEngine(numKeyValueOrdinals); + for(int[][] map : maps) { + HollowMapWriteRecord rec = new HollowMapWriteRecord(); + for (int[] entry : map) { + assertEquals(2, entry.length); // key value pair + rec.addEntry(entry[0], entry[1]); + } + writeStateEngine.add("TestMap", rec); + } + } + + protected HollowMapTypeReadState populateTypeStateWith(int[][][] maps) throws IOException { + populateWriteStateEngine(maps); + roundTripSnapshot(); + return (HollowMapTypeReadState) readStateEngine.getTypeState("TestMap"); + } + + protected void assertDataUnchanged(HollowMapTypeReadState typeState, int[][][] maps) { + int numMapRecords = maps.length; + for(int i=0;i expected = convertToMap(maps[i]); + Map actual = readMap(typeState, i); + assertEquals(expected, actual); + + // System.out.println(obj.toString()); // SNAP: TODO: cleanup + // assertEquals(i, obj.get("longField")); + // assertEquals("Value"+i, obj.getString("stringField")); + // assertEquals((double)i, obj.getDouble("doubleField"), 0); + // if (typeState.getSchema().numFields() == 4) { // filtered + // assertEquals(i, obj.getInt("intField")); + // } + } + } + + public static Map readMap(HollowMapTypeReadState typeState, int ordinal) { + Map result = new HashMap<>(); + HollowMapEntryOrdinalIterator iter = typeState.ordinalIterator(ordinal); + boolean hasMore = iter.next(); + while (hasMore) { + int key = iter.getKey(); + int value = iter.getValue(); + result.put(key, value); + hasMore = iter.next(); + } + return result; + } + + public static Map convertToMap(int[][] array) { + Map map = new HashMap<>(); + + for (int[] pair : array) { + if (pair.length == 2) { + map.put(pair[0], pair[1]); + } else { + throw new IllegalArgumentException("Each sub-array must have exactly 2 elements."); + } + } + + return map; + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java new file mode 100644 index 000000000..b49a765ba --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java @@ -0,0 +1,139 @@ +package com.netflix.hollow.core.read.engine.map; + +import static org.junit.Assert.assertEquals; + +import com.netflix.hollow.api.consumer.HollowConsumer; +import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever; +import com.netflix.hollow.core.memory.MemoryMode; +import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.tools.checksum.HollowChecksum; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.BitSet; +import org.junit.Test; + +public class HollowMapTypeDataElementsSplitJoinTest extends AbstractHollowMapTypeDataElementsSplitJoinTest { + + @Test + public void testSplitThenJoin() throws IOException { + int[][][] maps = new int[][][] { + { {33321, 1}, {2, 2}, {32224, 3} }, + { {1, 31442}, {2, 1}, {3, 2} }, + { {1002, 2} }, + { {0, 134} }, + }; + + // 1->2->1, 1->4->1, ... + for (int listRecord=0;listRecord