Set split/join copyBuckets refactor and other cleanup
Sunjeet committed Aug 30, 2024
1 parent e39cd47 commit 6724cf5
Showing 19 changed files with 85 additions and 90 deletions.
@@ -100,8 +100,11 @@ private void populateStats() {
historicalDataElements.maxOrdinal = removedEntryCount - 1;
historicalDataElements.totalNumberOfElements = totalElementCount;
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement; // SNAP: TODO: pick max across all shards

for (int i=0;i<stateEngineDataElements.length;i++) {
if (stateEngineDataElements[i].bitsPerElement > historicalDataElements.bitsPerElement) {
historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement;
}
}
ordinalMapping = new IntMap(removedEntryCount);
}
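
Note: the loop above replaces the single-shard assumption with a max across all shards, since the historical data elements must be wide enough for the widest shard. As a minimal standalone sketch of the bit-width idiom used here (class and method names below are illustrative, not from this commit):

public class BitWidthSketch {
    // Minimal bits needed to represent n; mirrors the
    // 64 - Long.numberOfLeadingZeros(n) idiom in populateStats above.
    static int bitsToRepresent(long n) {
        return n == 0 ? 1 : 64 - Long.numberOfLeadingZeros(n);
    }

    public static void main(String[] args) {
        System.out.println(bitsToRepresent(0));   // 1
        System.out.println(bitsToRepresent(255)); // 8
        System.out.println(bitsToRepresent(256)); // 9
    }
}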

@@ -74,9 +74,8 @@ public void copyRecords() {
elementCounter++;
}
}
} // else: lopsided shards could result for consumers that skip type shards with no additions, that gets handled
// by not writing anything to elementData, and writing the cached value of elementCounter to listPointerData
// SNAP: TODO: write a test for lopsided list shards. Theres one in object joiner tests.
} // else: lopsided shards can result when consumers skip type shards with no additions;
// nothing is written to elementData, and the cached value of elementCounter is written to listPointerData.

to.listPointerData.setElementValue((long) to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
}
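
The end-offset encoding the comment refers to can be modeled with a plain array; this toy sketch (illustrative names, a long[] standing in for the bit-packed listPointerData) shows why writing the cached counter is enough for an ordinal that contributes no elements:

import java.util.Arrays;

public class ListPointerSketch {
    public static void main(String[] args) {
        int[][] lists = { {7, 8}, {}, {9}, {} }; // ordinal -> elements
        long[] listPointer = new long[lists.length]; // end offset per ordinal
        long elementCounter = 0;
        for (int ordinal = 0; ordinal < lists.length; ordinal++) {
            elementCounter += lists[ordinal].length; // element writes elided
            // an empty ordinal simply repeats the cached counter
            listPointer[ordinal] = elementCounter;
        }
        // start of ordinal i is listPointer[i-1] (or 0), end is listPointer[i]
        System.out.println(Arrays.toString(listPointer)); // [2, 2, 3, 3]
    }
}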
@@ -72,18 +72,19 @@ public void copyRecords() {
HollowListTypeDataElements target = to[toIndex];

if (from.bitsPerElement == target.bitsPerElement) {
// fastpath can bulk copy elements. emptyBucketValue is same since bitsPerElement is same
// fast path can bulk copy elements. emptyBucketValue is same since bitsPerElement is same
long numElements = endElement - startElement;
int bitsPerElement = from.bitsPerElement;
target.elementData.copyBits(from.elementData, startElement * bitsPerElement, elementCounter[toIndex] * bitsPerElement, numElements * bitsPerElement);
elementCounter[toIndex] += numElements;
} else {
// slow path (but more compact) is not exercised until populateStats above supports split-shard-specific bitsPerElement
// (which would make sense to add once HollowListTypeWriteState's gatherStatistics supports assigning bitsPerElement at a shard level)
for (long element=startElement;element<endElement;element++) {
int elementOrdinal = (int)from.elementData.getElementValue(element * from.bitsPerElement, from.bitsPerElement);
target.elementData.setElementValue(elementCounter[toIndex] * target.bitsPerElement, target.bitsPerElement, elementOrdinal);
elementCounter[toIndex]++;
}
throw new RuntimeException("Unexpected for List type during split, expected during join"); // SNAP: TODO: remove, or keep for now so that we can test the slow path and in future when we make size optimizations then consumers will be backwards compatible
}

target.listPointerData.setElementValue((long) target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]);
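
For intuition on the two paths: when source and target element widths match, one bulk copy of numElements * bitsPerElement bits suffices; when they differ, each element has to be decoded and re-encoded. A toy model, with java.util.BitSet standing in for the real fixed-length storage (all names illustrative):

import java.util.BitSet;

public class ReencodeSketch {
    static long get(BitSet data, long start, int bits) {
        long v = 0;
        for (int i = 0; i < bits; i++) if (data.get((int) (start + i))) v |= 1L << i;
        return v;
    }
    static void set(BitSet data, long start, int bits, long v) {
        for (int i = 0; i < bits; i++) data.set((int) (start + i), ((v >>> i) & 1) == 1);
    }
    public static void main(String[] args) {
        int fromBits = 5, toBits = 9; // widths differ, so the slow path applies
        long[] elements = {3, 17, 31};
        BitSet from = new BitSet(), to = new BitSet();
        for (int i = 0; i < elements.length; i++) set(from, (long) i * fromBits, fromBits, elements[i]);
        for (int i = 0; i < elements.length; i++) {
            long v = get(from, (long) i * fromBits, fromBits); // decode at source width
            set(to, (long) i * toBits, toBits, v);             // re-encode at target width
        }
        System.out.println(get(to, 1 * toBits, toBits)); // 17
    }
}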
@@ -47,9 +47,9 @@ public class HollowListTypeReadState extends HollowCollectionTypeReadState imple

private final HollowListSampler sampler;

final int shardNumberMask; // SNAP: TODO: elevated from private access for testing
final int shardNumberMask;
private final int shardOrdinalShift;
final HollowListTypeReadStateShard shards[]; // SNAP: TODO: elevated from private access for testing
final HollowListTypeReadStateShard shards[];

private int maxOrdinal;

@@ -73,9 +73,8 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem
this.shards = shards;
}

// SNAP: TODO: for testing
public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema, int numShards, HollowListTypeReadStateShard[] shards) {
super(stateEngine, memoryMode, schema);
public HollowListTypeReadState(MemoryMode memoryMode, HollowListSchema schema, int numShards, HollowListTypeReadStateShard[] shards) {
super(null, memoryMode, schema);
this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);
this.shardNumberMask = numShards - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
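
A sketch of how this test-only constructor might be exercised; the shard array construction is elided because the shard API is not part of this hunk, and the schema arguments are assumptions:

// hypothetical test setup; shards is assumed to be built elsewhere
HollowListSchema schema = new HollowListSchema("MyList", "MyElement");
HollowListTypeReadState readState =
        new HollowListTypeReadState(MemoryMode.ON_HEAP, schema, shards.length, shards);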
@@ -107,10 +107,18 @@ private void populateStats() {
historicalDataElements.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(totalBucketCount);
historicalDataElements.bitsPerMapSizeValue = 64 - Long.numberOfLeadingZeros(maxSize);
historicalDataElements.bitsPerFixedLengthMapPortion = historicalDataElements.bitsPerMapPointer + historicalDataElements.bitsPerMapSizeValue;
historicalDataElements.bitsPerKeyElement = stateEngineDataElements[0].bitsPerKeyElement; // SNAP: TODO: pick max across all shards
historicalDataElements.bitsPerValueElement = stateEngineDataElements[0].bitsPerValueElement;
historicalDataElements.bitsPerMapEntry = stateEngineDataElements[0].bitsPerMapEntry;
historicalDataElements.emptyBucketKeyValue = stateEngineDataElements[0].emptyBucketKeyValue;
for (int i=0;i<stateEngineDataElements.length;i++) {
if (stateEngineDataElements[i].bitsPerKeyElement > historicalDataElements.bitsPerKeyElement) {
historicalDataElements.bitsPerKeyElement = stateEngineDataElements[i].bitsPerKeyElement;
historicalDataElements.emptyBucketKeyValue = stateEngineDataElements[i].emptyBucketKeyValue;
}
if (stateEngineDataElements[i].bitsPerValueElement > historicalDataElements.bitsPerValueElement) {
historicalDataElements.bitsPerValueElement = stateEngineDataElements[i].bitsPerValueElement;
}
}
// entry width must equal key width plus value width for bucket addressing to stay aligned,
// so it is derived from the two maxes rather than maxed independently
historicalDataElements.bitsPerMapEntry = historicalDataElements.bitsPerKeyElement + historicalDataElements.bitsPerValueElement;
historicalDataElements.totalNumberOfBuckets = totalBucketCount;

ordinalMapping = new IntMap(removedEntryCount);
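
Note: the loop carries emptyBucketKeyValue along with whichever shard sets the max key width because the empty-bucket sentinel is a function of that width. A small sketch of that relationship, assuming the all-1s encoding that a TODO later in this commit calls for (names illustrative):

public class SentinelSketch {
    static long emptyKeySentinel(int bitsPerKeyElement) {
        return (1L << bitsPerKeyElement) - 1; // all 1s at the key width
    }
    public static void main(String[] args) {
        System.out.println(emptyKeySentinel(8));  // 255
        System.out.println(emptyKeySentinel(10)); // 1023: widening the keys changes the sentinel
    }
}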
@@ -79,14 +79,13 @@ public void copyRecords() {
long endBucket = source.getEndBucket(fromOrdinal);
long numBuckets = endBucket - startBucket;

// if (false) { // SNAP: TODO: test the slow path
if (to.bitsPerKeyElement == source.bitsPerKeyElement && to.bitsPerValueElement == source.bitsPerValueElement) {
// emptyBucketKeyValue will also be uniform
long bitsPerMapEntry = to.bitsPerMapEntry;
to.entryData.copyBits(source.entryData, startBucket * bitsPerMapEntry, bucketCounter * bitsPerMapEntry, numBuckets * bitsPerMapEntry);
bucketCounter += numBuckets;
} else {
for (long bucket = startBucket; bucket < endBucket; bucket++) {
for (long bucket=startBucket;bucket<endBucket;bucket++) {
long bucketKey = source.entryData.getElementValue(bucket * source.bitsPerMapEntry, source.bitsPerKeyElement);
long bucketValue = source.entryData.getElementValue(bucket * source.bitsPerMapEntry + source.bitsPerKeyElement, source.bitsPerValueElement);
if (bucketKey == source.emptyBucketKeyValue)
@@ -51,7 +51,7 @@ public void populateStats() {
target.bitsPerKeyElement = from.bitsPerKeyElement;
target.bitsPerValueElement = from.bitsPerValueElement;
target.bitsPerMapSizeValue = from.bitsPerMapSizeValue;
target.emptyBucketKeyValue = from.emptyBucketKeyValue; // SNAP: TODO: test with some empty buckets too
target.emptyBucketKeyValue = from.emptyBucketKeyValue;

// recomputed based on split shards
target.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
@@ -78,25 +78,25 @@ public void copyRecords() {
long startBucket = from.getStartBucket(ordinal);
long endBucket = from.getEndBucket(ordinal);

// if (false) { // SNAP: TODO: test the slow path
if (target.bitsPerKeyElement == from.bitsPerKeyElement && target.bitsPerValueElement == from.bitsPerValueElement) {
// fast path can bulk copy buckets. emptyBucketKeyValue is same since bitsPerKeyElement is the same
long numBuckets = endBucket - startBucket;
// emptyBucketKeyValue will also be uniform
long bitsPerMapEntry = from.bitsPerMapEntry;
target.entryData.copyBits(from.entryData, startBucket * bitsPerMapEntry, bucketCounter[toIndex] * bitsPerMapEntry, numBuckets * bitsPerMapEntry);
bucketCounter[toIndex] += numBuckets;
} else {
throw new RuntimeException("Unexpected for Map type during split, expected during join"); // SNAP: TODO: remove
// for (long bucket=startBucket;bucket<endBucket;bucket++) {
// long bucketKey = from.entryData.getElementValue(bucket * from.bitsPerMapEntry, from.bitsPerKeyElement);
// long bucketValue = from.entryData.getElementValue(bucket * from.bitsPerMapEntry + from.bitsPerKeyElement, from.bitsPerValueElement);
// if(bucketKey == from.emptyBucketKeyValue)
// bucketKey = target.emptyBucketKeyValue;
// long targetBucketOffset = (bucketCounter[toIndex] * target.bitsPerMapEntry);
// target.entryData.setElementValue(targetBucketOffset, target.bitsPerKeyElement, bucketKey);
// target.entryData.setElementValue(targetBucketOffset + target.bitsPerKeyElement, target.bitsPerValueElement, bucketValue);
// bucketCounter[toIndex]++;
// }
// slow path (but more compact) is not exercised until populateStats above supports split-shard-specific bitsPerKeyElement and bitsPerValueElement
// (which would make sense to add once HollowMapTypeWriteState's gatherStatistics supports assigning bitsPerKeyElement and bitsPerValueElement at a shard level)
for (long bucket=startBucket;bucket<endBucket;bucket++) {
long bucketKey = from.entryData.getElementValue(bucket * from.bitsPerMapEntry, from.bitsPerKeyElement);
long bucketValue = from.entryData.getElementValue(bucket * from.bitsPerMapEntry + from.bitsPerKeyElement, from.bitsPerValueElement);
if(bucketKey == from.emptyBucketKeyValue)
bucketKey = target.emptyBucketKeyValue;
long targetBucketOffset = (bucketCounter[toIndex] * target.bitsPerMapEntry);
target.entryData.setElementValue(targetBucketOffset, target.bitsPerKeyElement, bucketKey);
target.entryData.setElementValue(targetBucketOffset + target.bitsPerKeyElement, target.bitsPerValueElement, bucketValue);
bucketCounter[toIndex]++;
}
}

target.mapPointerAndSizeData.setElementValue((long) toOrdinal * target.bitsPerFixedLengthMapPortion, target.bitsPerMapPointer, bucketCounter[toIndex]);
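
The restored slow path above re-encodes one bucket at a time and swaps the source shard's empty-bucket sentinel for the target's. A toy model of just that remapping step, with plain longs standing in for the bit-packed entryData (the sentinel values here are assumptions):

import java.util.Arrays;

public class BucketRemapSketch {
    public static void main(String[] args) {
        long fromEmpty = (1L << 8) - 1;  // assumed sentinel at 8-bit keys
        long toEmpty = (1L << 10) - 1;   // assumed sentinel at 10-bit keys
        long[] sourceKeys = { 42, fromEmpty, 7 };
        long[] targetKeys = new long[sourceKeys.length];
        for (int i = 0; i < sourceKeys.length; i++) {
            long key = sourceKeys[i];
            if (key == fromEmpty) key = toEmpty; // remap empties, as in copyRecords above
            targetKeys[i] = key;
        }
        System.out.println(Arrays.toString(targetKeys)); // [42, 1023, 7]
    }
}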
@@ -58,7 +58,7 @@ public class HollowMapTypeReadState extends HollowTypeReadState implements Hollo

private final int shardNumberMask;
private final int shardOrdinalShift;
final HollowMapTypeReadStateShard shards[]; // SNAP: TODO: elevated from private access for testing
final HollowMapTypeReadStateShard shards[];

private HollowPrimaryKeyValueDeriver keyDeriver;

@@ -85,9 +85,8 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo

}

// SNAP: TODO: for testing
public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema, int numShards, HollowMapTypeReadStateShard[] shards) {
super(stateEngine, memoryMode, schema);
public HollowMapTypeReadState(MemoryMode memoryMode, HollowMapSchema schema, int numShards, HollowMapTypeReadStateShard[] shards) {
super(null, memoryMode, schema);
this.sampler = new HollowMapSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);
this.shardNumberMask = numShards - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
@@ -107,8 +107,12 @@ private void populateStats() {
historicalDataElements.bitsPerSetPointer = 64 - Long.numberOfLeadingZeros(totalBucketCount);
historicalDataElements.bitsPerSetSizeValue = 64 - Long.numberOfLeadingZeros(maxSize);
historicalDataElements.bitsPerFixedLengthSetPortion = historicalDataElements.bitsPerSetPointer + historicalDataElements.bitsPerSetSizeValue;
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement; // SNAP: TODO: pick max across all shards
historicalDataElements.emptyBucketValue = stateEngineDataElements[0].emptyBucketValue;
for (int i=0;i<stateEngineDataElements.length;i++) {
if (stateEngineDataElements[i].bitsPerElement > historicalDataElements.bitsPerElement) {
historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement;
historicalDataElements.emptyBucketValue = stateEngineDataElements[i].emptyBucketValue;
}
}
historicalDataElements.totalNumberOfBuckets = totalBucketCount;

ordinalMapping = new IntMap(removedEntryCount);
@@ -125,4 +125,22 @@ public long getEndBucket(int ordinal) {
public int getBucketValue(long absoluteBucketIndex) {
return (int)elementData.getElementValue(absoluteBucketIndex * bitsPerElement, bitsPerElement);
}

public void copyBucketsFrom(long startBucket, HollowSetTypeDataElements src, long srcStartBucket, long srcEndBucket) {
if (bitsPerElement == src.bitsPerElement) {
// fast path can bulk copy buckets. emptyBucketValue is same since bitsPerElement is same
long numBuckets = srcEndBucket - srcStartBucket;
elementData.copyBits(src.elementData, srcStartBucket * bitsPerElement, startBucket * bitsPerElement, numBuckets * bitsPerElement);
} else {
// one bucket at a time
for (long bucket=srcStartBucket;bucket<srcEndBucket;bucket++) {
long bucketVal = src.elementData.getElementValue(bucket * src.bitsPerElement, src.bitsPerElement);
if(bucketVal == src.emptyBucketValue)
bucketVal = emptyBucketValue;
// SNAP: TODO: empty bucket should be encoded with all 1s, not copied as-is, in all collection splitters/joiners
elementData.setElementValue(startBucket * bitsPerElement, bitsPerElement, bucketVal);
startBucket++;
}
}
}
}
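
One design note: the new helper writes into the target starting at a caller-supplied bucket position but does not return the advanced position, so callers keep their own running counter. The calling pattern, mirroring the joiner hunk below (variable names as used there):

long startBucket = source.getStartBucket(fromOrdinal);
long endBucket = source.getEndBucket(fromOrdinal);
to.copyBucketsFrom(bucketCounter, source, startBucket, endBucket);
bucketCounter += endBucket - startBucket; // the helper does not advance the counter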
@@ -75,24 +75,12 @@ public void copyRecords() {
long startBucket = source.getStartBucket(fromOrdinal);
long endBucket = source.getEndBucket(fromOrdinal);

if (to.bitsPerElement == source.bitsPerElement) {
// fastpath can bulk copy buckets. emptyBucketValue is same since bitsPerElement is same
long numBuckets = endBucket - startBucket;
int bitsPerElement = source.bitsPerElement;
to.elementData.copyBits(source.elementData, startBucket * bitsPerElement, bucketCounter * bitsPerElement, numBuckets * bitsPerElement);
bucketCounter += numBuckets;
} else {
// one bucket at a time
for (long bucket=startBucket;bucket<endBucket;bucket++) {
int bucketOrdinal = (int)source.elementData.getElementValue(bucket * source.bitsPerElement, source.bitsPerElement);
to.elementData.setElementValue(bucketCounter * to.bitsPerElement, to.bitsPerElement, bucketOrdinal);
bucketCounter++;
}
}
long numBuckets = endBucket - startBucket;
to.copyBucketsFrom(bucketCounter, source, startBucket, endBucket);
bucketCounter += numBuckets;

setSize = source.setPointerAndSizeData.getElementValue((long) (fromOrdinal * source.bitsPerFixedLengthSetPortion) + source.bitsPerSetPointer, source.bitsPerSetSizeValue);
}
// SNAP: TODO: write a test for lopsided set shards (similar to the list type test). There's one in object joiner tests.
to.setPointerAndSizeData.setElementValue((long) ordinal * to.bitsPerFixedLengthSetPortion, to.bitsPerSetPointer, bucketCounter);
to.setPointerAndSizeData.setElementValue((long) (ordinal * to.bitsPerFixedLengthSetPortion) + to.bitsPerSetPointer, to.bitsPerSetSizeValue, setSize);
}
@@ -75,23 +75,10 @@ public void copyRecords() {
long endBucket = from.getEndBucket(ordinal);
HollowSetTypeDataElements target = to[toIndex];

if (target.bitsPerElement == from.bitsPerElement && target.emptyBucketValue == from.emptyBucketValue) {
// fastpath
int bitsPerElement = from.bitsPerElement;
long numBuckets = endBucket - startBucket;
target.elementData.copyBits(from.elementData, startBucket * bitsPerElement, bucketCounter[toIndex] * bitsPerElement, numBuckets * bitsPerElement);
bucketCounter[toIndex] += numBuckets;
} else {
// for (long bucket=startBucket;bucket<endBucket;bucket++) {
// long bucketVal = from.elementData.getElementValue(bucket * from.bitsPerElement, from.bitsPerElement);
// if(bucketVal == from.emptyBucketValue)
// bucketVal = target.emptyBucketValue;
// target.elementData.setElementValue(bucketCounter[toIndex] * target.bitsPerElement, target.bitsPerElement, bucketVal);
// bucketCounter[toIndex]++;
// }
throw new RuntimeException("Unexpected for Set type during split, expected during join"); // SNAP: TODO: remove, or keep for now so that we can test the slow path and in future when we make size optimizations then consumers will be backwards compatible
long numBuckets = endBucket - startBucket;
target.copyBucketsFrom(bucketCounter[toIndex], from, startBucket, endBucket);
bucketCounter[toIndex] += numBuckets;

}
target.setPointerAndSizeData.setElementValue((long) toOrdinal * target.bitsPerFixedLengthSetPortion, target.bitsPerSetPointer, bucketCounter[toIndex]);
long setSize = from.setPointerAndSizeData.getElementValue((long) (ordinal * from.bitsPerFixedLengthSetPortion) + from.bitsPerSetPointer, from.bitsPerSetSizeValue);
target.setPointerAndSizeData.setElementValue((long) (toOrdinal * target.bitsPerFixedLengthSetPortion) + target.bitsPerSetPointer, target.bitsPerSetSizeValue, setSize);