Skip to content

Commit

Permalink
use iteration in set ops, wrap compressed sketch and unpack in iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexanderSaydakov committed Jan 26, 2025
1 parent 0424666 commit adff890
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 104 deletions.
34 changes: 27 additions & 7 deletions src/main/java/org/apache/datasketches/theta/AnotBimpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package org.apache.datasketches.theta;

import static org.apache.datasketches.common.Util.exactLog2OfLong;
import static org.apache.datasketches.thetacommon.HashOperations.convertToHashTable;
import static org.apache.datasketches.thetacommon.HashOperations.checkThetaCorruption;
import static org.apache.datasketches.thetacommon.HashOperations.continueCondition;
import static org.apache.datasketches.thetacommon.HashOperations.hashSearch;
import static org.apache.datasketches.thetacommon.HashOperations.hashSearchOrInsert;
import static org.apache.datasketches.thetacommon.HashOperations.minLgHashTableSize;

import java.util.Arrays;

Expand Down Expand Up @@ -124,7 +127,7 @@ public CompactSketch aNotB(final Sketch skA, final Sketch skB, final boolean dst

if (skB.isEmpty()) {
return skA.compact(dstOrdered, dstMem);
}
}
ThetaUtil.checkSeedHashes(skB.getSeedHash(), seedHash_);
//Both skA & skB are not empty

Expand Down Expand Up @@ -162,14 +165,12 @@ private static long[] getResultHashArr( //returns a new array
final long[] hashArrA,
final Sketch skB) {

//Rebuild/get hashtable of skB
// Rebuild or get hashtable of skB
final long[] hashTableB; //read only
final long[] thetaCache = skB.getCache();
final int countB = skB.getRetainedEntries(true);
if (skB instanceof CompactSketch) {
hashTableB = convertToHashTable(thetaCache, countB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD);
hashTableB = convertToHashTable(skB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD);
} else {
hashTableB = thetaCache;
hashTableB = skB.getCache();
}

//build temporary result arrays of skA
Expand All @@ -191,6 +192,25 @@ private static long[] getResultHashArr( //returns a new array
return Arrays.copyOfRange(tmpHashArrA, 0, nonMatches);
}

private static long[] convertToHashTable(
final Sketch sketch,
final long thetaLong,
final double rebuildThreshold) {
final int lgArrLongs = minLgHashTableSize(sketch.getRetainedEntries(true), rebuildThreshold);
final int arrLongs = 1 << lgArrLongs;
final long[] hashTable = new long[arrLongs];
checkThetaCorruption(thetaLong);
HashIterator it = sketch.iterator();
while (it.next()) {
final long hash = it.get();
if (continueCondition(thetaLong, hash) ) {
continue;
}
hashSearchOrInsert(hashTable, lgArrLongs, hash);
}
return hashTable;
}

private void reset() {
thetaLong_ = Long.MAX_VALUE;
empty_ = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static CompactSketch memoryToCompact(
final long hash = srcMem.getLong(srcPreLongs << 3);
final SingleItemSketch sis = new SingleItemSketch(hash, srcSeedHash);
if (dstMem != null) {
dstMem.putByteArray(0, sis.toByteArray(),0, 16);
dstMem.putByteArray(0, sis.toByteArray(), 0, 16);
return new DirectCompactSketch(dstMem);
} else { //heap
return sis;
Expand Down
18 changes: 7 additions & 11 deletions src/main/java/org/apache/datasketches/theta/CompactSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4;
import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4;
import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4;
import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits;
import static org.apache.datasketches.theta.SingleItemSketch.otherCheckForSingleItem;

import org.apache.datasketches.common.Family;
Expand Down Expand Up @@ -189,7 +190,8 @@ private static CompactSketch wrap(final Memory srcMem, final long seed, final bo
if (serVer == 4) {
// not wrapping the compressed format since currently we cannot take advantage of
// decompression during iteration because set operations reach into memory directly
return heapifyV4(srcMem, seed, enforceSeed);
return DirectCompactCompressedSketch.wrapInstance(srcMem,
enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
}
else if (serVer == 3) {
if (PreambleUtil.isEmptyFlag(srcMem)) {
Expand Down Expand Up @@ -274,10 +276,6 @@ private int computeMinLeadingZeros() {
return Long.numberOfLeadingZeros(ored);
}

private static int wholeBytesToHoldBits(final int bits) {
return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0);
}

private byte[] toByteArrayV4() {
final int preambleLongs = isEstimationMode() ? 2 : 1;
final int entryBits = 64 - computeMinLeadingZeros();
Expand All @@ -286,8 +284,8 @@ private byte[] toByteArrayV4() {
// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
final int numEntriesBytes = wholeBytesToHoldBits(32 - Integer.numberOfLeadingZeros(getRetainedEntries()));

final int size = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
final byte[] bytes = new byte[size];
final int sizeBytes = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
final byte[] bytes = new byte[sizeBytes];
final WritableMemory mem = WritableMemory.writableWrap(bytes);
int offsetBytes = 0;
mem.putByte(offsetBytes++, (byte) preambleLongs);
Expand Down Expand Up @@ -334,12 +332,10 @@ private byte[] toByteArrayV4() {

private static CompactSketch heapifyV4(final Memory srcMem, final long seed, final boolean enforceSeed) {
final int preLongs = extractPreLongs(srcMem);
final int flags = extractFlags(srcMem);
final int entryBits = extractEntryBitsV4(srcMem);
final int numEntriesBytes = extractNumEntriesBytesV4(srcMem);
final short seedHash = (short) extractSeedHash(srcMem);
final boolean isEmpty = (flags & EMPTY_FLAG_MASK) > 0;
if (enforceSeed && !isEmpty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
if (enforceSeed) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
int offsetBytes = 8;
long theta = Long.MAX_VALUE;
if (preLongs > 1) {
Expand Down Expand Up @@ -374,7 +370,7 @@ private static CompactSketch heapifyV4(final Memory srcMem, final long seed, fin
entries[i] += previous;
previous = entries[i];
}
return new HeapCompactSketch(entries, isEmpty, seedHash, numEntries, theta, true);
return new HeapCompactSketch(entries, false, seedHash, numEntries, theta, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ public int getCurrentBytes() {

@Override
public double getEstimate() {
if (otherCheckForSingleItem(mem_)) { return 1; }
final int preLongs = extractPreLongs(mem_);
final int curCount = (preLongs == 1) ? 0 : extractCurCount(mem_);
final long thetaLong = (preLongs > 2) ? extractThetaLong(mem_) : Long.MAX_VALUE;
return Sketch.estimate(thetaLong, curCount);
return Sketch.estimate(getThetaLong(), getRetainedEntries());
}

@Override
Expand Down Expand Up @@ -142,10 +138,8 @@ public HashIterator iterator() {

@Override
public byte[] toByteArray() {
final int curCount = getRetainedEntries(true);
checkIllegalCurCountAndEmpty(isEmpty(), curCount);
final int preLongs = extractPreLongs(mem_);
final int outBytes = (curCount + preLongs) << 3;
checkIllegalCurCountAndEmpty(isEmpty(), getRetainedEntries());
final int outBytes = getCurrentBytes();
final byte[] byteArrOut = new byte[outBytes];
mem_.getByteArray(0, byteArrOut, 0, outBytes);
return byteArrOut;
Expand Down
59 changes: 36 additions & 23 deletions src/main/java/org/apache/datasketches/theta/IntersectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ else if (curCount_ < 0 && sketchInEntries > 0) {
else { //On the heap, allocate a HT
hashTable_ = new long[1 << lgArrLongs_];
}
moveDataToTgt(sketchIn.getCache(), curCount_);
moveDataToTgt(sketchIn);
} //end of state 5

//state 7
Expand Down Expand Up @@ -434,8 +434,6 @@ long getThetaLong() {
private void performIntersect(final Sketch sketchIn) {
// curCount and input data are nonzero, match against HT
assert curCount_ > 0 && !empty_;
final long[] cacheIn = sketchIn.getCache();
final int arrLongsIn = cacheIn.length;
final long[] hashTable;
if (wmem_ != null) {
final int htLen = 1 << lgArrLongs_;
Expand All @@ -448,27 +446,16 @@ private void performIntersect(final Sketch sketchIn) {
final long[] matchSet = new long[ min(curCount_, sketchIn.getRetainedEntries(true)) ];

int matchSetCount = 0;
if (sketchIn.isOrdered()) {
//ordered compact, which enables early stop
for (int i = 0; i < arrLongsIn; i++ ) {
final long hashIn = cacheIn[i];
//if (hashIn <= 0L) continue; //<= 0 should not happen
if (hashIn >= thetaLong_) {
break; //early stop assumes that hashes in input sketch are ordered!
}
HashIterator it = sketchIn.iterator();
while (it.next()) {
final long hashIn = it.get();
if (hashIn < thetaLong_) {
final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn);
if (foundIdx == -1) { continue; }
matchSet[matchSetCount++] = hashIn;
}
}
else {
//either unordered compact or hash table
for (int i = 0; i < arrLongsIn; i++ ) {
final long hashIn = cacheIn[i];
if (hashIn <= 0L || hashIn >= thetaLong_) { continue; }
final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn);
if (foundIdx == -1) { continue; }
matchSet[matchSetCount++] = hashIn;
if (foundIdx != -1) {
matchSet[matchSetCount++] = hashIn;
}
} else {
if (sketchIn.isOrdered()) { break; } // early stop
}
}
//reduce effective array size to minimum
Expand Down Expand Up @@ -515,6 +502,32 @@ private void moveDataToTgt(final long[] arr, final int count) {
assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count;
}

private void moveDataToTgt(final Sketch sketch) {
int count = sketch.getRetainedEntries();
int tmpCnt = 0;
if (wmem_ != null) { //Off Heap puts directly into mem
final int preBytes = CONST_PREAMBLE_LONGS << 3;
final int lgArrLongs = lgArrLongs_;
final long thetaLong = thetaLong_;
HashIterator it = sketch.iterator();
while (it.next()) {
final long hash = it.get();
if (continueCondition(thetaLong, hash)) { continue; }
hashInsertOnlyMemory(wmem_, lgArrLongs, hash, preBytes);
tmpCnt++;
}
} else { //On Heap. Assumes HT exists and is large enough
HashIterator it = sketch.iterator();
while (it.next()) {
final long hash = it.get();
if (continueCondition(thetaLong_, hash)) { continue; }
hashInsertOnly(hashTable_, lgArrLongs_, hash);
tmpCnt++;
}
}
assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count;
}

private void hardReset() {
resetCommon();
if (wmem_ != null) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/apache/datasketches/theta/PreambleUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,4 +524,7 @@ private static void throwNotBigEnough(final long cap, final int required) {
+ ", Required: " + required);
}

static int wholeBytesToHoldBits(final int bits) {
return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0);
}
}
13 changes: 5 additions & 8 deletions src/main/java/org/apache/datasketches/theta/Sketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,8 @@ public String toString(final boolean sketchSummary, final boolean dataDetail, fi
final boolean hexMode) {
final StringBuilder sb = new StringBuilder();

final long[] cache = getCache();
int nomLongs = 0;
int arrLongs = cache.length;
int arrLongs = 0;
float p = 0;
int rf = 0;
final boolean updateSketch = this instanceof UpdateSketch;
Expand All @@ -473,12 +472,10 @@ public String toString(final boolean sketchSummary, final boolean dataDetail, fi
final int w = width > 0 ? width : 8; // default is 8 wide
if (curCount > 0) {
sb.append("### SKETCH DATA DETAIL");
for (int i = 0, j = 0; i < arrLongs; i++ ) {
final long h;
h = cache[i];
if (h <= 0 || h >= thetaLong) {
continue;
}
HashIterator it = iterator();
int j = 0;
while (it.next()) {
final long h = it.get();
if (j % w == 0) {
sb.append(LS).append(String.format(" %6d", j + 1));
}
Expand Down
55 changes: 10 additions & 45 deletions src/main/java/org/apache/datasketches/theta/UnionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,46 +320,17 @@ public void union(final Sketch sketchIn) {
}
//sketchIn is valid and not empty
ThetaUtil.checkSeedHashes(expectedSeedHash_, sketchIn.getSeedHash());
if (sketchIn instanceof SingleItemSketch) {
gadget_.hashUpdate(sketchIn.getCache()[0]);
return;
}
Sketch.checkSketchAndMemoryFlags(sketchIn);

unionThetaLong_ = min(min(unionThetaLong_, sketchIn.getThetaLong()), gadget_.getThetaLong()); //Theta rule
unionEmpty_ = false;
final int curCountIn = sketchIn.getRetainedEntries(true);
if (curCountIn > 0) {
if (sketchIn.isOrdered() && (sketchIn instanceof CompactSketch)) { //Use early stop
//Ordered, thus compact
if (sketchIn.hasMemory()) {
final Memory skMem = sketchIn.getMemory();
final int preambleLongs = skMem.getByte(PREAMBLE_LONGS_BYTE) & 0X3F;
for (int i = 0; i < curCountIn; i++ ) {
final int offsetBytes = preambleLongs + i << 3;
final long hashIn = skMem.getLong(offsetBytes);
if (hashIn >= unionThetaLong_) { break; } // "early stop"
gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed
}
}
else { //sketchIn is on the Java Heap or has array
final long[] cacheIn = sketchIn.getCache(); //not a copy!
for (int i = 0; i < curCountIn; i++ ) {
final long hashIn = cacheIn[i];
if (hashIn >= unionThetaLong_) { break; } // "early stop"
gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed
}
}
} //End ordered, compact
else { //either not-ordered compact or Hash Table form. A HT may have dirty values.
final long[] cacheIn = sketchIn.getCache(); //if off-heap this will be a copy
final int arrLongs = cacheIn.length;
for (int i = 0, c = 0; i < arrLongs && c < curCountIn; i++ ) {
final long hashIn = cacheIn[i];
if (hashIn <= 0L || hashIn >= unionThetaLong_) { continue; } //rejects dirty values
gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed
c++; //ensures against invalid state inside the incoming sketch
}
HashIterator it = sketchIn.iterator();
while (it.next()) {
final long hash = it.get();
if (hash < unionThetaLong_ && hash < gadget_.getThetaLong()) {
gadget_.hashUpdate(hash); // backdoor update, hash function is bypassed
} else {
if (sketchIn.isOrdered()) { break; }
}
}
unionThetaLong_ = min(unionThetaLong_, gadget_.getThetaLong()); //Theta rule with gadget
Expand All @@ -379,11 +350,8 @@ public void union(final Memory skMem) {
final int fam = extractFamilyID(skMem);

if (serVer == 4) { // compressed ordered compact
// performance can be improved by decompression while performing the union
// potentially only partial decompression might be needed
ThetaUtil.checkSeedHashes(expectedSeedHash_, (short) extractSeedHash(skMem));
final CompactSketch csk = CompactSketch.wrap(skMem);
union(csk);
union(CompactSketch.wrap(skMem));
return;
}
if (serVer == 3) { //The OpenSource sketches (Aug 4, 2015) starts with serVer = 3
Expand All @@ -396,16 +364,13 @@ public void union(final Memory skMem) {
}
if (serVer == 2) { //older Sketch, which is compact and ordered
ThetaUtil.checkSeedHashes(expectedSeedHash_, (short)extractSeedHash(skMem));
final CompactSketch csk = ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_);
union(csk);
union(ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_));
return;
}
if (serVer == 1) { //much older Sketch, which is compact and ordered, no seedHash
final CompactSketch csk = ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_);
union(csk);
union(ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_));
return;
}

throw new SketchesArgumentException("SerVer is unknown: " + serVer);
}

Expand Down

0 comments on commit adff890

Please sign in to comment.