Skip to content

Commit

Permalink
map joiner fastpath
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Aug 28, 2024
1 parent c4455d1 commit bf8b902
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public SegmentedLongArray(ArraySegmentRecycler memoryRecycler, long numLongs) {
public void set(long index, long value) {
int segmentIndex = (int)(index >> log2OfSegmentSize);
int longInSegment = (int)(index & bitmask);
unsafe.putLong(segments[segmentIndex], (long) Unsafe.ARRAY_LONG_BASE_OFFSET + (8 * longInSegment), value);
try {
unsafe.putLong(segments[segmentIndex], (long) Unsafe.ARRAY_LONG_BASE_OFFSET + (8 * longInSegment), value);
} catch (ArrayIndexOutOfBoundsException e) {
throw e;
}

/// duplicate the longs here so that we can read faster.
if(longInSegment == 0 && segmentIndex != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,38 +75,65 @@ public void copyRecords() {

HollowMapTypeDataElements source = from[fromIndex];

if (fromOrdinal <= from[fromIndex].maxOrdinal) {
long mapSize = 0;
if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shards resulting from skipping type shards with no additions, mapSize remains 0
long startBucket = getAbsoluteBucketStart(source, fromOrdinal);
long endBucket = source.mapPointerAndSizeData.getElementValue((long)fromOrdinal * source.bitsPerFixedLengthMapPortion, source.bitsPerMapPointer);

long numBuckets = endBucket - startBucket;
if (false) {
// if (to.bitsPerKeyElement == source.bitsPerKeyElement && to.bitsPerValueElement == source.bitsPerValueElement) { // SNAP: TODO: test with this TRUE AND FALSE, for both fast and slow paths

// if (to.bitsPerKeyElement == source.bitsPerKeyElement && to.bitsPerValueElement == source.bitsPerValueElement) {
// long bitsPerMapEntry = to.bitsPerMapEntry;
// FixedLengthElementArray a1 = new FixedLengthElementArray(to.memoryRecycler, bitsPerMapEntry * numBuckets);
// FixedLengthElementArray a2 = new FixedLengthElementArray(to.memoryRecycler, bitsPerMapEntry * numBuckets);
//
// a1.copyBits(source.entryData, startBucket * bitsPerMapEntry, 0, numBuckets * bitsPerMapEntry);
// long targetBucketOffset = 0;
// for (long bucket=startBucket;bucket<endBucket;bucket++) {
// long bucketKey = source.entryData.getElementValue(bucket * source.bitsPerMapEntry, source.bitsPerKeyElement);
// long bucketValue = source.entryData.getElementValue(bucket * source.bitsPerMapEntry + source.bitsPerKeyElement, source.bitsPerValueElement);
// if(bucketKey == source.emptyBucketKeyValue)
// bucketKey = to.emptyBucketKeyValue; // since empty bucket key value can be non-uniform across shards
// a2.setElementValue(targetBucketOffset, to.bitsPerKeyElement, bucketKey);
// a2.setElementValue(targetBucketOffset + to.bitsPerKeyElement, to.bitsPerValueElement, bucketValue);
// targetBucketOffset += to.bitsPerMapEntry;
// }
//
// for(int i=0; i<bitsPerMapEntry*numBuckets; i++) {
// if (a1.getElementValue(i, 0) != a2.getElementValue(i, 0)) {
// throw new IllegalStateException("Mismatch in map data");
// }
// }
//
// a1.destroy(to.memoryRecycler);
// a2.destroy(to.memoryRecycler);
// }

// if (false) { // SNAP: TODO: test the slow path
if (to.bitsPerKeyElement == source.bitsPerKeyElement && to.bitsPerValueElement == source.bitsPerValueElement) {
// emptyBucketKeyValue will also be uniform
long bitsPerMapEntry = to.bitsPerMapEntry;
long targetBucketOffset = bucketCounter * bitsPerMapEntry;
// can only do one map record at a time at most, unlike delta
// SNAP: TODO: bulk copy can also be applied to splitter
to.entryData.copyBits(source.entryData, startBucket * bitsPerMapEntry, targetBucketOffset, numBuckets * bitsPerMapEntry);
to.entryData.copyBits(source.entryData, startBucket * bitsPerMapEntry, bucketCounter * bitsPerMapEntry, numBuckets * bitsPerMapEntry);
bucketCounter += numBuckets;
} else {
for (long bucket = startBucket; bucket < endBucket; bucket++) {
long bucketKey = source.entryData.getElementValue(bucket * source.bitsPerMapEntry, source.bitsPerKeyElement);
long bucketValue = source.entryData.getElementValue(bucket * source.bitsPerMapEntry + source.bitsPerKeyElement, source.bitsPerValueElement);
if (bucketKey == source.emptyBucketKeyValue)
bucketKey = to.emptyBucketKeyValue; // since empty bucket key value can be non-uniform across shards
long targetBucketOffset = bucketCounter * to.bitsPerMapEntry;
to.entryData.setElementValue(targetBucketOffset, to.bitsPerKeyElement, bucketKey);
to.entryData.setElementValue(targetBucketOffset + to.bitsPerKeyElement, to.bitsPerValueElement, bucketValue);
bucketCounter ++;
}
}
// SNAP: TODO: this is slow copy, but we could also do fast copy for equal width joins
for (long bucket=startBucket;bucket<endBucket;bucket++) {
long targetBucketOffset = bucketCounter * to.bitsPerMapEntry;
long bucketKey = source.entryData.getElementValue(bucket * source.bitsPerMapEntry, source.bitsPerKeyElement);
long bucketValue = source.entryData.getElementValue(bucket * source.bitsPerMapEntry + source.bitsPerKeyElement, source.bitsPerValueElement);
if(bucketKey == source.emptyBucketKeyValue)
bucketKey = to.emptyBucketKeyValue; // since empty bucket key value can be non-uniform across shards
to.entryData.setElementValue(targetBucketOffset, to.bitsPerKeyElement, bucketKey);
to.entryData.setElementValue(targetBucketOffset + to.bitsPerKeyElement, to.bitsPerValueElement, bucketValue);
bucketCounter++;
}
} // else: lopsided shards could result for consumers that skip type shards with no additions, that gets handled
// by not writing anything to elementData, and writing the cached value of bucketCounter to listPointerData
// SNAP: TODO: write a test for lopsided list shards (similar to for list and set types). Theres one in object joiner tests.

mapSize = source.mapPointerAndSizeData.getElementValue((long) (fromOrdinal * source.bitsPerFixedLengthMapPortion) + source.bitsPerMapPointer, source.bitsPerMapSizeValue);
}

// SNAP: TODO: write a test for lopsided list shards (similar to for list and set types). Theres one in object joiner tests.
to.mapPointerAndSizeData.setElementValue( (long) ordinal * to.bitsPerFixedLengthMapPortion, to.bitsPerMapPointer, bucketCounter);
long mapSize = source.mapPointerAndSizeData.getElementValue((long) (fromOrdinal * source.bitsPerFixedLengthMapPortion) + source.bitsPerMapPointer, source.bitsPerMapSizeValue);
to.mapPointerAndSizeData.setElementValue((long) (ordinal * to.bitsPerFixedLengthMapPortion) + to.bitsPerMapPointer, to.bitsPerMapSizeValue, mapSize);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void copyRecords() {

HollowSetTypeDataElements source = from[fromIndex];

if (fromOrdinal <= from[fromIndex].maxOrdinal) {
long setSize = 0;
if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shards resulting from skipping type shards with no additions, setSize remains 0
long startBucket = getAbsoluteBucketStart(source, fromOrdinal);
long endBucket = source.setPointerAndSizeData.getElementValue((long) fromOrdinal * source.bitsPerFixedLengthSetPortion, source.bitsPerSetPointer);

Expand All @@ -81,12 +82,11 @@ public void copyRecords() {
to.elementData.setElementValue(bucketCounter * to.bitsPerElement, to.bitsPerElement, bucketOrdinal);
bucketCounter++;
}
} // else: lopsided shards could result for consumers that skip type shards with no additions, that gets handled
// by not writing anything to elementData, and writing the cached value of bucketCounter to listPointerData
// SNAP: TODO: write a test for lopsided list shards (similar to for list types). Theres one in object joiner tests.

setSize = source.setPointerAndSizeData.getElementValue((long) (fromOrdinal * source.bitsPerFixedLengthSetPortion) + source.bitsPerSetPointer, source.bitsPerSetSizeValue);
}
// SNAP: TODO: write a test for lopsided list shards (similar to for list types). Theres one in object joiner tests.
to.setPointerAndSizeData.setElementValue((long) ordinal * to.bitsPerFixedLengthSetPortion, to.bitsPerSetPointer, bucketCounter);
long setSize = source.setPointerAndSizeData.getElementValue((long) (fromOrdinal * source.bitsPerFixedLengthSetPortion) + source.bitsPerSetPointer, source.bitsPerSetSizeValue);
to.setPointerAndSizeData.setElementValue((long) (ordinal * to.bitsPerFixedLengthSetPortion) + to.bitsPerSetPointer, to.bitsPerSetSizeValue, setSize);
}
}
Expand Down

0 comments on commit bf8b902

Please sign in to comment.