diff --git a/api/service/explorer/interface.go b/api/service/explorer/interface.go index d4eed5c6d4..b14fa27032 100644 --- a/api/service/explorer/interface.go +++ b/api/service/explorer/interface.go @@ -39,6 +39,7 @@ type explorerDB struct { db ethdb.KeyValueStore } +// newExplorerLvlDB new explorer storage using leveldb func newExplorerLvlDB(dbPath string) (database, error) { db, err := leveldb.New(dbPath, 16, 500, "explorer_db") if err != nil { @@ -47,6 +48,7 @@ func newExplorerLvlDB(dbPath string) (database, error) { return &explorerDB{db}, nil } +// newExplorerTiKv new explorer storage using leveldb func newExplorerTiKv(pdAddr []string, dbPath string, readOnly bool) (database, error) { prefixStr := append([]byte(path.Base(dbPath)), '/') db, err := remote.NewRemoteDatabase(pdAddr, readOnly) diff --git a/api/service/explorer/schema.go b/api/service/explorer/schema.go index 56b13c200f..8cf970c189 100644 --- a/api/service/explorer/schema.go +++ b/api/service/explorer/schema.go @@ -20,6 +20,7 @@ const ( oneAddrByteLen = 42 // byte size of string "one1..." ) +// readCheckpointBitmap read explorer checkpoint bitmap from storage func readCheckpointBitmap(db databaseReader) (*roaring64.Bitmap, error) { bitmapByte, err := db.Get([]byte(CheckpointBitmap)) if err != nil { @@ -38,6 +39,7 @@ func readCheckpointBitmap(db databaseReader) (*roaring64.Bitmap, error) { return rb, nil } +// writeCheckpointBitmap write explorer checkpoint bitmap to storage func writeCheckpointBitmap(db databaseWriter, rb *roaring64.Bitmap) error { bitmapByte, err := rb.MarshalBinary() if err != nil { diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index 14714d4be1..62f3fc7f45 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -338,6 +338,7 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } +// GetCheckpointBitmap get explorer checkpoint bitmap func (s *Service) GetCheckpointBitmap() *roaring64.Bitmap { return s.storage.rb.Clone() } diff --git a/api/service/explorer/storage.go b/api/service/explorer/storage.go index a72f249be7..bd533a8965 100644 --- a/api/service/explorer/storage.go +++ b/api/service/explorer/storage.go @@ -63,11 +63,13 @@ func newStorage(hc *harmonyconfig.HarmonyConfig, bc *core2.BlockChain, dbPath st var err error if hc.General.UseTiKV { + // init the storage using tikv dbPath = fmt.Sprintf("explorer_tikv_%d", hc.General.ShardID) readOnly := hc.TiKV.Role == "Reader" utils.Logger().Info().Msg("explorer storage in tikv: " + dbPath) db, err = newExplorerTiKv(hc.TiKV.PDAddr, dbPath, readOnly) } else { + // or leveldb utils.Logger().Info().Msg("explorer storage folder: " + dbPath) db, err = newExplorerLvlDB(dbPath) } @@ -77,6 +79,8 @@ func newStorage(hc *harmonyconfig.HarmonyConfig, bc *core2.BlockChain, dbPath st return nil, err } + // load checkpoint roaring bitmap from storage + // roaring bitmap is a very high compression bitmap, in our scene, 1 million blocks use almost 1kb storage bitmap, err := readCheckpointBitmap(db) if err != nil { return nil, err @@ -271,6 +275,7 @@ func (tm *taskManager) PullTask() *types.Block { return nil } +// markBlockDone mark block processed done func (tm *taskManager) markBlockDone(btc batch, blockNum uint64) { tm.lock.Lock() defer tm.lock.Unlock() @@ -278,6 +283,7 @@ func (tm *taskManager) markBlockDone(btc batch, blockNum uint64) { if tm.rb.CheckedAdd(blockNum) { tm.rbChangedCount++ + // every 100 change write once if tm.rbChangedCount == 100 { tm.rbChangedCount = 0 _ = writeCheckpointBitmap(btc, tm.rb) @@ -285,6 +291,7 @@ func (tm *taskManager) markBlockDone(btc batch, blockNum uint64) { } } +// markBlockDone check block is processed done func (tm *taskManager) hasBlockDone(blockNum uint64) bool { tm.lock.Lock() defer tm.lock.Unlock() diff --git a/core/blockchain.go b/core/blockchain.go index f8a59258b1..2a98879264 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -147,9 +147,13 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - latestCleanCacheNum uint64 - cleanCacheChan chan uint64 - redisPreempt *redis_helper.RedisPreempt + // The following two variables are used to clean up the cache of redis in tikv mode. + // This can improve the cache hit rate of redis + latestCleanCacheNum uint64 // The most recently cleaned cache of block num + cleanCacheChan chan uint64 // Used to notify blocks that will be cleaned up + + // redisPreempt used in tikv mode, write nodes preempt for write permissions and publish updates to reader nodes + redisPreempt *redis_helper.RedisPreempt hc *HeaderChain trace bool // atomic? @@ -711,7 +715,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) error { return nil } -// writeHeadBlock writes a new head block +// tikvFastForward writes a new head block in tikv mode, used for reader node or follower writer node func (bc *BlockChain) tikvFastForward(block *types.Block, logs []*types.Log) error { bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) @@ -1249,6 +1253,7 @@ func (bc *BlockChain) WriteBlockWithState( return NonStatTy, err } + // clean block tire info in redis, used for tikv mode if block.NumberU64() > triesInRedis { select { case bc.cleanCacheChan <- block.NumberU64() - triesInRedis: @@ -1378,6 +1383,7 @@ func (bc *BlockChain) GetMaxGarbageCollectedBlockNumber() int64 { // // After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { + // if in tikv mode, writer node need preempt master or come be a follower if bc.redisPreempt != nil && !bc.tikvPreemptMaster(bc.rangeBlock(chain)) { return len(chain), nil } @@ -1385,6 +1391,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, n, events, logs, err := bc.insertChain(chain, verifyHeaders) bc.PostChainEvents(events, logs) if bc.redisPreempt != nil && err != nil { + // if has some error, master writer node will release the permission _, _ = bc.redisPreempt.Unlock() } return n, err @@ -1627,6 +1634,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int, events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block + // used for tikv mode, writer node will publish update to all reader node err = redis_helper.PublishShardUpdate(bc.ShardID(), block.NumberU64(), logs) if err != nil { utils.Logger().Info().Err(err).Msg("redis publish shard update error") @@ -3211,6 +3219,7 @@ func (bc *BlockChain) IsEnablePruneBeaconChainFeature() bool { return bc.pruneBeaconChainEnable } +// SyncFromTiKVWriter used for tikv mode, all reader or follower writer used to sync block from master writer func (bc *BlockChain) SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) error { head := rawdb.ReadHeadBlockHash(bc.db) dbBlock := bc.GetBlockByHash(head) @@ -3241,22 +3250,26 @@ func (bc *BlockChain) SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) er return nil } +// tikvCleanCache used for tikv mode, clean block tire data from redis func (bc *BlockChain) tikvCleanCache() { var count int for to := range bc.cleanCacheChan { for i := bc.latestCleanCacheNum + 1; i <= to; i++ { + // build previous block statedb fromBlock := bc.GetBlockByNumber(i) fromTrie, err := state.New(fromBlock.Root(), bc.stateCache) if err != nil { continue } + // build current block statedb toBlock := bc.GetBlockByNumber(i + 1) toTrie, err := state.New(toBlock.Root(), bc.stateCache) if err != nil { continue } + // diff two statedb and delete redis cache start := time.Now() count, err = fromTrie.DiffAndCleanCache(bc.ShardID(), toTrie) if err != nil { @@ -3272,12 +3285,15 @@ func (bc *BlockChain) tikvCleanCache() { } } +// tikvPreemptMaster used for tikv mode, writer node need preempt master or come be a follower func (bc *BlockChain) tikvPreemptMaster(fromBlock, toBlock uint64) bool { for { + // preempt master if ok, _ := bc.redisPreempt.TryLock(60); ok { return true } + // follower if bc.CurrentBlock().NumberU64() >= toBlock { return false } @@ -3286,6 +3302,7 @@ func (bc *BlockChain) tikvPreemptMaster(fromBlock, toBlock uint64) bool { } } +// rangeBlock get the block range of blocks func (bc *BlockChain) rangeBlock(blocks types.Blocks) (uint64, uint64) { if len(blocks) == 0 { return 0, 0 @@ -3303,18 +3320,24 @@ func (bc *BlockChain) rangeBlock(blocks types.Blocks) (uint64, uint64) { return min, max } +// RedisPreempt used for tikv mode, get the redis preempt instance func (bc *BlockChain) RedisPreempt() *redis_helper.RedisPreempt { return bc.redisPreempt } +// InitTiKV used for tikv mode, init the tikv mode func (bc *BlockChain) InitTiKV(conf *harmonyconfig.TiKVConfig) { bc.cleanCacheChan = make(chan uint64, 10) if conf.Role == "Writer" { + // only writer need preempt permission bc.redisPreempt = redis_helper.CreatePreempt(fmt.Sprintf("shard_%d_preempt", bc.ShardID())) } if conf.Debug { + // used for init redis cache + // If redis is empty, the hit rate will be too low and the synchronization block speed will be slow + // set LOAD_PRE_FETCH is yes can significantly improve this. if os.Getenv("LOAD_PRE_FETCH") == "yes" { if trie, err := state.New(bc.CurrentBlock().Root(), bc.stateCache); err == nil { trie.Prefetch(512) @@ -3322,6 +3345,9 @@ func (bc *BlockChain) InitTiKV(conf *harmonyconfig.TiKVConfig) { log.Println("LOAD_PRE_FETCH ERR: ", err) } } + + // If redis is empty, there is no need to clear the cache of nearly 1000 blocks + // set CLEAN_INIT is the latest block num can skip slow and unneeded cleanup process if os.Getenv("CLEAN_INIT") != "" { from, err := strconv.Atoi(os.Getenv("CLEAN_INIT")) if err == nil { @@ -3330,6 +3356,7 @@ func (bc *BlockChain) InitTiKV(conf *harmonyconfig.TiKVConfig) { } } + // start clean block tire data process go bc.tikvCleanCache() } diff --git a/core/state/prefeth.go b/core/state/prefeth.go index 7c2697f641..065a6e2a63 100644 --- a/core/state/prefeth.go +++ b/core/state/prefeth.go @@ -18,13 +18,15 @@ type prefetchJob struct { start, end []byte } -// Prefetch +// Prefetch If redis is empty, the hit rate will be too low and the synchronization block speed will be slow +// this function will parallel load the latest block statedb to redis func (s *DB) Prefetch(parallel int) { wg := sync.WaitGroup{} jobChan := make(chan *prefetchJob, 10000) waitWorker := int64(parallel) + // start parallel workers for i := 0; i < parallel; i++ { go func() { defer wg.Done() @@ -38,6 +40,7 @@ func (s *DB) Prefetch(parallel int) { wg.Add(1) } + // add first jobs for i := 0; i < 255; i++ { start := []byte{byte(i)} end := []byte{byte(i + 1)} @@ -53,6 +56,7 @@ func (s *DB) Prefetch(parallel int) { } } + // wait all worker done start := time.Now() log.Println("Prefetch start") var sleepCount int @@ -72,8 +76,10 @@ func (s *DB) Prefetch(parallel int) { log.Println("Prefetch end, use", end.Sub(start)) } +// prefetchWorker used to process one job func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) { if job.account == nil { + // scan one account nodeIterator := s.trie.NodeIterator(job.start) it := trie.NewIterator(nodeIterator) @@ -82,6 +88,7 @@ func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) { return } + // build account data from main trie tree var data Account if err := rlp.DecodeBytes(it.Value, &data); err != nil { panic(err) @@ -93,21 +100,27 @@ func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) { obj.Code(s.db) } + // build account trie tree storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(nil)) storageJob := &prefetchJob{ accountAddr: addrBytes, account: &data, } + + // fetch data s.prefetchAccountStorage(jobs, storageJob, storageIt) } } else { + // scan main trie tree obj := newObject(s, common.BytesToAddress(job.accountAddr), *job.account) storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(job.start)) + // fetch data s.prefetchAccountStorage(jobs, job, storageIt) } } +// bytesMiddle get the binary middle value of a and b func (s *DB) bytesMiddle(a, b []byte) []byte { if a == nil && b == nil { return []byte{128} @@ -141,6 +154,7 @@ func (s *DB) bytesMiddle(a, b []byte) []byte { return aI.Bytes() } +// prefetchAccountStorage used for fetch account storage func (s *DB) prefetchAccountStorage(jobs chan *prefetchJob, job *prefetchJob, it *trie.Iterator) { start := time.Now() count := 0 @@ -152,6 +166,7 @@ func (s *DB) prefetchAccountStorage(jobs chan *prefetchJob, job *prefetchJob, it count++ + // if fetch one account job used more than 15s, then split the job if count%10000 == 0 && time.Now().Sub(start) > 15*time.Second { middle := s.bytesMiddle(it.Key, job.end) startJob := &prefetchJob{ diff --git a/core/state/tikv_clean.go b/core/state/tikv_clean.go index 216d3f87da..d875000ae0 100644 --- a/core/state/tikv_clean.go +++ b/core/state/tikv_clean.go @@ -12,7 +12,9 @@ import ( var secureKeyPrefix = []byte("secure-key-") +// DiffAndCleanCache clean block tire data from redis, Used to reduce redis storage and increase hit rate func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) { + // create difference iterator it, _ := trie.NewDifferenceIterator(to.trie.NodeIterator(nil), s.trie.NodeIterator(nil)) db, err := tikv_manage.GetDefaultTiKVFactory().NewCacheStateDB(shardId) if err != nil { @@ -24,9 +26,12 @@ func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) { count := uint64(0) for it.Next(true) { if !it.Leaf() { + // delete it if trie leaf node atomic.AddUint64(&count, 1) _ = batch.Delete(it.Hash().Bytes()) } else { + + // build account data addrBytes := s.trie.GetKey(it.LeafKey()) addr := common.BytesToAddress(addrBytes) @@ -41,14 +46,17 @@ func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) { continue } + // if account not changed, skip if bytes.Compare(fromAccount.Root.Bytes(), toAccount.Root.Bytes()) == 0 { continue } + // create account difference iterator fromAccountTrie := newObject(s, addr, fromAccount).getTrie(s.db) toAccountTrie := newObject(to, addr, toAccount).getTrie(to.db) accountIt, _ := trie.NewDifferenceIterator(toAccountTrie.NodeIterator(nil), fromAccountTrie.NodeIterator(nil)) + // parallel to delete data wg.Add(1) go func() { defer wg.Done() diff --git a/core/tx_pool.go b/core/tx_pool.go index 3d5b6b18e0..ef9a39f1a3 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -929,6 +929,7 @@ func (pool *TxPool) pendingEpoch() *big.Int { func (pool *TxPool) add(tx types.PoolTransaction, local bool) (replaced bool, err error) { defer func() { if err == nil && pool.config.AddEvent != nil { + // used for tikv mode, writer will publish txpool change to all reader, this makes the state consistent pool.config.AddEvent(tx, local) } }() diff --git a/internal/shardchain/dbfactory_tikv.go b/internal/shardchain/dbfactory_tikv.go index 2bc0101951..1e68caf233 100644 --- a/internal/shardchain/dbfactory_tikv.go +++ b/internal/shardchain/dbfactory_tikv.go @@ -33,6 +33,7 @@ type TiKvFactory struct { CacheConfig statedb_cache.StateDBCacheConfig } +// getStateDB create statedb storage use tikv func (f *TiKvFactory) getRemoteDB(shardID uint32) (*prefix.PrefixDatabase, error) { key := fmt.Sprintf("remote_%d_%s", shardID, f.Role) if db, ok := f.cacheDBMap.Load(key); ok { @@ -54,6 +55,7 @@ func (f *TiKvFactory) getRemoteDB(shardID uint32) (*prefix.PrefixDatabase, error } } +// getStateDB create statedb storage with local memory cahce func (f *TiKvFactory) getStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) { key := fmt.Sprintf("state_db_%d_%s", shardID, f.Role) if db, ok := f.cacheDBMap.Load(key); ok { @@ -92,7 +94,7 @@ func (f *TiKvFactory) NewChainDB(shardID uint32) (ethdb.Database, error) { return rawdb.NewDatabase(database), nil } -// NewStateDB +// NewStateDB create shard tikv database func (f *TiKvFactory) NewStateDB(shardID uint32) (ethdb.Database, error) { var database ethdb.KeyValueStore @@ -106,10 +108,12 @@ func (f *TiKvFactory) NewStateDB(shardID uint32) (ethdb.Database, error) { return rawdb.NewDatabase(database), nil } +// NewCacheStateDB create shard statedb storage with memory cache func (f *TiKvFactory) NewCacheStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) { return f.getStateDB(shardID) } +// CloseAllDB close all tikv database func (f *TiKvFactory) CloseAllDB() { f.cacheDBMap.Range(func(_, value interface{}) bool { if closer, ok := value.(io.Closer); ok { diff --git a/internal/shardchain/shardchains.go b/internal/shardchain/shardchains.go index 82daaa3b41..f5023f2b11 100644 --- a/internal/shardchain/shardchains.go +++ b/internal/shardchain/shardchains.go @@ -110,6 +110,7 @@ func (sc *CollectionImpl) ShardChain(shardID uint32) (*core.BlockChain, error) { var stateCache state.Database if sc.harmonyconfig.General.UseTiKV { + // used for tikv mode, init state db using tikv storage stateDB, err := tikv_manage.GetDefaultTiKVFactory().NewStateDB(shardID) if err != nil { return nil, err @@ -129,6 +130,7 @@ func (sc *CollectionImpl) ShardChain(shardID uint32) (*core.BlockChain, error) { sc.pool[shardID] = bc if sc.harmonyconfig.General.UseTiKV { + // init the tikv mode bc.InitTiKV(sc.harmonyconfig.TiKV) } diff --git a/internal/tikv/common/tikv_store.go b/internal/tikv/common/tikv_store.go index 230fa9490e..a5f7aeb0ea 100644 --- a/internal/tikv/common/tikv_store.go +++ b/internal/tikv/common/tikv_store.go @@ -17,6 +17,7 @@ type TiKVStore interface { NewIterator(start, end []byte) ethdb.Iterator } +// TiKVStoreWrapper simple wrapper to covert to ethdb.KeyValueStore type TiKVStoreWrapper struct { TiKVStore } diff --git a/internal/tikv/prefix/prefix_batch.go b/internal/tikv/prefix/prefix_batch.go index 4da3bd4665..733496cbe8 100644 --- a/internal/tikv/prefix/prefix_batch.go +++ b/internal/tikv/prefix/prefix_batch.go @@ -11,25 +11,32 @@ func newPrefixBatch(prefix []byte, batch ethdb.Batch) *PrefixBatch { return &PrefixBatch{prefix: prefix, batch: batch} } +// Put inserts the given value into the key-value data store. func (p *PrefixBatch) Put(key []byte, value []byte) error { return p.batch.Put(append(append([]byte{}, p.prefix...), key...), value) } +// Delete removes the key from the key-value data store. func (p *PrefixBatch) Delete(key []byte) error { return p.batch.Delete(append(append([]byte{}, p.prefix...), key...)) } + +// ValueSize retrieves the amount of data queued up for writing. func (p *PrefixBatch) ValueSize() int { return p.batch.ValueSize() } +// Write flushes any accumulated data to disk. func (p *PrefixBatch) Write() error { return p.batch.Write() } +// Reset resets the batch for reuse. func (p *PrefixBatch) Reset() { p.batch.Reset() } +// Replay replays the batch contents. func (p *PrefixBatch) Replay(w ethdb.KeyValueWriter) error { return p.batch.Replay(newPrefixBatchReplay(p.prefix, w)) } diff --git a/internal/tikv/prefix/prefix_batch_replay.go b/internal/tikv/prefix/prefix_batch_replay.go index e80c93eceb..7b3e0ff674 100644 --- a/internal/tikv/prefix/prefix_batch_replay.go +++ b/internal/tikv/prefix/prefix_batch_replay.go @@ -15,6 +15,7 @@ func newPrefixBatchReplay(prefix []byte, w ethdb.KeyValueWriter) *PrefixBatchRep return &PrefixBatchReplay{prefix: prefix, prefixLen: len(prefix), w: w} } +// Put inserts the given value into the key-value data store. func (p *PrefixBatchReplay) Put(key []byte, value []byte) error { if bytes.HasPrefix(key, p.prefix) { return p.w.Put(key[p.prefixLen:], value) @@ -23,6 +24,7 @@ func (p *PrefixBatchReplay) Put(key []byte, value []byte) error { } } +// Delete removes the key from the key-value data store. func (p *PrefixBatchReplay) Delete(key []byte) error { if bytes.HasPrefix(key, p.prefix) { return p.w.Delete(key[p.prefixLen:]) diff --git a/internal/tikv/prefix/prefix_database.go b/internal/tikv/prefix/prefix_database.go index 1a6421be2e..7bfe9d9451 100644 --- a/internal/tikv/prefix/prefix_database.go +++ b/internal/tikv/prefix/prefix_database.go @@ -6,6 +6,7 @@ import ( "github.com/harmony-one/harmony/internal/tikv/common" ) +// PrefixDatabase is a wrapper to split the storage with prefix type PrefixDatabase struct { prefix []byte db common.TiKVStore @@ -20,6 +21,7 @@ func NewPrefixDatabase(prefix []byte, db common.TiKVStore) *PrefixDatabase { } } +// makeKey use to create a key with prefix, keysPool can reduce gc pressure func (p *PrefixDatabase) makeKey(keys []byte) []byte { prefixLen := len(p.prefix) byt := p.keysPool.Get(len(keys) + prefixLen) @@ -29,26 +31,33 @@ func (p *PrefixDatabase) makeKey(keys []byte) []byte { return byt } +// Has retrieves if a key is present in the key-value data store. func (p *PrefixDatabase) Has(key []byte) (bool, error) { return p.db.Has(p.makeKey(key)) } +// Get retrieves the given key if it's present in the key-value data store. func (p *PrefixDatabase) Get(key []byte) ([]byte, error) { return p.db.Get(p.makeKey(key)) } +// Put inserts the given value into the key-value data store. func (p *PrefixDatabase) Put(key []byte, value []byte) error { return p.db.Put(p.makeKey(key), value) } +// Delete removes the key from the key-value data store. func (p *PrefixDatabase) Delete(key []byte) error { return p.db.Delete(p.makeKey(key)) } +// NewBatch creates a write-only database that buffers changes to its host db +// until a final write is called. func (p *PrefixDatabase) NewBatch() ethdb.Batch { return newPrefixBatch(p.prefix, p.db.NewBatch()) } +// buildLimitUsePrefix build the limit byte from start byte, useful generating from prefix works func (p *PrefixDatabase) buildLimitUsePrefix() []byte { var limit []byte for i := len(p.prefix) - 1; i >= 0; i-- { @@ -64,6 +73,8 @@ func (p *PrefixDatabase) buildLimitUsePrefix() []byte { return limit } +// NewIterator creates a binary-alphabetical iterator over the start to end keyspace +// contained within the key-value database. func (p *PrefixDatabase) NewIterator(start, end []byte) ethdb.Iterator { start = append(p.prefix, start...) @@ -76,14 +87,23 @@ func (p *PrefixDatabase) NewIterator(start, end []byte) ethdb.Iterator { return newPrefixIterator(p.prefix, p.db.NewIterator(start, end)) } +// Stat returns a particular internal stat of the database. func (p *PrefixDatabase) Stat(property string) (string, error) { return p.db.Stat(property) } +// Compact flattens the underlying data store for the given key range. In essence, +// deleted and overwritten versions are discarded, and the data is rearranged to +// reduce the cost of operations needed to access them. +// +// A nil start is treated as a key before all keys in the data store; a nil limit +// is treated as a key after all keys in the data store. If both is nil then it +// will compact entire data store. func (p *PrefixDatabase) Compact(start []byte, limit []byte) error { return p.db.Compact(start, limit) } +// Close the storage func (p *PrefixDatabase) Close() error { return p.db.Close() } diff --git a/internal/tikv/prefix/prefix_iterator.go b/internal/tikv/prefix/prefix_iterator.go index ca61522d5f..7f85626031 100644 --- a/internal/tikv/prefix/prefix_iterator.go +++ b/internal/tikv/prefix/prefix_iterator.go @@ -15,14 +15,21 @@ func newPrefixIterator(prefix []byte, it ethdb.Iterator) *PrefixIterator { return &PrefixIterator{prefix: prefix, prefixLen: len(prefix), it: it} } +// Next moves the iterator to the next key/value pair. It returns whether the +// iterator is exhausted. func (i *PrefixIterator) Next() bool { return i.it.Next() } +// Error returns any accumulated error. Exhausting all the key/value pairs +// is not considered to be an error. func (i *PrefixIterator) Error() error { return i.it.Error() } +// Key returns the key of the current key/value pair, or nil if done. The caller +// should not modify the contents of the returned slice, and its contents may +// change on the next call to Next. func (i *PrefixIterator) Key() []byte { key := i.it.Key() if len(key) < len(i.prefix) { @@ -32,10 +39,15 @@ func (i *PrefixIterator) Key() []byte { return key[i.prefixLen:] } +// Value returns the value of the current key/value pair, or nil if done. The +// caller should not modify the contents of the returned slice, and its contents +// may change on the next call to Next. func (i *PrefixIterator) Value() []byte { return i.it.Value() } +// Release releases associated resources. Release should always succeed and can +// be called multiple times without causing error. func (i *PrefixIterator) Release() { i.it.Release() } diff --git a/internal/tikv/redis_helper/common.go b/internal/tikv/redis_helper/common.go index 3d86dbfc62..d237088a92 100644 --- a/internal/tikv/redis_helper/common.go +++ b/internal/tikv/redis_helper/common.go @@ -7,6 +7,7 @@ import ( var redisInstance *redis.ClusterClient +// Init used to init redis instance in tikv mode func Init(serverAddr []string) error { option := &redis.ClusterOptions{ Addrs: serverAddr, @@ -23,6 +24,7 @@ func Init(serverAddr []string) error { return nil } +// Close disconnect redis instance in tikv mode func Close() error { if redisInstance != nil { return redisInstance.Close() diff --git a/internal/tikv/redis_helper/lock.go b/internal/tikv/redis_helper/lock.go index e8a15e5683..718caf422b 100644 --- a/internal/tikv/redis_helper/lock.go +++ b/internal/tikv/redis_helper/lock.go @@ -14,6 +14,7 @@ type RedisPreempt struct { lastLockStatus bool } +// CreatePreempt used to create a redis preempt instance func CreatePreempt(key string) *RedisPreempt { p := &RedisPreempt{ key: key, @@ -23,6 +24,7 @@ func CreatePreempt(key string) *RedisPreempt { return p } +// init redis preempt instance and some script func (p *RedisPreempt) init() { byt := make([]byte, 18) _, _ = rand.Read(byt) @@ -43,12 +45,14 @@ func (p *RedisPreempt) init() { `) } +// TryLock attempt to lock the master for ttlSecond func (p *RedisPreempt) TryLock(ttlSecond int) (ok bool, err error) { ok, err = p.lockScript.Run(context.Background(), redisInstance, []string{p.key}, p.password, ttlSecond).Bool() p.lastLockStatus = ok return } +// Unlock try to release the master permission func (p *RedisPreempt) Unlock() (bool, error) { if p == nil { return false, nil @@ -57,6 +61,7 @@ func (p *RedisPreempt) Unlock() (bool, error) { return p.unlockScript.Run(context.Background(), redisInstance, []string{p.key}, p.password).Bool() } +// LastLockStatus get the last preempt status func (p *RedisPreempt) LastLockStatus() bool { if p == nil { return false diff --git a/internal/tikv/redis_helper/pubsub.go b/internal/tikv/redis_helper/pubsub.go index ba277a4ce8..354c7a9657 100644 --- a/internal/tikv/redis_helper/pubsub.go +++ b/internal/tikv/redis_helper/pubsub.go @@ -11,11 +11,13 @@ import ( "io" ) +// BlockUpdate block update event type BlockUpdate struct { BlkNum uint64 Logs []*types.Log } +// SubscribeShardUpdate subscribe block update event func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.Log)) { pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("shard_update_%d", shardID)) for message := range pubsub.Channel() { @@ -29,6 +31,7 @@ func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.L } } +// PublishShardUpdate publish block update event func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error { msg, err := rlp.EncodeToBytes(&BlockUpdate{ BlkNum: blkNum, @@ -40,12 +43,14 @@ func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error return redisInstance.Publish(context.Background(), fmt.Sprintf("shard_update_%d", shardID), msg).Err() } +//TxPoolUpdate tx pool update event type TxPoolUpdate struct { typ string Local bool Tx types.PoolTransaction } +// DecodeRLP decode struct from binary stream func (t *TxPoolUpdate) DecodeRLP(stream *rlp.Stream) error { if err := stream.Decode(&t.typ); err != nil { return err @@ -79,6 +84,7 @@ func (t *TxPoolUpdate) DecodeRLP(stream *rlp.Stream) error { return nil } +// EncodeRLP encode struct to binary stream func (t *TxPoolUpdate) EncodeRLP(w io.Writer) error { switch t.Tx.(type) { case *types.EthTransaction: @@ -98,6 +104,7 @@ func (t *TxPoolUpdate) EncodeRLP(w io.Writer) error { return rlp.Encode(w, t.Tx) } +// SubscribeTxPoolUpdate subscribe tx pool update event func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, local bool)) { pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("txpool_update_%d", shardID)) for message := range pubsub.Channel() { @@ -111,6 +118,7 @@ func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, loc } } +// PublishTxPoolUpdate publish tx pool update event func PublishTxPoolUpdate(shardID uint32, tx types.PoolTransaction, local bool) error { txu := &TxPoolUpdate{Local: local, Tx: tx} msg, err := rlp.EncodeToBytes(txu) diff --git a/internal/tikv/remote/remote_batch.go b/internal/tikv/remote/remote_batch.go index 034482ea17..cb471ba2dc 100644 --- a/internal/tikv/remote/remote_batch.go +++ b/internal/tikv/remote/remote_batch.go @@ -22,6 +22,7 @@ func newRemoteBatch(db *RemoteDatabase) *RemoteBatch { return &RemoteBatch{db: db} } +// Put inserts the given value into the key-value data store. func (b *RemoteBatch) Put(key []byte, value []byte) error { if len(key) == 0 { return common.ErrEmptyKey @@ -40,6 +41,7 @@ func (b *RemoteBatch) Put(key []byte, value []byte) error { return nil } +// Delete removes the key from the key-value data store. func (b *RemoteBatch) Delete(key []byte) error { b.lock.Lock() defer b.lock.Unlock() @@ -49,10 +51,12 @@ func (b *RemoteBatch) Delete(key []byte) error { return nil } +// ValueSize retrieves the amount of data queued up for writing. func (b *RemoteBatch) ValueSize() int { return b.size } +// Write flushes any accumulated data to disk. func (b *RemoteBatch) Write() error { b.lock.Lock() defer b.lock.Unlock() @@ -74,6 +78,7 @@ func (b *RemoteBatch) Write() error { return nil } +// Reset resets the batch for reuse. func (b *RemoteBatch) Reset() { b.lock.Lock() defer b.lock.Unlock() @@ -84,6 +89,7 @@ func (b *RemoteBatch) Reset() { b.size = 0 } +// Replay replays the batch contents. func (b *RemoteBatch) Replay(w ethdb.KeyValueWriter) error { for i, key := range b.batchWriteKey { if bytes.Compare(b.batchWriteValue[i], EmptyValueStub) == 0 { diff --git a/internal/tikv/remote/remote_database.go b/internal/tikv/remote/remote_database.go index 1a2c896ee3..c16ff9d9d0 100644 --- a/internal/tikv/remote/remote_database.go +++ b/internal/tikv/remote/remote_database.go @@ -33,10 +33,12 @@ func NewRemoteDatabase(pdAddr []string, readOnly bool) (*RemoteDatabase, error) return db, nil } +// ReadOnly set storage to readonly mode func (d *RemoteDatabase) ReadOnly() { d.readOnly = true } +// Has retrieves if a key is present in the key-value data store. func (d *RemoteDatabase) Has(key []byte) (bool, error) { data, err := d.Get(key) if err != nil { @@ -49,6 +51,7 @@ func (d *RemoteDatabase) Has(key []byte) (bool, error) { } } +// Get retrieves the given key if it's present in the key-value data store. func (d *RemoteDatabase) Get(key []byte) ([]byte, error) { if len(key) == 0 { return nil, common.ErrEmptyKey @@ -73,6 +76,7 @@ func (d *RemoteDatabase) Get(key []byte) ([]byte, error) { return get, nil } +// Put inserts the given value into the key-value data store. func (d *RemoteDatabase) Put(key []byte, value []byte) error { if len(key) == 0 { return common.ErrEmptyKey @@ -88,6 +92,7 @@ func (d *RemoteDatabase) Put(key []byte, value []byte) error { return d.client.Put(context.Background(), key, value) } +// Delete removes the key from the key-value data store. func (d *RemoteDatabase) Delete(key []byte) error { if len(key) == 0 { return common.ErrEmptyKey @@ -99,6 +104,8 @@ func (d *RemoteDatabase) Delete(key []byte) error { return d.client.Delete(context.Background(), key) } +// NewBatch creates a write-only database that buffers changes to its host db +// until a final write is called. func (d *RemoteDatabase) NewBatch() ethdb.Batch { if d.readOnly { return newNopRemoteBatch(d) @@ -111,14 +118,17 @@ func (d *RemoteDatabase) NewIterator(start, end []byte) ethdb.Iterator { return newRemoteIterator(d, start, end) } +// Stat returns a particular internal stat of the database. func (d *RemoteDatabase) Stat(property string) (string, error) { return "", common.ErrNotFound } +// Compact tikv current not supprot manual compact func (d *RemoteDatabase) Compact(start []byte, limit []byte) error { return nil } +// Close disconnect the tikv func (d *RemoteDatabase) Close() error { if atomic.CompareAndSwapUint64(&d.isClose, 0, 1) { return d.client.Close() diff --git a/internal/tikv/remote/remote_iterator.go b/internal/tikv/remote/remote_iterator.go index 0a7b97f5ec..7eaf34cf22 100644 --- a/internal/tikv/remote/remote_iterator.go +++ b/internal/tikv/remote/remote_iterator.go @@ -32,6 +32,8 @@ func newRemoteIterator(db *RemoteDatabase, start, limit []byte) *RemoteIterator } } +// Next moves the iterator to the next key/value pair. It returns whether the +// iterator is exhausted. func (i *RemoteIterator) Next() bool { if i.end { return false @@ -54,6 +56,7 @@ func (i *RemoteIterator) Next() bool { return true } +// scanNext real scan from tikv, and cache it func (i *RemoteIterator) scanNext() bool { keys, values, err := i.db.client.Scan(context.Background(), i.start, i.limit, iteratorOnce) if err != nil { @@ -75,18 +78,28 @@ func (i *RemoteIterator) scanNext() bool { return true } +// Error returns any accumulated error. Exhausting all the key/value pairs +// is not considered to be an error. func (i *RemoteIterator) Error() error { return i.err } +// Key returns the key of the current key/value pair, or nil if done. The caller +// should not modify the contents of the returned slice, and its contents may +// change on the next call to Next. func (i *RemoteIterator) Key() []byte { return i.currentKey } +// Value returns the value of the current key/value pair, or nil if done. The +// caller should not modify the contents of the returned slice, and its contents +// may change on the next call to Next. func (i *RemoteIterator) Value() []byte { return i.currentValue } +// Release releases associated resources. Release should always succeed and can +// be called multiple times without causing error. func (i *RemoteIterator) Release() { i.db = nil i.end = true diff --git a/internal/tikv/remote/remote_nop_batch.go b/internal/tikv/remote/remote_nop_batch.go index 59e6940015..d58cbf9c74 100644 --- a/internal/tikv/remote/remote_nop_batch.go +++ b/internal/tikv/remote/remote_nop_batch.go @@ -4,6 +4,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" ) +// NopRemoteBatch on readonly mode, write operator will be reject type NopRemoteBatch struct { } diff --git a/internal/tikv/statedb_cache/statedb_cache_database.go b/internal/tikv/statedb_cache/statedb_cache_database.go index 94a4c8915d..992d9db3d9 100644 --- a/internal/tikv/statedb_cache/statedb_cache_database.go +++ b/internal/tikv/statedb_cache/statedb_cache_database.go @@ -16,11 +16,13 @@ type cacheWrapper struct { *fastcache.Cache } +// Put inserts the given value into the key-value data store. func (c *cacheWrapper) Put(key []byte, value []byte) error { c.Cache.Set(key, value) return nil } +// Delete removes the key from the key-value data store. func (c *cacheWrapper) Delete(key []byte) error { c.Cache.Del(key) return nil @@ -32,11 +34,13 @@ type redisPipelineWrapper struct { expiredTime time.Duration } +// Put inserts the given value into the key-value data store. func (c *redisPipelineWrapper) Put(key []byte, value []byte) error { c.SetEX(context.Background(), string(key), value, c.expiredTime) return nil } +// Delete removes the key from the key-value data store. func (c *redisPipelineWrapper) Delete(key []byte) error { c.Del(context.Background(), string(key)) return nil @@ -74,7 +78,10 @@ type StateDBCacheDatabase struct { } func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfig, readOnly bool) (*StateDBCacheDatabase, error) { - cache := fastcache.LoadFromFileOrNew(config.CachePersistencePath, int(config.CacheSizeInMB)*1024*1024) // 10GB + // create or load memory cache from file + cache := fastcache.LoadFromFileOrNew(config.CachePersistencePath, int(config.CacheSizeInMB)*1024*1024) + + // create options option := &redis.ClusterOptions{ Addrs: config.RedisServerAddr, PoolSize: 32, @@ -86,12 +93,14 @@ func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfi option.ReadOnly = true } + // check redis connection rdb := redis.NewClusterClient(option) err := rdb.Ping(context.Background()).Err() if err != nil { return nil, err } + // reload redis cluster slots status rdb.ReloadState(context.Background()) db := &StateDBCacheDatabase{ @@ -105,10 +114,13 @@ func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfi } if !readOnly { + // Read a copy of the memory hit data into redis to improve the hit rate + // refresh read time to prevent recycling by lru db.l2ExpiredRefresh = make(chan string, 100000) go db.startL2ExpiredRefresh() } + // print debug info if config.DebugHitRate { go func() { for range time.Tick(5 * time.Second) { @@ -120,21 +132,27 @@ func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfi return db, nil } +// Has retrieves if a key is present in the key-value data store. func (c *StateDBCacheDatabase) Has(key []byte) (bool, error) { return c.remoteDB.Has(key) } +// Get retrieves the given key if it's present in the key-value data store. func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) { if c.enableReadCache { var ok bool + + // first, get data from memory cache keyStr := string(key) if ret, ok = c.l1Cache.HasGet(nil, key); ok { if !c.l2ReadOnly { select { + // refresh read time to prevent recycling by lru case c.l2ExpiredRefresh <- keyStr: default: } } + atomic.AddUint64(&c.l1HitCount, 1) return ret, nil } @@ -142,6 +160,7 @@ func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) { defer func() { if err == nil { if len(ret) < 64*1024 { + // set data to memory db if loaded from redis cache or leveldb c.l1Cache.Set(key, ret) } } @@ -150,6 +169,7 @@ func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) { timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFunc() + // load data from redis cache data := c.l2Cache.Get(timeoutCtx, keyStr) if data.Err() == nil { atomic.AddUint64(&c.l2HitCount, 1) @@ -160,6 +180,7 @@ func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) { if !c.l2ReadOnly { defer func() { + // set data to redis db if loaded from leveldb if err == nil { c.l2Cache.SetEX(context.Background(), keyStr, ret, c.l2ExpiredTime) } @@ -177,17 +198,20 @@ func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) { return } +// Put inserts the given value into the key-value data store. func (c *StateDBCacheDatabase) Put(key []byte, value []byte) (err error) { if c.enableReadCache { defer func() { if err == nil { if len(value) < 64*1024 { + // memory db only accept the small data c.l1Cache.Set(key, value) } if !c.l2ReadOnly { timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFunc() + // send the data to redis res := c.l2Cache.SetEX(timeoutCtx, string(key), value, c.l2ExpiredTime) if res.Err() == nil { log.Printf("[Put WARN]Redis Error: %v", res.Err()) @@ -200,6 +224,7 @@ func (c *StateDBCacheDatabase) Put(key []byte, value []byte) (err error) { return c.remoteDB.Put(key, value) } +// Delete removes the key from the key-value data store. func (c *StateDBCacheDatabase) Delete(key []byte) (err error) { if c.enableReadCache { defer func() { @@ -215,10 +240,13 @@ func (c *StateDBCacheDatabase) Delete(key []byte) (err error) { return c.remoteDB.Delete(key) } +// NewBatch creates a write-only database that buffers changes to its host db +// until a final write is called. func (c *StateDBCacheDatabase) NewBatch() ethdb.Batch { return newStateDBCacheBatch(c, c.remoteDB.NewBatch()) } +// Stat returns a particular internal stat of the database. func (c *StateDBCacheDatabase) Stat(property string) (string, error) { switch property { case "tikv.save.cache": @@ -235,6 +263,7 @@ func (c *StateDBCacheDatabase) NewIterator(start, end []byte) ethdb.Iterator { return c.remoteDB.NewIterator(start, end) } +// Close disconnect the redis and save memory cache to file func (c *StateDBCacheDatabase) Close() error { if atomic.CompareAndSwapUint64(&c.isClose, 0, 1) { err := c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU()) @@ -259,6 +288,7 @@ func (c *StateDBCacheDatabase) Close() error { return nil } +// cacheWrite write batch to cache func (c *StateDBCacheDatabase) cacheWrite(b ethdb.Batch) error { if !c.l2ReadOnly { pipeline := c.l2Cache.Pipeline() @@ -283,6 +313,7 @@ func (c *StateDBCacheDatabase) refreshL2ExpiredTime() { c.l2NextExpiredTime = time.Unix(unix, 0) } +// startL2ExpiredRefresh batch refresh redis cache func (c *StateDBCacheDatabase) startL2ExpiredRefresh() { ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -313,6 +344,7 @@ func (c *StateDBCacheDatabase) startL2ExpiredRefresh() { } } +// print stats to stdout func (c *StateDBCacheDatabase) cacheStatusPrint() { var state = &fastcache.Stats{} state.Reset() diff --git a/node/node.go b/node/node.go index 0eda113dd4..e7d1e4063b 100644 --- a/node/node.go +++ b/node/node.go @@ -154,6 +154,7 @@ func (node *Node) Blockchain() *core.BlockChain { // Beaconchain returns the beaconchain from node. func (node *Node) Beaconchain() *core.BlockChain { + // tikv mode not have the BeaconChain storage if node.HarmonyConfig.General.UseTiKV && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID { return nil } @@ -217,6 +218,7 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error { return nil } + // in tikv mode, reader only accept the pending transaction from writer node, ignore the p2p message if node.HarmonyConfig.General.UseTiKV && node.HarmonyConfig.TiKV.Role == "Reader" { log.Printf("skip reader addPendingTransactions: %#v", newTxs) return nil @@ -434,6 +436,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) ( case proto_node.Sync: nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "block_sync"}).Inc() + // in tikv mode, not need BeaconChain message if node.HarmonyConfig.General.UseTiKV && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID { return nil, 0, errIgnoreBeaconMsg } @@ -1020,6 +1023,7 @@ func New( blockchain := node.Blockchain() // this also sets node.isFirstTime if the DB is fresh beaconChain := node.Beaconchain() if b1, b2 := beaconChain == nil, blockchain == nil; b1 || b2 { + // in tikv mode, not need BeaconChain if !(node.HarmonyConfig.General.UseTiKV && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID) { var err error if b2 { @@ -1052,6 +1056,7 @@ func New( txPoolConfig.Blacklist = blacklist txPoolConfig.Journal = fmt.Sprintf("%v/%v", node.NodeConfig.DBDir, txPoolConfig.Journal) txPoolConfig.AddEvent = func(tx types.PoolTransaction, local bool) { + // in tikv mode, writer will publish tx pool update to all reader if preempt := node.Blockchain().RedisPreempt(); preempt != nil && preempt.LastLockStatus() { err := redis_helper.PublishTxPoolUpdate(uint32(harmonyconfig.General.ShardID), tx, local) if err != nil { @@ -1067,6 +1072,7 @@ func New( node.deciderCache, _ = lru.New(16) node.committeeCache, _ = lru.New(16) + // in tikv mode, not need BeaconChain if !node.HarmonyConfig.General.UseTiKV && node.Blockchain().ShardID() != shard.BeaconChainShardID { node.BeaconWorker = worker.New( node.Beaconchain().Config(), beaconChain, engine, @@ -1121,6 +1127,7 @@ func New( }() } + // in tikv mode, not need BeaconChain if !node.HarmonyConfig.General.UseTiKV && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID { // update reward values now that node is ready node.updateInitialRewardValues() @@ -1403,9 +1410,11 @@ func (node *Node) IsRunningBeaconChain() bool { return node.NodeConfig.ShardID == shard.BeaconChainShardID } +// syncFromTiKVWriter used for tikv mode, subscribe data from tikv writer func (node *Node) syncFromTiKVWriter() { bc := node.Blockchain() + // subscribe block update go redis_helper.SubscribeShardUpdate(bc.ShardID(), func(blkNum uint64, logs []*types.Log) { err := bc.SyncFromTiKVWriter(blkNum, logs) if err != nil { @@ -1416,6 +1425,7 @@ func (node *Node) syncFromTiKVWriter() { } }) + // subscribe txpool update if node.HarmonyConfig.TiKV.Role == "Reader" { go redis_helper.SubscribeTxPoolUpdate(bc.ShardID(), func(tx types.PoolTransaction, local bool) { var err error diff --git a/node/node_explorer.go b/node/node_explorer.go index 28712417c4..f50545829e 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -159,6 +159,7 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) { // Clean up the blocks to avoid OOM. node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64()) + // if in tikv mode, only master writer node need dump all explorer block if !node.HarmonyConfig.General.UseTiKV || node.Blockchain().RedisPreempt().LastLockStatus() { // Do dump all blocks from state syncing for explorer one time // TODO: some blocks can be dumped before state syncing finished. @@ -176,8 +177,12 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) { if block.NumberU64() == 0 { return } + + // get checkpoint bitmap and flip all bit bitmap := exp.GetCheckpointBitmap() bitmap.Flip(0, block.NumberU64()) + + // find all not processed block and dump it iterator := bitmap.ReverseIterator() for iterator.HasNext() { exp.DumpCatchupBlock(node.Blockchain().GetBlockByNumber(iterator.Next())) @@ -190,13 +195,9 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) { } } -func (node *Node) AddNewBlockForTiKV(block *types.Block) { - // Clean up the blocks to avoid OOM. - node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64()) -} - // ExplorerMessageHandler passes received message in node_handler to explorer service. func (node *Node) commitBlockForExplorer(block *types.Block) { + // if in tikv mode, only master writer node need dump explorer block if !node.HarmonyConfig.General.UseTiKV || (node.HarmonyConfig.TiKV.Role == "Writer" && node.Blockchain().RedisPreempt().LastLockStatus()) { if block.ShardID() != node.NodeConfig.ShardID { return