Skip to content
This repository has been archived by the owner on Aug 23, 2020. It is now read-only.

Feature: Store pruned transaction hashes in a cuckoo filter #1417

Open
wants to merge 13 commits into
base: dev
Choose a base branch
from
Prev Previous commit
Next Next commit
Added saving filters
kwek20 committed Apr 14, 2019

Unverified

This user has not yet uploaded their public signing key.
commit df1a2ddd59dc0bf8a5b995a2862963f482fb5c18
Original file line number Diff line number Diff line change
@@ -11,28 +11,26 @@
/**
* Persistable to manage the data we get as a result of pruning a milestone and its transactions
*/
public class CuckooBucket implements Persistable {
public class Cuckoo implements Persistable {

/**
* The filter number it belonged to in previous cycles
*/
public IntegerIndex bucketId;
public IntegerIndex filterId;

/**
*
*/
public BitSet bucketBits;
public IntegerIndex bucketIndex;
public BitSet filterBits;

/**
*
* {@inheritDoc}
*/
@Override
public byte[] bytes() {
byte[] index = bucketIndex.bytes();
byte[] num = bucketId.bytes();
return ArrayUtils.addAll(ArrayUtils.addAll(num, index), bucketBits.toByteArray());
byte[] num = filterId.bytes();
return ArrayUtils.addAll(num, filterBits.toByteArray());
}

/**
@@ -45,13 +43,12 @@ public byte[] bytes() {
@Override
public void read(byte[] bytes) {
if(bytes != null) {
bucketId = new IntegerIndex(Serializer.getInteger(bytes, 0));
bucketIndex = new IntegerIndex(Serializer.getInteger(bytes, 4));
filterId = new IntegerIndex(Serializer.getInteger(bytes, 0));

short start = 8;
bucketBits = new BitSet(bytes.length - start);
short start = 4;
filterBits = new BitSet(bytes.length - start);
for (int i = start; i < bytes.length; i++) {
bucketBits.set(i-start, bytes[i]);
filterBits.set(i-start, bytes[i]);
}
}
}
Original file line number Diff line number Diff line change
@@ -15,10 +15,12 @@

import com.iota.iri.conf.SnapshotConfig;
import com.iota.iri.model.Hash;
import com.iota.iri.model.persistables.CuckooBucket;
import com.iota.iri.model.IntegerIndex;
import com.iota.iri.model.persistables.Cuckoo;
import com.iota.iri.service.spentaddresses.SpentAddressesException;
import com.iota.iri.service.transactionpruning.PrunedTransactionException;
import com.iota.iri.service.transactionpruning.PrunedTransactionProvider;
import com.iota.iri.storage.Indexable;
import com.iota.iri.storage.Persistable;
import com.iota.iri.storage.rocksDB.RocksDBPersistenceProvider;
import com.iota.iri.utils.datastructure.CuckooFilter;
@@ -77,11 +79,9 @@ public PrunedTransactionProviderImpl init(SnapshotConfig config) throws PrunedTr
config.getPrunedTransactionsDbLogPath(),
1000,
new HashMap<String, Class<? extends Persistable>>(1)
{{put("pruned-transactions", CuckooBucket.class);}}, null);
{{put("pruned-transactions", Cuckoo.class);}}, null);
this.persistenceProvider.init();

// 10 * 1.000.000 * 4 hashes we can hold, with a total of 80mb size when max filled.
// The database has overhead for the bucket index, and allows a maximum of 2^7 - 1 filters
filters = new CircularFifoQueue<CuckooFilter>(MAX_FILTERS);

readPreviousPrunedTransactions();
@@ -94,33 +94,30 @@ public PrunedTransactionProviderImpl init(SnapshotConfig config) throws PrunedTr

private void readPreviousPrunedTransactions() throws PrunedTransactionException {
if (config.isTestnet()) {
newFilter();
try {
newFilter();
} catch (Exception e) {
// Ignorable, testnet starts empty, log for debugging
log.warn(e.getMessage());
}
return;
}

try {

TreeMap<Integer, CuckooFilter> filters = new TreeMap<>();

// Load all data from all filters
List<byte[]> bytes = persistenceProvider.loadAllKeysFromTable(CuckooBucket.class);
for (byte[] bucketData : bytes) {
CuckooBucket bucket = new CuckooBucket();
bucket.read(bucketData);
List<byte[]> bytes = persistenceProvider.loadAllKeysFromTable(Cuckoo.class);
for (byte[] filterData : bytes) {
Cuckoo bucket = new Cuckoo();
bucket.read(filterData);

if (MAX_FILTERS < bucket.bucketId.getValue()) {
if (MAX_FILTERS < bucket.filterId.getValue()) {
throw new PrunedTransactionException("Database contains more filters then we can store");
}

// Find its bucket, or create it if wasnt there
CuckooFilter filter = filters.get(bucket.bucketId.getValue());
if (null == filter) {
filter = new CuckooFilterImpl(filterSize, 4, 16);
filters.put(bucket.bucketId.getValue(), filter);
}

// Don't update using the entire CuckooBucket so that packages are separate-able
filter.update(bucket.bucketIndex.getValue(), bucket.bucketBits);
CuckooFilter filter = new CuckooFilterImpl(filterSize, 4, 16, bucket.filterBits);
filters.put(bucket.filterId.getValue(), filter);
}

//Then add all in order, treemap maintains order from lowest to highest key
@@ -138,15 +135,23 @@ private void readPreviousPrunedTransactions() throws PrunedTransactionException
newFilter();
}

} catch (IllegalArgumentException e) {
} catch (Exception e) {
throw new PrunedTransactionException(e);
}
}

private void persistFilter(CuckooFilter filter, Integer index) {

/**
 * Saves the data of the given filter through the persistence provider.
 *
 * The index is wrapped into an {@link IntegerIndex}, which is used both as the id stored inside the
 * {@link Cuckoo} persistable and as the key under which that persistable is saved.
 *
 * @param filter the filter whose data (as returned by {@code getFilterData()}) is saved
 * @param index  the number identifying this filter
 * @throws Exception propagated from the persistence provider (presumably when saving fails - confirm against
 *                   the provider implementation)
 */
private void persistFilter(CuckooFilter filter, Integer index) throws Exception {
    final IntegerIndex key = new IntegerIndex(index);

    final Cuckoo persistable = new Cuckoo();
    persistable.filterId = key;
    persistable.filterBits = filter.getFilterData();

    // the same IntegerIndex serves as the database key and as the id stored in the value
    persistenceProvider.save(persistable, key);
}

/**
*
* {@inheritDoc}
*/
@Override
public boolean containsTransaction(Hash transactionHash) throws PrunedTransactionException {
byte[] hashBytes = transactionHash.bytes();
@@ -162,17 +167,28 @@ public boolean containsTransaction(Hash transactionHash) throws PrunedTransactio
}

/**
* Adds a transaction to the latest filter. When this filter is full, saves and creates a new filter.
*
* {@inheritDoc}
* @throws PrunedTransactionException when saving the old (full) filter or deleting the oldest fails
*/
@Override
public void addTransaction(Hash transactionHash) throws PrunedTransactionException {
if (null == lastAddedFilter) {
newFilter();
}

if (!lastAddedFilter.add(transactionHash.bytes())){
newFilter().add(transactionHash.bytes());
try {
if (null == lastAddedFilter) {
newFilter();
}

if (!lastAddedFilter.add(transactionHash.bytes())){
try {
persistFilter(lastAddedFilter, highestIndex);
} catch (Exception e) {
throw new PrunedTransactionException(e);
}
newFilter().add(transactionHash.bytes());
}
} catch (Exception e) {
throw new PrunedTransactionException(e);
}
}

@@ -187,12 +203,17 @@ public void addTransactionBatch(Collection<Hash> transactionHashes) throws Prune
for (Hash transactionHash : transactionHashes) {
addTransaction(transactionHash);
}
persistFilter(lastAddedFilter, highestIndex);
try {
persistFilter(lastAddedFilter, highestIndex);
} catch (Exception e) {
throw new PrunedTransactionException(e);
}
}

private CuckooFilter newFilter() {
private CuckooFilter newFilter() throws Exception {
if (filters.isAtFullCapacity()) {
log.debug("Removing " + filters.peek());
persistenceProvider.delete(Cuckoo.class, new IntegerIndex(getLowestIndex()));
}

highestIndex++;
@@ -201,4 +222,12 @@ private CuckooFilter newFilter() {
filters.offer(lastAddedFilter = new CuckooFilterImpl(filterSize, 4, 16));
return lastAddedFilter;
}

/**
 * Calculates the lowest filter index from the highest index handed out so far.
 *
 * As long as the highest index is below {@code MAX_FILTERS} the lowest index is 0, afterwards it trails the
 * highest index by exactly {@code MAX_FILTERS}.
 *
 * @return 0 while {@code highestIndex < MAX_FILTERS}, otherwise {@code highestIndex - MAX_FILTERS}
 */
private int getLowestIndex() {
    return highestIndex >= MAX_FILTERS ? highestIndex - MAX_FILTERS : 0;
}
}
12 changes: 4 additions & 8 deletions src/main/java/com/iota/iri/utils/datastructure/CuckooFilter.java
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

import java.util.BitSet;

import com.iota.iri.model.persistables.CuckooBucket;
import com.iota.iri.model.persistables.Cuckoo;

/**
* The Cuckoo Filter is a probabilistic data structure that supports fast set membership testing.
@@ -89,12 +89,8 @@ public interface CuckooFilter {
int size();

/**
* Update a part of this filter with the bucket information supplied.
* Discards any previous data inside this bucket.
*
* @param index The index of the bucket we are updating
* @param bits the bucket data we use to update the filter
* @throws IllegalArgumentException when the bucket does not contain data which is valid for this filter
* This method returns a copy of all the bits that make up this filter
* @return The filter bits
*/
void update(int index, BitSet bits) throws IllegalArgumentException ;
BitSet getFilterData();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.iota.iri.utils.datastructure.impl;

import com.iota.iri.model.persistables.CuckooBucket;
import com.iota.iri.utils.BitSetUtils;
import com.iota.iri.utils.datastructure.CuckooFilter;

@@ -123,6 +122,38 @@ public CuckooFilterImpl(int itemCount, int bucketSize, int fingerPrintSize) thro

cuckooFilterTable = new CuckooFilterTable(tableSize, bucketSize, fingerPrintSize);
}

/**
 * Restoring constructor that recreates a filter from previously exported filter data.
 *
 * It first delegates to {@link #CuckooFilterImpl(int, int, int)} to create an empty filter for the given
 * parameters and then copies the passed in bits into the underlying {@link CuckooFilterTable}.
 *
 * NOTE: The compatibility check uses {@link BitSet#length()} (highest set bit + 1) of the passed in data and not
 * {@link BitSet#size()}. The latter is an implementation dependent capacity which is not preserved when a
 * {@link BitSet} is converted with {@link BitSet#toByteArray()} and read back, so comparing sizes would reject
 * valid persisted data. As trailing unset bits are lost during such a conversion, data that is shorter than the
 * table can not be told apart from data of a smaller filter - only data that does not fit is rejected.
 *
 * @param itemCount the minimum amount of items that should fit into the filter
 * @param bucketSize the amount of items that can be stored in each bucket
 * @param fingerPrintSize the amount of bits per fingerprint (it has to be bigger than 0 and smaller than 128)
 * @param filterData the bits this filter is initialized with (the passed in instance is copied, not referenced)
 * @throws IllegalArgumentException if the finger print size is too small or too big, if the filter data is
 *                                  {@code null} or if it does not fit into a filter with the given parameters
 * @throws InternalError if the SHA1 hashing function can not be found with this java version [should never happen]
 */
public CuckooFilterImpl(int itemCount, int bucketSize, int fingerPrintSize, BitSet filterData)
        throws IllegalArgumentException, InternalError {

    this(itemCount, bucketSize, fingerPrintSize);

    if (filterData == null) {
        throw new IllegalArgumentException("Filter data must not be null");
    }

    // capacity (in bits) of the freshly created, still empty table
    int tableCapacity = cuckooFilterTable.data.size();
    if (filterData.length() > tableCapacity) {
        throw new IllegalArgumentException("Filter data does not match filter parameters");
    }

    // copy the bits into a BitSet that keeps the capacity of the table instead of shrinking to the restored data
    BitSet restoredData = new BitSet(tableCapacity);
    restoredData.or(filterData);
    cuckooFilterTable.data = restoredData;
}

/**
* {@inheritDoc}
@@ -143,31 +174,6 @@ public boolean add(String item) throws IndexOutOfBoundsException {
public boolean add(byte[] item) throws IndexOutOfBoundsException {
return add(new CuckooFilterItem(hashFunction.digest(item)));
}
/**
* {@inheritDoc}
*
*/
@Override
public void update(int index, BitSet bits) throws IllegalArgumentException {
int amountInBucket = bucketSize;
if (bits.length() % fingerPrintSize != 0) {
throw new IllegalArgumentException("Provided bits do not match fingerprint scheme");
} else if (bits.length() % fingerPrintSize > amountInBucket) {
throw new IllegalArgumentException("Provided fingerprint data will overflow the bucket");
} else if (index > tableSize * 0.955) {
throw new IllegalArgumentException("Provided bucket exceeds filter size");
} else if (false) {
// Can we recover input in any case when expected input does not match given?
}

for (int i=0; i < amountInBucket; i++) {
cuckooFilterTable.set(index, i, bits.get(
fingerPrintSize * i,
fingerPrintSize * (i + 1)
));

}
}

/**
* {@inheritDoc}
@@ -465,6 +471,16 @@ private BitSet generateFingerPrint(byte[] hash) throws IllegalArgumentException
// do a simple conversion of the byte array to a BitSet of the desired length
return BitSetUtils.convertByteArrayToBitSet(hash, 4, fingerPrintSize);
}

/**
 * Creates the textual representation by converting {@code index} to a String.
 *
 * @return {@code index} concatenated with an empty String
 */
@Override
public String toString() {
    final String text = "" + index;
    return text;
}

/**
 * {@inheritDoc}
 *
 * The returned {@link BitSet} is a clone of the table data, so changes made to it do not write through to this
 * filter.
 */
@Override
public BitSet getFilterData() {
    final Object snapshot = cuckooFilterTable.data.clone();
    return (BitSet) snapshot;
}

/**
* Internal helper class to represent items that are stored in the filter.
@@ -617,9 +633,4 @@ public CuckooFilterTable delete(int bucketIndex, int slotIndex) {
return set(bucketIndex, slotIndex, null);
}
}

@Override
public String toString() {
return index +" ";
}
}