map type split join
Sunjeet committed Aug 28, 2024
1 parent d863681 commit c4455d1
Showing 14 changed files with 494 additions and 24 deletions.
@@ -43,7 +43,7 @@ public void populateStats() {
startElement = 0;
endElement = from[fromIndex].listPointerData.getElementValue(0, from[fromIndex].bitsPerListPointer);
} else {
- long endFixedLengthOffset = (long)fromOrdinal * from[fromIndex].bitsPerListPointer;
+ long endFixedLengthOffset = (long) fromOrdinal * from[fromIndex].bitsPerListPointer;
long startFixedLengthOffset = endFixedLengthOffset - from[fromIndex].bitsPerListPointer;
startElement = from[fromIndex].listPointerData.getElementValue(startFixedLengthOffset, from[fromIndex].bitsPerListPointer);
endElement = from[fromIndex].listPointerData.getElementValue(endFixedLengthOffset, from[fromIndex].bitsPerListPointer);
@@ -86,7 +86,7 @@ public void copyRecords() {
// by not writing anything to elementData, and writing the cached value of elementCounter to listPointerData
// SNAP: TODO: write a test for lopsided list shards. There's one in object joiner tests.

- to.listPointerData.setElementValue(to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
+ to.listPointerData.setElementValue((long) to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
}
}
}
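Why the (long) casts above matter: bit offsets are computed as ordinal * bitsPerX, and Java evaluates int * int in 32-bit arithmetic before widening, so large ordinals overflow. A minimal standalone sketch with illustrative values, not code from this commit:

public class LongCastOverflowSketch {
    public static void main(String[] args) {
        int ordinal = 70_000_000;                          // a plausibly large record count
        int bitsPerListPointer = 40;
        long wrong = ordinal * bitsPerListPointer;         // int math overflows first: -1494967296
        long right = (long) ordinal * bitsPerListPointer;  // widens first: 2800000000
        System.out.println(wrong + " vs " + right);
    }
}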
@@ -39,7 +39,7 @@ public void populateStats() {
startElement = 0;
endElement = from.listPointerData.getElementValue(0, from.bitsPerListPointer);
} else {
- long endFixedLengthOffset = (long)ordinal * from.bitsPerListPointer;
+ long endFixedLengthOffset = (long) ordinal * from.bitsPerListPointer;
long startFixedLengthOffset = endFixedLengthOffset - from.bitsPerListPointer;
startElement = from.listPointerData.getElementValue(startFixedLengthOffset, from.bitsPerListPointer);
endElement = from.listPointerData.getElementValue(endFixedLengthOffset, from.bitsPerListPointer);
@@ -61,7 +61,7 @@ public void populateStats() {
target.bitsPerListPointer = maxShardTotalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(maxShardTotalOfListSizes);

target.listPointerData = FixedLengthDataFactory.get((long)target.bitsPerListPointer * (target.maxOrdinal + 1), target.memoryMode, target.memoryRecycler);
- target.elementData = FixedLengthDataFactory.get((long)target.bitsPerElement * totalOfListSizes[toIndex], target.memoryMode, target.memoryRecycler);
+ target.elementData = FixedLengthDataFactory.get(target.bitsPerElement * totalOfListSizes[toIndex], target.memoryMode, target.memoryRecycler);

target.totalNumberOfElements = totalOfListSizes[toIndex]; // useful for heap usage stats
}
@@ -96,7 +96,7 @@ public void copyRecords() {
target.elementData.setElementValue(elementCounter[toIndex] * target.bitsPerElement, target.bitsPerElement, elementOrdinal);
elementCounter[toIndex]++;
}
- target.listPointerData.setElementValue(target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]);
+ target.listPointerData.setElementValue((long) target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]);
}
}
}
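The pointer width used above, 64 - Long.numberOfLeadingZeros(maxShardTotalOfListSizes), is the minimum number of bits that can represent the largest pointer value, floored at one bit so an empty shard still gets a nonzero field width. A small self-contained sketch of the same computation (helper name is illustrative, not Hollow API):

public class BitsRequiredSketch {
    // minimum bits needed to store values in [0, maxValue], floored at 1 bit
    static int bitsRequired(long maxValue) {
        return maxValue == 0 ? 1 : 64 - Long.numberOfLeadingZeros(maxValue);
    }
    public static void main(String[] args) {
        System.out.println(bitsRequired(0));   // 1
        System.out.println(bitsRequired(255)); // 8
        System.out.println(bitsRequired(256)); // 9
    }
}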
@@ -1,5 +1,8 @@
package com.netflix.hollow.core.read.engine.map;

+ import static com.netflix.hollow.core.read.engine.map.HollowMapTypeReadStateShard.getAbsoluteBucketStart;

+ import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsJoiner;


@@ -22,9 +25,89 @@ public void init() {

@Override
public void populateStats() {
for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
int mappedMaxOrdinal = from[fromIndex].maxOrdinal == -1 ? -1 : (from[fromIndex].maxOrdinal * from.length) + fromIndex;
to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);

// uneven stats could be the case for consumers that skip type shards with no additions, so pick max across all shards
HollowMapTypeDataElements source = from[fromIndex];
if (source.bitsPerKeyElement > to.bitsPerKeyElement) {
to.bitsPerKeyElement = source.bitsPerKeyElement;
}
if (source.bitsPerValueElement > to.bitsPerValueElement) {
to.bitsPerValueElement = source.bitsPerValueElement;
}
if (source.bitsPerMapSizeValue > to.bitsPerMapSizeValue) {
to.bitsPerMapSizeValue = source.bitsPerMapSizeValue;
}
}
to.emptyBucketKeyValue = (1 << to.bitsPerKeyElement) - 1;

long totalOfMapBuckets = 0;
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

HollowMapTypeDataElements source = from[fromIndex];

long startBucket = getAbsoluteBucketStart(source, fromOrdinal);
long endBucket = source.mapPointerAndSizeData.getElementValue((long)fromOrdinal * source.bitsPerFixedLengthMapPortion, source.bitsPerMapPointer);
long numBuckets = endBucket - startBucket;

totalOfMapBuckets += numBuckets;
}

to.totalNumberOfBuckets = totalOfMapBuckets;
to.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(to.totalNumberOfBuckets);
to.bitsPerFixedLengthMapPortion = to.bitsPerMapSizeValue + to.bitsPerMapPointer;
to.bitsPerMapEntry = to.bitsPerKeyElement + to.bitsPerValueElement;

to.mapPointerAndSizeData = FixedLengthDataFactory.get((long)to.bitsPerFixedLengthMapPortion * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);
to.entryData = FixedLengthDataFactory.get((long)to.bitsPerMapEntry * to.totalNumberOfBuckets, to.memoryMode, to.memoryRecycler);
}

@Override
public void copyRecords() {
long bucketCounter = 0;
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

HollowMapTypeDataElements source = from[fromIndex];

if (fromOrdinal <= from[fromIndex].maxOrdinal) {
long startBucket = getAbsoluteBucketStart(source, fromOrdinal);
long endBucket = source.mapPointerAndSizeData.getElementValue((long)fromOrdinal * source.bitsPerFixedLengthMapPortion, source.bitsPerMapPointer);

long numBuckets = endBucket - startBucket;
if (false) { // SNAP: TODO: enable when to.bitsPerKeyElement == source.bitsPerKeyElement && to.bitsPerValueElement == source.bitsPerValueElement; test with this TRUE AND FALSE, for both fast and slow paths
    // when key and value widths match, emptyBucketKeyValue will also be uniform
    long bitsPerMapEntry = to.bitsPerMapEntry;
    long targetBucketOffset = bucketCounter * bitsPerMapEntry;
    // can only do one map record at a time at most, unlike delta
    // SNAP: TODO: bulk copy can also be applied to splitter
    to.entryData.copyBits(source.entryData, startBucket * bitsPerMapEntry, targetBucketOffset, numBuckets * bitsPerMapEntry);
    bucketCounter += numBuckets;
} else { // slow copy; the fast path above could be enabled for equal-width joins
    for (long bucket=startBucket;bucket<endBucket;bucket++) {
        long targetBucketOffset = bucketCounter * to.bitsPerMapEntry;
        long bucketKey = source.entryData.getElementValue(bucket * source.bitsPerMapEntry, source.bitsPerKeyElement);
        long bucketValue = source.entryData.getElementValue(bucket * source.bitsPerMapEntry + source.bitsPerKeyElement, source.bitsPerValueElement);
        if(bucketKey == source.emptyBucketKeyValue)
            bucketKey = to.emptyBucketKeyValue; // empty bucket key value can be non-uniform across shards
        to.entryData.setElementValue(targetBucketOffset, to.bitsPerKeyElement, bucketKey);
        to.entryData.setElementValue(targetBucketOffset + to.bitsPerKeyElement, to.bitsPerValueElement, bucketValue);
        bucketCounter++;
    }
}
} // else: lopsided shards can result for consumers that skip type shards with no additions; that case is handled
// by not writing anything to entryData, and writing the cached value of bucketCounter to mapPointerAndSizeData
// SNAP: TODO: write a test for lopsided map shards (similar to the list and set types). There's one in object joiner tests.

to.mapPointerAndSizeData.setElementValue((long) ordinal * to.bitsPerFixedLengthMapPortion, to.bitsPerMapPointer, bucketCounter);
long mapSize = source.mapPointerAndSizeData.getElementValue((long) fromOrdinal * source.bitsPerFixedLengthMapPortion + source.bitsPerMapPointer, source.bitsPerMapSizeValue);
to.mapPointerAndSizeData.setElementValue((long) ordinal * to.bitsPerFixedLengthMapPortion + to.bitsPerMapPointer, to.bitsPerMapSizeValue, mapSize);
}
}
}
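The key remapping in the join loop above exists because each shard reserves the all-ones value of its key width, (1 << bitsPerKeyElement) - 1, as the empty-bucket sentinel, so shards with different key widths disagree on what an empty bucket looks like. A standalone sketch of that remapping under the same convention (helper name is illustrative):

public class EmptyBucketRemapSketch {
    // the empty-bucket sentinel is the all-ones value for the key width, so it must be
    // rewritten when source and target shards use different key widths
    static long remapBucketKey(long bucketKey, int sourceKeyBits, int targetKeyBits) {
        long sourceEmpty = (1L << sourceKeyBits) - 1;
        long targetEmpty = (1L << targetKeyBits) - 1;
        return bucketKey == sourceEmpty ? targetEmpty : bucketKey;
    }
    public static void main(String[] args) {
        System.out.println(remapBucketKey(15, 4, 6)); // sentinel 15 becomes 63
        System.out.println(remapBucketKey(7, 4, 6));  // a real key ordinal passes through
    }
}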
@@ -1,5 +1,9 @@
package com.netflix.hollow.core.read.engine.map;


+ import static com.netflix.hollow.core.read.engine.map.HollowMapTypeReadStateShard.getAbsoluteBucketStart;

+ import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsSplitter;

/**
@@ -24,11 +28,75 @@ public void init() {

@Override
public void populateStats() {
long[] shardTotalOfMapBuckets = new long[numSplits];
long maxShardTotalOfMapBuckets = 0;

// count buckets per split
for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
int toIndex = ordinal & toMask;
int toOrdinal = ordinal >> toOrdinalShift;
to[toIndex].maxOrdinal = toOrdinal;

long startBucket = getAbsoluteBucketStart(from, ordinal);
long endBucket = from.mapPointerAndSizeData.getElementValue((long)ordinal * from.bitsPerFixedLengthMapPortion, from.bitsPerMapPointer);
long numBuckets = endBucket - startBucket;

shardTotalOfMapBuckets[toIndex] += numBuckets;
if(shardTotalOfMapBuckets[toIndex] > maxShardTotalOfMapBuckets) {
maxShardTotalOfMapBuckets = shardTotalOfMapBuckets[toIndex];
}
}

for(int toIndex=0;toIndex<numSplits;toIndex++) {
HollowMapTypeDataElements target = to[toIndex];
// retained because these are computed based on max across all shards, splitting has no effect
target.bitsPerKeyElement = from.bitsPerKeyElement;
target.bitsPerValueElement = from.bitsPerValueElement;
target.bitsPerMapSizeValue = from.bitsPerMapSizeValue;
target.emptyBucketKeyValue = from.emptyBucketKeyValue; // SNAP: TODO: test with some empty buckets too

// recomputed based on split shards
target.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
target.totalNumberOfBuckets = shardTotalOfMapBuckets[toIndex];
target.bitsPerFixedLengthMapPortion = target.bitsPerMapSizeValue + target.bitsPerMapPointer;
target.bitsPerMapEntry = target.bitsPerKeyElement + target.bitsPerValueElement;

target.mapPointerAndSizeData = FixedLengthDataFactory.get((long)target.bitsPerFixedLengthMapPortion * (target.maxOrdinal + 1), target.memoryMode, target.memoryRecycler);
target.entryData = FixedLengthDataFactory.get((long)target.bitsPerMapEntry * shardTotalOfMapBuckets[toIndex], target.memoryMode, target.memoryRecycler);
}
}

@Override
public void copyRecords() {
long[] bucketCounter = new long[numSplits];

// copy each record's buckets into its destination split shard
for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
int toIndex = ordinal & toMask;
int toOrdinal = ordinal >> toOrdinalShift;

long startBucket = getAbsoluteBucketStart(from, ordinal);
long endBucket = from.mapPointerAndSizeData.getElementValue((long) ordinal * from.bitsPerFixedLengthMapPortion, from.bitsPerMapPointer);

HollowMapTypeDataElements target = to[toIndex];
for (long bucket=startBucket;bucket<endBucket;bucket++) {
long bucketKey = from.entryData.getElementValue(bucket * from.bitsPerMapEntry, from.bitsPerKeyElement);
long bucketValue = from.entryData.getElementValue(bucket * from.bitsPerMapEntry + from.bitsPerKeyElement, from.bitsPerValueElement);
// SNAP: TODO: noop for map type?
if(bucketKey == from.emptyBucketKeyValue)
bucketKey = target.emptyBucketKeyValue;
// SNAP: TODO: bulk copy possible since empty bucket value is the same and key/value bits are the same as from
long targetBucketOffset = (bucketCounter[toIndex] * target.bitsPerMapEntry);
target.entryData.setElementValue(targetBucketOffset, target.bitsPerKeyElement, bucketKey);
target.entryData.setElementValue(targetBucketOffset + target.bitsPerKeyElement, target.bitsPerValueElement, bucketValue);

bucketCounter[toIndex]++;
}

target.mapPointerAndSizeData.setElementValue((long) toOrdinal * target.bitsPerFixedLengthMapPortion, target.bitsPerMapPointer, bucketCounter[toIndex]);
long mapSize = from.mapPointerAndSizeData.getElementValue((long) ordinal * from.bitsPerFixedLengthMapPortion + from.bitsPerMapPointer, from.bitsPerMapSizeValue);
target.mapPointerAndSizeData.setElementValue((long) toOrdinal * target.bitsPerFixedLengthMapPortion + target.bitsPerMapPointer, target.bitsPerMapSizeValue, mapSize);
}
}
}
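The splitter walks combined ordinals in order: record o lands in shard o & toMask at local ordinal o >> toOrdinalShift, and each destination shard keeps its own running bucket count, which becomes that shard's cumulative end-bucket pointer. A self-contained walk-through with made-up pointer values (plain arrays stand in for FixedLengthData):

public class MapSplitSketch {
    public static void main(String[] args) {
        int numSplits = 2, toMask = numSplits - 1, toOrdinalShift = 1; // shift is log2(numSplits)
        long[] endBucket = {3, 5, 9, 10};           // cumulative end-bucket pointers for 4 source records
        long[] bucketCounter = new long[numSplits]; // per-shard running bucket totals
        for (int ordinal = 0; ordinal < endBucket.length; ordinal++) {
            int toIndex = ordinal & toMask;            // destination shard
            int toOrdinal = ordinal >> toOrdinalShift; // local ordinal in that shard
            long start = ordinal == 0 ? 0 : endBucket[ordinal - 1];
            bucketCounter[toIndex] += endBucket[ordinal] - start;
            System.out.println("record " + ordinal + " -> shard " + toIndex + " local " + toOrdinal
                    + ", shard pointer now " + bucketCounter[toIndex]);
        }
    }
}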
@@ -16,6 +16,9 @@
*/
package com.netflix.hollow.core.read.engine.map;

+ import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
+ import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE;

import com.netflix.hollow.api.sampling.DisabledSamplingDirector;
import com.netflix.hollow.api.sampling.HollowMapSampler;
import com.netflix.hollow.api.sampling.HollowSampler;
@@ -45,9 +48,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;

- import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
- import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE;

/**
* A {@link HollowTypeReadState} for MAP type records.
*/
@@ -58,7 +58,7 @@ public class HollowMapTypeReadState extends HollowTypeReadState implements Hollo

private final int shardNumberMask;
private final int shardOrdinalShift;
- private final HollowMapTypeReadStateShard shards[];
+ final HollowMapTypeReadStateShard shards[]; // SNAP: TODO: elevated from private access for testing

private HollowPrimaryKeyValueDeriver keyDeriver;

@@ -85,6 +85,19 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo

}

// SNAP: TODO: for testing
public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema, int numShards, HollowMapTypeReadStateShard[] shards) {
super(stateEngine, memoryMode, schema);
this.sampler = new HollowMapSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);
this.shardNumberMask = numShards - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);

if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");

this.shards = shards;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
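The shard-count check in the new constructor relies on 31 - Integer.numberOfLeadingZeros(n) being floor(log2(n)) for positive n, so shifting 1 back left reproduces n only when n is a power of two. The same check in isolation (class name is illustrative):

public class PowerOfTwoShardsSketch {
    static void requirePowerOfTwoShards(int numShards) {
        int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); // floor(log2)
        if (numShards < 1 || 1 << shardOrdinalShift != numShards)
            throw new IllegalArgumentException("Number of shards must be a power of 2!");
    }
    public static void main(String[] args) {
        requirePowerOfTwoShards(4);     // passes
        try {
            requirePowerOfTwoShards(6); // 6 is not a power of two
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}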
@@ -178,7 +178,7 @@ public long relativeBucket(int ordinal, int bucketIndex) {
return bucketValue;
}

- private long getAbsoluteBucketStart(HollowMapTypeDataElements currentData, int ordinal) {
+ public static long getAbsoluteBucketStart(HollowMapTypeDataElements currentData, int ordinal) {
long startBucket = ordinal == 0 ? 0 : currentData.mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
return startBucket;
}
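getAbsoluteBucketStart works because map pointers are stored as cumulative end-bucket values: a record's buckets span [pointer(ordinal - 1), pointer(ordinal)), with ordinal 0 starting at bucket 0. A tiny worked example under that convention (plain arrays stand in for FixedLengthData):

public class BucketRangeSketch {
    static long startBucket(long[] endPointers, int ordinal) {
        return ordinal == 0 ? 0 : endPointers[ordinal - 1]; // previous record's end is this record's start
    }
    public static void main(String[] args) {
        long[] endPointers = {3, 5, 9}; // cumulative end-bucket pointers for ordinals 0..2
        for (int ordinal = 0; ordinal < endPointers.length; ordinal++)
            System.out.println("ordinal " + ordinal + " owns buckets ["
                    + startBucket(endPointers, ordinal) + ", " + endPointers[ordinal] + ")");
    }
}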
@@ -5,7 +5,6 @@

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.memory.VariableLengthDataFactory;
- import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsSplitter;
import com.netflix.hollow.core.schema.HollowObjectSchema;
