diff --git a/README.mdown b/README.mdown index b75d8f8af..86b118b8b 100644 --- a/README.mdown +++ b/README.mdown @@ -108,6 +108,9 @@ pages 693–703, London, UK, 2002. Springer-Verlag. Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm. Proceedings of the EDBT 2013 Conference, ACM, Genoa, Italy +* Ahmed Helmi, Jerémie Lumbroso, Conrado Martínez and Alfredo Viola. Data Streams as Random Permutations: +the Distinct Element Problem. 22nd International Meeting on Probabilistic, Combinatorial, and +Asymptotic Methods in the Analysis of Algorithms (AofA'12), 2012, Montreal, Canada. #### Top-K diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java new file mode 100644 index 000000000..94fcfaf83 --- /dev/null +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/Recordinality.java @@ -0,0 +1,163 @@ +package com.clearspring.analytics.stream.cardinality; + + +/** + * Java implementation of Recordinality (R) algorithm from this paper: + *

+ * http://www.dmtcs.org/pdfpapers/dmAQ0124.pdf + *

+ * Recordinality counts the number of records (more + * generally, k-records) in the sequence + *

+ * It depends in the underlying permutation of the first + * occurrences of distinct values, very different from the other + * estimators + *

+ * The Recordinality estimator => + * Z = k * (1 + 1/k)^(rk - k + 1) -1 + *

+ * E_n[Z] = n (It's an unbiased estimator of n) + *

+ * The accuracy of Recordinality in terms of SE, asymptotically, satisfacted: + *

+ * SE_n[Z] = sqrt( (n/ke)^(1/k) - 1 ) + *

+ * You can find more information in these slides: + *

+ * https://www.cs.upc.edu/~conrado/research/talks/aofa2012.pdf + *

+ *

+ * Users have different motivations to use different types of hashing functions. + * Rather than try to keep up with all available hash functions and to remove + * the concern of causing future binary incompatibilities this class allows clients + * to offer the value in hashed int or long form. This way clients are free + * to change their hash function on their own time line. We recommend using Google's + * Guava Murmur3_128 implementation as it provides good performance and speed when + * high precision is required. In our tests the 32bit MurmurHash function included + * in this project is faster and produces better results than the 32 bit murmur3 + * implementation google provides. + *

+ */ + + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.PriorityQueue; +import java.util.HashSet; +import com.clearspring.analytics.hash.MurmurHash; + + + +public class Recordinality implements ICardinality, Serializable { + private final int sampleSize; + private final PriorityQueue sampleSet = new PriorityQueue<>(); + private final HashSet elements = new HashSet<>(); + private long rk; + + + /** + * Initializes a new Recordinality instance with a configurable 'k'-size. + */ + public Recordinality(int sampleSize) { + this.sampleSize = sampleSize; + this.rk = 0; + } + + /** + * Process the offered hash. + * You can find a pseudocode in the description links + */ + public boolean offerHashed(long hashedLong) { + // if the element is not in the hashmap... + if (!elements.contains(hashedLong)) { + //if we don't have k-values this is a k-max + if (sampleSize > sampleSet.size()) { + elements.add(hashedLong); + sampleSet.add(hashedLong); + rk+=1; + return true; + // if we have k values but this is a k-max insert it and remove the minimum + } else if (sampleSet.peek() < hashedLong) { + elements.remove(sampleSet.peek()); + elements.add(hashedLong); + sampleSet.poll(); + sampleSet.add(hashedLong); + rk+=1; + return true; + } + } + return false; + } + + + @Override + public boolean offerHashed(int hashedInt) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(Object o) { + long x = MurmurHash.hash64(o); + return offerHashed(x); + } + + /** + * Return a estimation of distinct values + * You can find a pseudocode in the description links + */ + @Override + public long cardinality() { + if (sampleSet.size() < sampleSize) return sampleSet.size(); + else { + long pow = rk - sampleSize + 1; + double estimate = (sampleSize * (Math.pow(1 + (1.0 / sampleSize), pow))) - 1; + return (long) estimate; + } + } + + + /** + * Return a estimated Standar Error from the estimated cardinality + */ + public double estimatedStandarError(){ + if (sampleSet.size() < sampleSize) return 0; + else { + long estimateCardinality = cardinality(); + double pow = 1.0/sampleSize; + return Math.sqrt( Math.pow( + estimateCardinality/(sampleSize*Math.E), pow) + - 1); + } + } + + @Override + public int sizeof() { + return sampleSet.size() * 8; + } + + @Override + public byte[] getBytes() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dos = new DataOutputStream(baos); + writeBytes(dos); + + return baos.toByteArray(); + } + + private void writeBytes(DataOutput serializedByteStream) throws IOException { + serializedByteStream.writeInt(sampleSize); + serializedByteStream.writeInt(elements.size() * 8); + + for (Long e : elements) { + serializedByteStream.writeLong(e); + } + } + + @Override + public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException { + throw new UnsupportedOperationException(); + } +} diff --git a/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java new file mode 100644 index 000000000..109ed52a4 --- /dev/null +++ b/src/test/java/com/clearspring/analytics/stream/cardinality/TestRecordinality.java @@ -0,0 +1,91 @@ +package com.clearspring.analytics.stream.cardinality; + + +/** + * Copyright (C) 2011 Clearspring Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.IOException; +import com.clearspring.analytics.TestUtils; +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRecordinality { + + @Test + public void testComputeCount() { + Recordinality recordinality = new Recordinality(16); + recordinality.offer(0); + recordinality.offer(1); + recordinality.offer(2); + recordinality.offer(3); + recordinality.offer(16); + recordinality.offer(17); + recordinality.offer(18); + recordinality.offer(19); + recordinality.offer(19); + assertEquals(8, recordinality.cardinality()); + } + + @Test + public void testSerialization() throws IOException, ClassNotFoundException { + Recordinality r = new Recordinality(8); + r.offer("a"); + r.offer("b"); + r.offer("c"); + r.offer("d"); + r.offer("e"); + + Recordinality r2 = (Recordinality) TestUtils.deserialize(TestUtils.serialize(r)); + assertEquals(r.cardinality(), r2.cardinality()); + } + + /** + * should not fail with probability 1/100 + */ + @Test + @Ignore + public void testHighCardinality() { + int counter = 0; + for (int j = 0; j < 3; ++j) { + long start = System.currentTimeMillis(); + Recordinality recordinality = new Recordinality(10); + int size = 10000000; + for (int i = 0; i < size; i++) { + recordinality.offer(TestICardinality.streamElement(i)); + } + System.out.println("time: " + (System.currentTimeMillis() - start)); + /** + * the algorithm RECORDINALITY is expected to provide estimates + * within σ, 2σ, 3σ of the exact count in respectively at least + * 68%, 95% and 99% of all cases. + */ + long estimate = recordinality.cardinality(); + double estimatedError = recordinality.estimatedStandarError(); + long permittedError = (long) (3*size*estimatedError); + long err = Math.abs(estimate - size); + + if (err > permittedError) ++counter; + + } + System.out.println("If counter (> 1) rerun the test. \nIf you have already done it, something is broken"); + System.out.println("Counter: " + counter); + assertTrue(counter < 2); + } +}