Skip to content

Commit

Permalink
feat(db): optimize for bloomFilter initialization (tronprotocol#5394)
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 authored Aug 21, 2023
1 parent f18864c commit f20e11b
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 3 deletions.
5 changes: 5 additions & 0 deletions chainbase/src/main/java/org/tron/core/ChainBaseManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.tron.core.db.PbftSignDataStore;
import org.tron.core.db.RecentBlockStore;
import org.tron.core.db.RecentTransactionStore;
import org.tron.core.db.TransactionCache;
import org.tron.core.db.TransactionStore;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.exception.BadItemException;
Expand Down Expand Up @@ -237,6 +238,9 @@ public class ChainBaseManager {
@Autowired
private DbStatService dbStatService;

@Autowired
private TransactionCache transactionCache;

@Getter
@Setter
private NodeType nodeType;
Expand Down Expand Up @@ -291,6 +295,7 @@ public void closeAllStore() {
closeOneStore(pbftSignDataStore);
closeOneStore(sectionBloomStore);
closeOneStore(accountAssetStore);
closeOneStore(transactionCache);
}

// for test only
Expand Down
149 changes: 146 additions & 3 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,24 @@
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.primitives.Longs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.bouncycastle.util.encoders.Hex;
Expand All @@ -17,6 +31,7 @@
import org.tron.common.storage.leveldb.LevelDbDataSourceImpl;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.JsonUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.capsule.BytesCapsule;
Expand All @@ -42,6 +57,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
private BloomFilter<byte[]>[] bloomFilters = new BloomFilter[2];
// filterStartBlock record the start block of the active filter
private volatile long filterStartBlock = INVALID_BLOCK;
private volatile long currentBlockNum = INVALID_BLOCK;
// currentFilterIndex records the index of the active filter
private volatile int currentFilterIndex = 0;

Expand All @@ -57,6 +73,12 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
// replace persistentStore and optimizes startup performance
private RecentTransactionStore recentTransactionStore;

private final Path cacheFile0;
private final Path cacheFile1;
private final Path cacheProperties;
private final Path cacheDir;
private AtomicBoolean isValid = new AtomicBoolean(false);

public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
this.name = name;
this.TRANSACTION_COUNT =
Expand Down Expand Up @@ -85,6 +107,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
this.bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
cacheDir = Paths.get(CommonParameter.getInstance().getOutputDirectory(), ".cache");
this.cacheFile0 = Paths.get(cacheDir.toString(), "bloomFilters_0");
this.cacheFile1 = Paths.get(cacheDir.toString(), "bloomFilters_1");
this.cacheProperties = Paths.get(cacheDir.toString(), "txCache.properties");

}

Expand All @@ -110,6 +136,10 @@ private void initCache() {
}

public void init() {
if (recovery()) {
isValid.set(true);
return;
}
long size = recentTransactionStore.size();
if (size != MAX_BLOCK_SIZE) {
// 0. load from persistentStore
Expand All @@ -129,6 +159,7 @@ public void init() {
logger.info("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms.",
bloomFilters[1].approximateElementCount(), bloomFilters[1].expectedFpp(),
System.currentTimeMillis() - start);
isValid.set(true);
}

@Override
Expand Down Expand Up @@ -172,7 +203,7 @@ public void put(byte[] key, byte[] value) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
}
bloomFilters[currentFilterIndex].put(key);

currentBlockNum = blockNum;
if (lastMetricBlock != blockNum) {
lastMetricBlock = blockNum;
Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE,
Expand Down Expand Up @@ -208,13 +239,15 @@ public Iterator<Entry<byte[], byte[]>> iterator() {
}

@Override
public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
public synchronized void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
isValid.set(false);
batch.forEach((k, v) -> this.put(k.getBytes(), v.getBytes()));
isValid.set(true);
}

@Override
public void close() {
reset();
dump();
bloomFilters[0] = null;
bloomFilters[1] = null;
persistentStore.close();
Expand All @@ -224,6 +257,116 @@ public void close() {
public void reset() {
}

private boolean recovery() {
FileUtil.createDirIfNotExists(this.cacheDir.toString());
logger.info("recovery bloomFilters start.");
CompletableFuture<Boolean> loadProperties = CompletableFuture.supplyAsync(this::loadProperties);
CompletableFuture<Boolean> tk0 = loadProperties.thenApplyAsync(
v -> recovery(0, this.cacheFile0));
CompletableFuture<Boolean> tk1 = loadProperties.thenApplyAsync(
v -> recovery(1, this.cacheFile1));

return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
}

private boolean recovery(int index, Path file) {
try (InputStream in = new BufferedInputStream(Files.newInputStream(file,
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
logger.info("recovery bloomFilter[{}] from file.", index);
long start = System.currentTimeMillis();
bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel());
logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean handleException(Throwable e) {
bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
try {
Files.deleteIfExists(this.cacheFile0);
Files.deleteIfExists(this.cacheFile1);
} catch (Exception ignored) {

}
logger.info("recovery bloomFilters failed. {}", e.getMessage());
logger.info("rollback to previous mode.");
return false;
}

private void dump() {
if (!isValid.get()) {
logger.info("bloomFilters is not valid.");
}
FileUtil.createDirIfNotExists(this.cacheDir.toString());
logger.info("dump bloomFilters start.");
CompletableFuture<Void> task0 = CompletableFuture.runAsync(
() -> dump(0, this.cacheFile0));
CompletableFuture<Void> task1 = CompletableFuture.runAsync(
() -> dump(1, this.cacheFile1));
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");

}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
}

private void dump(int index, Path file) {
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
bloomFilters[index].writeTo(out);
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean loadProperties() {
try (Reader r = new InputStreamReader(new BufferedInputStream(Files.newInputStream(
this.cacheProperties, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)),
StandardCharsets.UTF_8)) {
Properties properties = new Properties();
properties.load(r);
filterStartBlock = Long.parseLong(properties.getProperty("filterStartBlock"));
currentBlockNum = Long.parseLong(properties.getProperty("currentBlockNum"));
currentFilterIndex = Integer.parseInt(properties.getProperty("currentFilterIndex"));
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void writeProperties() {
try (Writer w = Files.newBufferedWriter(this.cacheProperties, StandardCharsets.UTF_8)) {
Properties properties = new Properties();
properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock));
properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum));
properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex));
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public TxCacheDB newInstance() {
return new TxCacheDB(name, recentTransactionStore);
Expand Down
90 changes: 90 additions & 0 deletions framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.tron.core.db;

import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.tron.common.application.TronApplicationContext;
import org.tron.common.utils.ByteArray;
import org.tron.core.Constant;
import org.tron.core.capsule.BytesCapsule;
import org.tron.core.config.DefaultConfig;
import org.tron.core.config.args.Args;
import org.tron.keystore.Wallet;

@Slf4j
public class TxCacheDBInitTest {

private static TronApplicationContext context;

@ClassRule
public static final TemporaryFolder temporaryFolder = new TemporaryFolder();

private final byte[][] hash = new byte[140000][64];

@AfterClass
public static void destroy() {
context.destroy();
Args.clearParam();
}

/**
* Init data.
*/
@BeforeClass
public static void init() throws IOException {
Args.setParam(new String[]{"--output-directory", temporaryFolder.newFolder().toString(),
"--p2p-disable", "true"}, Constant.TEST_CONF);
context = new TronApplicationContext(DefaultConfig.class);
}

@Test
public void reload() {
TransactionCache db = context.getBean(TransactionCache.class);
db.initCache();
putTransaction();
DefaultListableBeanFactory defaultListableBeanFactory =
(DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
queryTransaction();
defaultListableBeanFactory.destroySingleton("transactionCache");
TransactionCache transactionCache = new TransactionCache("transactionCache",
context.getBean(RecentTransactionStore.class));
transactionCache.initCache();
defaultListableBeanFactory.registerSingleton("transactionCache",transactionCache);
queryTransaction();
}

private void putTransaction() {
TransactionCache db = context.getBean(TransactionCache.class);
for (int i = 1; i < 140000; i++) {
hash[i] = Wallet.generateRandomBytes(64);
db.put(hash[i], new BytesCapsule(ByteArray.fromLong(i)));
}
}

private void queryTransaction() {
TransactionCache db = context.getBean(TransactionCache.class);
// [1,65537] are expired
for (int i = 1; i < 65538; i++) {
try {
Assert.assertFalse("index = " + i, db.has(hash[i]));
} catch (Exception e) {
Assert.fail("transaction should be expired index = " + i);
}
}
// [65538,140000] are in cache
for (int i = 65538; i < 140000; i++) {
try {
Assert.assertTrue("index = " + i, db.has(hash[i]));
} catch (Exception e) {
Assert.fail("transaction should not be expired index = " + i);
}
}
}

}

0 comments on commit f20e11b

Please sign in to comment.