Skip to content

Commit

Permalink
Merge pull request #582 from apache/quotient-filter
Browse files Browse the repository at this point in the history
implemented merge
  • Loading branch information
AlexanderSaydakov authored Jul 17, 2024
2 parents e1f3d97 + 9d3a4cd commit 36a675d
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@
import java.util.Queue;
import java.util.Set;

import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.SketchesException;
import org.apache.datasketches.filters.common.BitArray;
import org.apache.datasketches.filters.common.HeapBitArray;

public class QuotientFilter extends Filter {

public static final float DEFAULT_LOAD_FACTOR = 0.8f;
public static final double DEFAULT_LOAD_FACTOR = 0.8;

int lgQ_;
int numFingerprintBits_;
float loadFactor_;
double loadFactor_;
int numEntries_;
int numExpansions_;
BitArray bitArray_;
Expand All @@ -50,7 +51,7 @@ public QuotientFilter(final int lgQ, final int numFingerprintBits) {
this(lgQ, numFingerprintBits, DEFAULT_LOAD_FACTOR);
}

public QuotientFilter(final int lgQ, final int numFingerprintBits, final float loadFactor) {
public QuotientFilter(final int lgQ, final int numFingerprintBits, final double loadFactor) {
lgQ_ = lgQ;
numFingerprintBits_ = numFingerprintBits;
loadFactor_ = loadFactor;
Expand Down Expand Up @@ -83,12 +84,6 @@ public int getFingerprintLength() {
return numFingerprintBits_;
}

// QuotientFilter(final int powerOfTwo, final int numBitsPerEntry, final BitArray bitArray) {
// powerOfTwoSize_ = powerOfTwo;
// numBitsPerEntry_ = numBitsPerEntry;
// bitArray_ = bitArray;
// }

void expand() {
if (getFingerprintLength() < 2) throw new SketchesException("for expansion value must have at least 2 bits");
final QuotientFilter other = new QuotientFilter(lgQ_ + 1, numFingerprintBits_ - 1, loadFactor_);
Expand Down Expand Up @@ -150,6 +145,14 @@ public double getUtilization() {
return numEntries_ / (double) getNumSlots();
}

public int getLgQ() {
return lgQ_;
}

public double getLoadFactor() {
return loadFactor_;
}

// returns the number of slots in the filter without the extension/buffer slots
public long getNumSlots() {
return 1L << lgQ_;
Expand Down Expand Up @@ -220,7 +223,7 @@ protected boolean compare(final long index, final long fingerprint) {

// modify the flags and fingerprint of a given slot
void modifySlot(final boolean isOccupied, final boolean isContinuation, final boolean isShifted,
final long index, final long fingerprint) {
final long index, final long fingerprint) {
modifySlot(isOccupied, isContinuation, isShifted, index);
setFingerprint(index, fingerprint);
}
Expand All @@ -229,13 +232,14 @@ void modifySlot(final boolean isOccupied, final boolean isContinuation, final bo
public void printFilterSummary() {
final long slots = getNumSlots();
final long numBits = slots * getNumBitsPerEntry();
System.out.println("slots: " + slots);
System.out.println("bits: " + numBits);
System.out.println("bits/entry: " + numBits / (double)numEntries_);
System.out.println("FP length: " + getFingerprintLength());
System.out.println("entries: " + numEntries_);
System.out.println("expansions: " + numExpansions_);
System.out.println("load: " + numEntries_ / (double)(slots));
System.out.println("lgQ: " + lgQ_);
System.out.println("FP length: " + getFingerprintLength());
System.out.println("load factor: " + getLoadFactor());
System.out.println("bits: " + numBits);
System.out.println("bits/entry: " + numBits / (double)numEntries_);
System.out.println("entries: " + numEntries_);
System.out.println("expansions: " + numExpansions_);
System.out.println("load: " + numEntries_ / (double)(slots));
computeStatistics();
//System.out.println("num runs: \t\t" + num_runs);
//System.out.println("avg run length: \t" + avg_run_length);
Expand Down Expand Up @@ -320,7 +324,7 @@ long findFirstFingerprintInRun(long index, final long fingerprint) {
if (fingerprintAtIndex == fingerprint) {
return index;
} else if (fingerprintAtIndex > fingerprint) {
return ~index;
return ~index;
}
index = (index + 1) & getSlotMask();
} while (isContinuation(index));
Expand Down Expand Up @@ -392,7 +396,7 @@ boolean insert(final long fingerprint, final long index) {
}

void insertFingerprintAndPushAllElse(long fingerprint, long index, final long canonical,
final boolean isNewRun, final boolean isRunStart) {
final boolean isNewRun, final boolean isRunStart) {
// in the first shifted entry set isContinuation flag if inserting at the start of the existing run
// otherwise just shift the existing flag as it is
boolean forceContinuation = !isNewRun && isRunStart;
Expand Down Expand Up @@ -632,4 +636,26 @@ public void computeStatistics() {
avgClusterLength_ = sumClusterLengths / numClusters_;
}

}
public void merge(final QuotientFilter other) {
if (lgQ_ + numFingerprintBits_ != other.lgQ_ + other.numFingerprintBits_) {
throw new SketchesArgumentException("incompatible sketches in merge");
}
long i = 0;
if (!other.isSlotEmpty(i)) { i = other.findClusterStart(i); }

final Queue<Long> fifo = new LinkedList<Long>();
long count = 0;
while (count < other.numEntries_) {
if (!other.isSlotEmpty(i)) {
if (other.isOccupied(i)) { fifo.add(i); }
final long quotient = fifo.element();
final long fingerprint = other.getFingerprint(i);
final long hash = quotient << other.getFingerprintLength() | fingerprint;
_insert(hash);
count++;
}
i = (i + 1) & other.getSlotMask();
if (!fifo.isEmpty() && ! other.isContinuation(i)) { fifo.remove(); }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
*/

package org.apache.datasketches.filters.quotientfilter;
//import java.util.concurrent.ThreadLocalRandom;

import static org.apache.datasketches.filters.quotientfilter.QuotientFilter.DEFAULT_LOAD_FACTOR;
import org.apache.datasketches.common.SketchesArgumentException;

/**
Expand Down Expand Up @@ -60,11 +59,11 @@ public static byte suggestFingerprintLength(double targetFalsePositiveProb) {
* @param maxDistinctItems The maximum number of distinct items that can be inserted into the filter.
* @return The log-base-2 of the number of slots in the filter.
*/
public static byte suggestLgNumSlots(long maxDistinctItems) {
public static byte suggestLgNumSlots(long maxDistinctItems, double loadFactor) {
if (maxDistinctItems <= 0) {
throw new SketchesArgumentException("maxDistinctItems must be strictly positive");
}
byte result = (byte) Math.ceil(Math.log(maxDistinctItems / 0.9) / Math.log(2));
byte result = (byte) Math.ceil(Math.log(maxDistinctItems / loadFactor) / Math.log(2));
if (result < 31) {
return result;
} else {
Expand All @@ -73,19 +72,27 @@ public static byte suggestLgNumSlots(long maxDistinctItems) {
}
}

public static byte suggestLgNumSlots(long maxDistinctItems) {
return suggestLgNumSlots(maxDistinctItems, DEFAULT_LOAD_FACTOR);
}

/*
Returns the largest number of unique items that can be inserted into the filter.
We use a predefined load factor of 0.9 compared to the number of slots as 2^j.
@param lgNumSlots The log-base-2 of the number of slots in the filter
@return The maximum number of items that can be inserted into the filter
*/
public static long suggestMaxNumItemsFromNumSlots(byte lgNumSlots) {
public static long suggestMaxNumItemsFromNumSlots(int lgNumSlots, double loadFactor) {
if (lgNumSlots <= 0) {
throw new SketchesArgumentException("lgNumSlots must be at least 1.");
} else if (lgNumSlots >= 31) {
throw new SketchesArgumentException("lgNumSlots cannot exceed 2^31 - 1.");
}
return (long) Math.floor(0.9 * Math.pow(2, lgNumSlots));
return (long) (loadFactor * (1L<<lgNumSlots));
}

public static long suggestMaxNumItemsFromNumSlots(byte lgNumSlots) {
return suggestMaxNumItemsFromNumSlots(lgNumSlots, DEFAULT_LOAD_FACTOR);
}


Expand All @@ -95,21 +102,29 @@ public static long suggestMaxNumItemsFromNumSlots(byte lgNumSlots) {
* The results are returned as a QFPair object.
*
* @param maxDistinctItems The maximum number of distinct items that can be inserted into the filter.
* @param loadFactor The load factor to use when calculating the number of slots.
* @param targetFalsePositiveProb The desired false positive probability per item.
* @return A QFPair object containing the suggested number of slots (lgNumSlots) and the suggested fingerprint length.
* @throws SketchesArgumentException if the input parameters are not valid.
*/
public static QFPair suggestParamsFromMaxDistinctsFPP(long maxDistinctItems, double targetFalsePositiveProb) {
validateAccuracyInputs(maxDistinctItems, targetFalsePositiveProb);
byte lgNumSlots = suggestLgNumSlots(maxDistinctItems);
public static QFPair suggestParamsFromMaxDistinctsFPP(long maxDistinctItems, double loadFactor, double targetFalsePositiveProb) {
validateAccuracyInputs(maxDistinctItems, loadFactor, targetFalsePositiveProb);
byte lgNumSlots = suggestLgNumSlots(maxDistinctItems, loadFactor);
byte fingerprintLength = suggestFingerprintLength(targetFalsePositiveProb);
return new QFPair(lgNumSlots, fingerprintLength);
}

private static void validateAccuracyInputs(final long maxDistinctItems, final double targetFalsePositiveProb) {
public static QFPair suggestParamsFromMaxDistinctsFPP(long maxDistinctItems, double targetFalsePositiveProb) {
return suggestParamsFromMaxDistinctsFPP(maxDistinctItems, DEFAULT_LOAD_FACTOR, targetFalsePositiveProb);
}

private static void validateAccuracyInputs(final long maxDistinctItems, final double loadFactor, final double targetFalsePositiveProb) {
if (maxDistinctItems <= 0) {
throw new SketchesArgumentException("maxDistinctItems must be strictly positive");
}
if (loadFactor <=0.0 || loadFactor >= 1.0) {
throw new SketchesArgumentException("loadFactor must be larger than 0 and less than 1");
}
if (targetFalsePositiveProb <= 0.0 || targetFalsePositiveProb > 1.0) {
throw new SketchesArgumentException("targetFalsePositiveProb must be a valid probability and strictly greater than 0");
}
Expand All @@ -130,4 +145,4 @@ public QFPair(byte lgNumSlots, byte fingerprintLength) {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public static void testSuggestLgNumSlots(){
QuotientFilterBuilder qfb = new QuotientFilterBuilder();

// invalid number of items
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(0,0.9));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(-1, 0.9));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(5000000000L, 0.9));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(0));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(-1));
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestLgNumSlots(5000000000L));
Expand All @@ -55,7 +58,9 @@ public static void testSuggestLgNumSlots(){

for (int i = 0; i < numItems.length; i++) {
long num = numItems[i];
byte result = qfb.suggestLgNumSlots(num);
byte result = qfb.suggestLgNumSlots(num, 0.9);
assertEquals(result, results[i]);
result = qfb.suggestLgNumSlots(num);
assertEquals(result, results[i]);
}
}
Expand All @@ -70,12 +75,15 @@ public static void testSuggestMaxNumItems(){
assertThrows(SketchesArgumentException.class, () -> QuotientFilterBuilder.suggestMaxNumItemsFromNumSlots((byte)32));


byte[] lgNumSlots = {1, 2, 3, 6, 10, 15, 25, 30,};
long[] results = {1, 3, 7, 57, 921, 29491, 30198988, 966367641} ;
int[] lgNumSlots = {1, 2, 3, 6, 10, 15, 25, 30,};
long[] results_ninety_pc = {1, 3, 7, 57, 921, 29491, 30198988, 966367641} ;
long[] results_eighty_pc = {1, 3, 6, 51, 819, 26214, 26843545, 858993459} ;

for (int i = 0; i < lgNumSlots.length; i++) {
long result = qfb.suggestMaxNumItemsFromNumSlots(lgNumSlots[i]);
assertEquals(result, results[i]);
long result_ninety = qfb.suggestMaxNumItemsFromNumSlots(lgNumSlots[i], 0.9);
long result_eighty = qfb.suggestMaxNumItemsFromNumSlots(lgNumSlots[i], 0.8);
assertEquals(result_ninety, results_ninety_pc[i]);
assertEquals(result_eighty, results_eighty_pc[i]);
}
}

Expand All @@ -96,18 +104,26 @@ public static void testSuggestParamsFromMaxDistinctsFPP(){
double[] fpp = {1E-10, 1E-2, 1e-7} ;

// expected outcomes
byte[] expected_lgNumSlots = {1, 10, 30} ;
byte[] expected_lgNumSlotsNinety = {1, 10, 30} ;
byte[] expected_lgNumSlotsEighty = {1, 11, 30} ;
byte[] expected_fingerprintLength = {34, 7, 24} ;

for (int i = 0; i < numItems.length; i++) {
QuotientFilterBuilder.QFPair pair = qfb.suggestParamsFromMaxDistinctsFPP(numItems[i], fpp[i]);
QuotientFilterBuilder.QFPair pair = qfb.suggestParamsFromMaxDistinctsFPP(numItems[i], 0.9, fpp[i]);
lgNumSlots = pair.lgNumSlots;
fingerprintLength = pair.fingerprintLength;
assertEquals(expected_lgNumSlotsNinety[i], lgNumSlots);
assertEquals(expected_fingerprintLength[i], fingerprintLength);

// 80% load
pair = qfb.suggestParamsFromMaxDistinctsFPP(numItems[i], fpp[i]);
lgNumSlots = pair.lgNumSlots;
fingerprintLength = pair.fingerprintLength;
assertEquals(expected_lgNumSlots[i], lgNumSlots);
assertEquals(expected_lgNumSlotsEighty[i], lgNumSlots);
assertEquals(expected_fingerprintLength[i], fingerprintLength);
}
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

package org.apache.datasketches.filters.quotientfilter;
import org.apache.datasketches.common.SketchesArgumentException;
import org.testng.annotations.Test;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -308,4 +309,57 @@ public void expansion() {
assertTrue(positives < 6);
}

@Test
public void mergeEmpty() {
final QuotientFilter qf1 = new QuotientFilter(4, 3);
final QuotientFilter qf2 = new QuotientFilter(4, 3);
qf1.merge(qf2);

assertEquals(qf1.getLgQ(), 4);
assertEquals(qf1.getFingerprintLength(), 3);
assertEquals(qf1.getNumEntries(), 0);
}

@Test
public void merge() {
final QuotientFilter qf1 = new QuotientFilter(16, 13);
final QuotientFilter qf2 = new QuotientFilter(16, 13);
final int n = 50000;
for (int i = 0; i < n / 2; i++) {
qf1.insert(i);
qf2.insert(i + n / 2);
}
qf1.merge(qf2);

assertEquals(qf1.getNumExpansions(), 0);
assertTrue(qf1.getNumEntries() > n * 0.99); // allow a few hash collisions

// query the same keys
int positives = 0;
for (int i = 0; i < n; i++) { if (qf1.search(i)) { positives++; } }
assertEquals(positives, n);

// query novel keys
positives = 0;
for (int i = 0; i < n; i++) { if (qf1.search(i + n)) { positives++; } }
assertTrue(positives < 4);
}

@Test
public void mergeDifferentConfiguration() {
final QuotientFilter qf1 = new QuotientFilter(3, 4);
final QuotientFilter qf2 = new QuotientFilter(4, 3);
qf1.insert(4);
qf2.insert(4);
qf1.merge(qf2);
assertEquals(qf1.getNumEntries(), 1);
}

@Test(expectedExceptions = SketchesArgumentException.class)
public void mergeIncompatible() {
final QuotientFilter qf1 = new QuotientFilter(4, 4);
final QuotientFilter qf2 = new QuotientFilter(4, 3);
qf1.merge(qf2);
}

}

0 comments on commit 36a675d

Please sign in to comment.