Commit

add comment

LuttyYang committed Jun 15, 2022
1 parent 6c28424 commit 0bc8c59
Showing 25 changed files with 211 additions and 12 deletions.
2 changes: 2 additions & 0 deletions api/service/explorer/interface.go
@@ -39,6 +39,7 @@ type explorerDB struct {
db ethdb.KeyValueStore
}

// newExplorerLvlDB creates a new explorer storage backed by LevelDB
func newExplorerLvlDB(dbPath string) (database, error) {
db, err := leveldb.New(dbPath, 16, 500, "explorer_db")
if err != nil {
@@ -47,6 +48,7 @@ func newExplorerLvlDB(dbPath string) (database, error) {
return &explorerDB{db}, nil
}

// newExplorerTiKv creates a new explorer storage backed by TiKV
func newExplorerTiKv(pdAddr []string, dbPath string, readOnly bool) (database, error) {
prefixStr := append([]byte(path.Base(dbPath)), '/')
db, err := remote.NewRemoteDatabase(pdAddr, readOnly)
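
Aside: newExplorerTiKv above derives a key prefix from path.Base(dbPath) so several logical databases can share one TiKV cluster. Below is a minimal, self-contained sketch of that key-prefixing pattern; the kvStore interface and memStore backend are stand-ins for illustration, not this package's real database type.

package main

import "fmt"

// kvStore is an assumed stand-in for the package's internal database interface.
type kvStore interface {
	Get(key []byte) ([]byte, error)
	Put(key, value []byte) error
}

// memStore is a toy in-memory backend used only for this sketch.
type memStore map[string][]byte

func (m memStore) Get(key []byte) ([]byte, error) {
	v, ok := m[string(key)]
	if !ok {
		return nil, fmt.Errorf("key not found")
	}
	return v, nil
}

func (m memStore) Put(key, value []byte) error { m[string(key)] = value; return nil }

// prefixed namespaces every key, the way newExplorerTiKv prefixes with "<base>/".
type prefixed struct {
	inner  kvStore
	prefix []byte
}

func (p prefixed) key(k []byte) []byte {
	// copy before appending so the shared prefix slice is never mutated
	return append(append([]byte{}, p.prefix...), k...)
}

func (p prefixed) Get(k []byte) ([]byte, error) { return p.inner.Get(p.key(k)) }
func (p prefixed) Put(k, v []byte) error        { return p.inner.Put(p.key(k), v) }

func main() {
	db := prefixed{inner: memStore{}, prefix: []byte("explorer_tikv_0/")}
	_ = db.Put([]byte("height"), []byte("42"))
	v, _ := db.Get([]byte("height"))
	fmt.Printf("%s\n", v) // 42
}
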
2 changes: 2 additions & 0 deletions api/service/explorer/schema.go
@@ -20,6 +20,7 @@ const (
oneAddrByteLen = 42 // byte size of string "one1..."
)

// readCheckpointBitmap reads the explorer checkpoint bitmap from storage
func readCheckpointBitmap(db databaseReader) (*roaring64.Bitmap, error) {
bitmapByte, err := db.Get([]byte(CheckpointBitmap))
if err != nil {
@@ -38,6 +39,7 @@ func readCheckpointBitmap(db databaseReader) (*roaring64.Bitmap, error) {
return rb, nil
}

// writeCheckpointBitmap writes the explorer checkpoint bitmap to storage
func writeCheckpointBitmap(db databaseWriter, rb *roaring64.Bitmap) error {
bitmapByte, err := rb.MarshalBinary()
if err != nil {
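
For context on the two functions above: the checkpoint is a roaring bitmap serialized with MarshalBinary/UnmarshalBinary. A minimal round-trip sketch using the real roaring64 package follows; the storage layer is omitted, and only the bitmap API is taken from the library.

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring/roaring64"
)

func main() {
	rb := roaring64.NewBitmap()
	for blk := uint64(0); blk < 1_000_000; blk++ {
		rb.Add(blk) // mark a million consecutive blocks as processed
	}

	// Serialize, as writeCheckpointBitmap does. Contiguous runs compress to
	// almost nothing, which is why a million blocks fit in roughly 1 KB.
	raw, err := rb.MarshalBinary()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d blocks -> %d bytes\n", rb.GetCardinality(), len(raw))

	// Deserialize, as readCheckpointBitmap does.
	rb2 := roaring64.NewBitmap()
	if err := rb2.UnmarshalBinary(raw); err != nil {
		panic(err)
	}
	fmt.Println("contains block 999999:", rb2.Contains(999_999))
}
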
1 change: 1 addition & 0 deletions api/service/explorer/service.go
@@ -338,6 +338,7 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

// GetCheckpointBitmap returns a copy of the explorer checkpoint bitmap
func (s *Service) GetCheckpointBitmap() *roaring64.Bitmap {
return s.storage.rb.Clone()
}
7 changes: 7 additions & 0 deletions api/service/explorer/storage.go
@@ -63,11 +63,13 @@ func newStorage(hc *harmonyconfig.HarmonyConfig, bc *core2.BlockChain, dbPath st
var err error

if hc.General.UseTiKV {
// initialize the storage using TiKV
dbPath = fmt.Sprintf("explorer_tikv_%d", hc.General.ShardID)
readOnly := hc.TiKV.Role == "Reader"
utils.Logger().Info().Msg("explorer storage in tikv: " + dbPath)
db, err = newExplorerTiKv(hc.TiKV.PDAddr, dbPath, readOnly)
} else {
// otherwise, fall back to LevelDB
utils.Logger().Info().Msg("explorer storage folder: " + dbPath)
db, err = newExplorerLvlDB(dbPath)
}
Expand All @@ -77,6 +79,8 @@ func newStorage(hc *harmonyconfig.HarmonyConfig, bc *core2.BlockChain, dbPath st
return nil, err
}

// Load the checkpoint roaring bitmap from storage.
// Roaring bitmaps compress extremely well; here, one million blocks take roughly 1 KB of storage.
bitmap, err := readCheckpointBitmap(db)
if err != nil {
return nil, err
@@ -271,20 +275,23 @@ func (tm *taskManager) PullTask() *types.Block {
return nil
}

// markBlockDone marks a block as processed
func (tm *taskManager) markBlockDone(btc batch, blockNum uint64) {
tm.lock.Lock()
defer tm.lock.Unlock()

if tm.rb.CheckedAdd(blockNum) {
tm.rbChangedCount++

// persist the bitmap once every 100 changes
if tm.rbChangedCount == 100 {
tm.rbChangedCount = 0
_ = writeCheckpointBitmap(btc, tm.rb)
}
}
}

// hasBlockDone reports whether a block has already been processed
func (tm *taskManager) hasBlockDone(blockNum uint64) bool {
tm.lock.Lock()
defer tm.lock.Unlock()
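
The markBlockDone/hasBlockDone pair above implements a simple batched checkpoint: record every processed block in the bitmap, but persist only every 100 changes. A hedged, self-contained sketch of that pattern; the checkpointer struct is a simplified stand-in for the real taskManager.

package main

import (
	"fmt"
	"sync"

	"github.com/RoaringBitmap/roaring/roaring64"
)

type checkpointer struct {
	mu      sync.Mutex
	rb      *roaring64.Bitmap
	changed int
	flush   func(*roaring64.Bitmap) error // e.g. writeCheckpointBitmap
}

func (c *checkpointer) markDone(blockNum uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// CheckedAdd reports whether the value was newly added, so duplicate
	// notifications never count toward the flush threshold.
	if c.rb.CheckedAdd(blockNum) {
		c.changed++
		if c.changed == 100 {
			c.changed = 0
			_ = c.flush(c.rb) // best effort, matching the original
		}
	}
}

func (c *checkpointer) hasDone(blockNum uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.rb.Contains(blockNum)
}

func main() {
	c := &checkpointer{
		rb:    roaring64.NewBitmap(),
		flush: func(rb *roaring64.Bitmap) error { fmt.Println("flushed", rb.GetCardinality()); return nil },
	}
	for blk := uint64(0); blk < 250; blk++ {
		c.markDone(blk)
	}
	fmt.Println(c.hasDone(100), c.hasDone(500)) // true false
}
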
35 changes: 31 additions & 4 deletions core/blockchain.go
@@ -147,9 +147,13 @@ type BlockChain struct {
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping

latestCleanCacheNum uint64
cleanCacheChan chan uint64
redisPreempt *redis_helper.RedisPreempt
// The following two fields are used to clean up the Redis cache in TiKV mode,
// which improves the Redis cache hit rate.
latestCleanCacheNum uint64 // the most recently cleaned-up block number
cleanCacheChan chan uint64 // notifies which block numbers should be cleaned up next

// redisPreempt is used in TiKV mode; writer nodes preempt it for write permission and publish updates to reader nodes
redisPreempt *redis_helper.RedisPreempt

hc *HeaderChain
trace bool // atomic?
@@ -711,7 +715,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) error {
return nil
}

// writeHeadBlock writes a new head block
// tikvFastForward writes a new head block in TiKV mode; used by reader nodes and follower writer nodes
func (bc *BlockChain) tikvFastForward(block *types.Block, logs []*types.Log) error {
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
@@ -1249,6 +1253,7 @@ func (bc *BlockChain) WriteBlockWithState(
return NonStatTy, err
}

// clean up block trie info in Redis; used in TiKV mode
if block.NumberU64() > triesInRedis {
select {
case bc.cleanCacheChan <- block.NumberU64() - triesInRedis:
@@ -1378,13 +1383,15 @@ func (bc *BlockChain) GetMaxGarbageCollectedBlockNumber() int64 {
//
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
// in TiKV mode, a writer node must either preempt the master role or become a follower
if bc.redisPreempt != nil && !bc.tikvPreemptMaster(bc.rangeBlock(chain)) {
return len(chain), nil
}

n, events, logs, err := bc.insertChain(chain, verifyHeaders)
bc.PostChainEvents(events, logs)
if bc.redisPreempt != nil && err != nil {
// on any error, the master writer node releases its write permission
_, _ = bc.redisPreempt.Unlock()
}
return n, err
@@ -1627,6 +1634,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block

// in TiKV mode, the writer node publishes updates to all reader nodes
err = redis_helper.PublishShardUpdate(bc.ShardID(), block.NumberU64(), logs)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis publish shard update error")
@@ -3211,6 +3219,7 @@ func (bc *BlockChain) IsEnablePruneBeaconChainFeature() bool {
return bc.pruneBeaconChainEnable
}

// SyncFromTiKVWriter is used in TiKV mode: reader nodes and follower writer nodes use it to sync blocks from the master writer
func (bc *BlockChain) SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) error {
head := rawdb.ReadHeadBlockHash(bc.db)
dbBlock := bc.GetBlockByHash(head)
@@ -3241,22 +3250,26 @@ func (bc *BlockChain) SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) er
return nil
}

// tikvCleanCache is used in TiKV mode to clean block trie data from Redis
func (bc *BlockChain) tikvCleanCache() {
var count int
for to := range bc.cleanCacheChan {
for i := bc.latestCleanCacheNum + 1; i <= to; i++ {
// build the previous block's statedb
fromBlock := bc.GetBlockByNumber(i)
fromTrie, err := state.New(fromBlock.Root(), bc.stateCache)
if err != nil {
continue
}

// build the current block's statedb
toBlock := bc.GetBlockByNumber(i + 1)
toTrie, err := state.New(toBlock.Root(), bc.stateCache)
if err != nil {
continue
}

// diff the two statedbs and delete the stale Redis cache entries
start := time.Now()
count, err = fromTrie.DiffAndCleanCache(bc.ShardID(), toTrie)
if err != nil {
@@ -3272,12 +3285,15 @@ func (bc *BlockChain) tikvCleanCache() {
}
}

// tikvPreemptMaster is used in TiKV mode; a writer node either preempts the master role or becomes a follower
func (bc *BlockChain) tikvPreemptMaster(fromBlock, toBlock uint64) bool {
for {
// preempt master
if ok, _ := bc.redisPreempt.TryLock(60); ok {
return true
}

// the chain has already advanced past this range, so another writer won; act as a follower
if bc.CurrentBlock().NumberU64() >= toBlock {
return false
}
@@ -3286,6 +3302,7 @@ func (bc *BlockChain) tikvPreemptMaster(fromBlock, toBlock uint64) bool {
}
}

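redis_helper is internal to Harmony and its implementation is not part of this diff. Purely as an illustration of what such a preempt lock can look like, here is a minimal SETNX-based sketch with the go-redis client; the key value, TTL handling, and naive unlock are assumptions, not the real RedisPreempt.

package main

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

type preempt struct {
	c   *redis.Client
	key string
}

// tryLock attempts to claim the master-writer role for ttlSeconds; the TTL
// guarantees the lock expires even if the holder crashes.
func (p *preempt) tryLock(ctx context.Context, ttlSeconds int) (bool, error) {
	return p.c.SetNX(ctx, p.key, "master", time.Duration(ttlSeconds)*time.Second).Result()
}

// unlock releases write permission, as the error path in InsertChain does.
// Naive: a production lock would verify ownership before deleting.
func (p *preempt) unlock(ctx context.Context) error {
	return p.c.Del(ctx, p.key).Err()
}

func main() {
	ctx := context.Background()
	p := &preempt{
		c:   redis.NewClient(&redis.Options{Addr: "localhost:6379"}),
		key: "shard_0_preempt", // matches the key format CreatePreempt uses in InitTiKV below
	}
	if ok, err := p.tryLock(ctx, 60); err == nil && ok {
		defer p.unlock(ctx)
		// ... we are the master writer: insert blocks, publish updates ...
	}
}
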
// rangeBlock returns the minimum and maximum block numbers in the given blocks
func (bc *BlockChain) rangeBlock(blocks types.Blocks) (uint64, uint64) {
if len(blocks) == 0 {
return 0, 0
@@ -3303,25 +3320,34 @@ func (bc *BlockChain) rangeBlock(blocks types.Blocks) (uint64, uint64) {
return min, max
}

// RedisPreempt returns the Redis preempt instance; used in TiKV mode
func (bc *BlockChain) RedisPreempt() *redis_helper.RedisPreempt {
return bc.redisPreempt
}

// InitTiKV initializes TiKV mode
func (bc *BlockChain) InitTiKV(conf *harmonyconfig.TiKVConfig) {
bc.cleanCacheChan = make(chan uint64, 10)

if conf.Role == "Writer" {
// only writer nodes need to preempt for write permission
bc.redisPreempt = redis_helper.CreatePreempt(fmt.Sprintf("shard_%d_preempt", bc.ShardID()))
}

if conf.Debug {
// Used to initialize the Redis cache.
// If Redis is empty, the hit rate will be too low and block synchronization will be slow;
// setting LOAD_PRE_FETCH to "yes" significantly improves this.
if os.Getenv("LOAD_PRE_FETCH") == "yes" {
if trie, err := state.New(bc.CurrentBlock().Root(), bc.stateCache); err == nil {
trie.Prefetch(512)
} else {
log.Println("LOAD_PRE_FETCH ERR: ", err)
}
}

// If Redis is empty, there is no need to clean the cache for the most recent ~1000 blocks;
// setting CLEAN_INIT to the latest block number skips this slow and unneeded cleanup.
if os.Getenv("CLEAN_INIT") != "" {
from, err := strconv.Atoi(os.Getenv("CLEAN_INIT"))
if err == nil {
@@ -3330,6 +3356,7 @@ func (bc *BlockChain) InitTiKV(conf *harmonyconfig.TiKVConfig) {
}
}

// start the block trie data cleanup process
go bc.tikvCleanCache()
}

17 changes: 16 additions & 1 deletion core/state/prefeth.go
@@ -18,13 +18,15 @@ type prefetchJob struct {
start, end []byte
}

// Prefetch
// Prefetch loads the latest block's statedb into Redis in parallel.
// If Redis is empty, the hit rate will be too low and block synchronization will be slow.
func (s *DB) Prefetch(parallel int) {
wg := sync.WaitGroup{}

jobChan := make(chan *prefetchJob, 10000)
waitWorker := int64(parallel)

// start parallel workers
for i := 0; i < parallel; i++ {
go func() {
defer wg.Done()
@@ -38,6 +40,7 @@ func (s *DB) Prefetch(parallel int) {
wg.Add(1)
}

// seed the queue with initial jobs, partitioned by the first key byte
for i := 0; i < 255; i++ {
start := []byte{byte(i)}
end := []byte{byte(i + 1)}
@@ -53,6 +56,7 @@ func (s *DB) Prefetch(parallel int) {
}
}

// wait for all workers to finish
start := time.Now()
log.Println("Prefetch start")
var sleepCount int
@@ -72,8 +76,10 @@ func (s *DB) Prefetch(parallel int) {
log.Println("Prefetch end, use", end.Sub(start))
}

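The hidden middle of Prefetch is a self-feeding worker pool: workers drain a buffered job channel and may push newly split jobs back into it. One safe way to express that shape — a simplified design sketch, not the function's actual shutdown logic, which (per the visible fragments) polls an idle-worker counter with time.Sleep:

package main

import (
	"fmt"
	"sync"
)

type job struct{ start, end byte }

// runPool runs `parallel` workers over a self-feeding job queue. A per-job
// WaitGroup counts outstanding work, so Wait returns only after every job,
// including jobs spawned by other jobs, has finished.
func runPool(parallel int, seed []job, work func(j job, spawn func(job))) {
	jobs := make(chan job, 10000) // buffer must outsize the expected split fan-out
	var pending sync.WaitGroup

	spawn := func(j job) {
		pending.Add(1)
		jobs <- j
	}

	for i := 0; i < parallel; i++ {
		go func() {
			for j := range jobs {
				work(j, spawn) // work may spawn split jobs
				pending.Done()
			}
		}()
	}

	for _, j := range seed {
		spawn(j)
	}
	pending.Wait() // all jobs done, nothing can spawn anymore
	close(jobs)    // let the workers exit
}

func main() {
	runPool(4, []job{{0, 255}}, func(j job, spawn func(job)) {
		if j.end-j.start > 63 { // pretend wide ranges are "slow" and split them
			mid := j.start + (j.end-j.start)/2
			spawn(job{j.start, mid})
			spawn(job{mid, j.end})
			return
		}
		fmt.Printf("prefetched range [%d, %d)\n", j.start, j.end)
	})
}
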
// prefetchWorker processes a single prefetch job
func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) {
if job.account == nil {
// no account specified: scan the main account trie
nodeIterator := s.trie.NodeIterator(job.start)
it := trie.NewIterator(nodeIterator)

@@ -82,6 +88,7 @@ func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) {
return
}

// decode the account data from the main trie leaf
var data Account
if err := rlp.DecodeBytes(it.Value, &data); err != nil {
panic(err)
@@ -93,21 +100,27 @@ func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) {
obj.Code(s.db)
}

// build the account's storage trie
storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(nil))
storageJob := &prefetchJob{
accountAddr: addrBytes,
account: &data,
}

// fetch data
s.prefetchAccountStorage(jobs, storageJob, storageIt)
}
} else {
// scan a single account's storage trie, resuming from job.start
obj := newObject(s, common.BytesToAddress(job.accountAddr), *job.account)
storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(job.start))

// fetch data
s.prefetchAccountStorage(jobs, job, storageIt)
}
}

// bytesMiddle returns the binary midpoint of a and b
func (s *DB) bytesMiddle(a, b []byte) []byte {
if a == nil && b == nil {
return []byte{128}
@@ -141,6 +154,7 @@ func (s *DB) bytesMiddle(a, b []byte) []byte {
return aI.Bytes()
}

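A standalone sketch of the midpoint idea behind bytesMiddle. The real method also handles nil bounds (returning {128} when both are nil, per the visible lines) and byte strings of unequal length; this free function only shows the arithmetic.

package main

import (
	"fmt"
	"math/big"
)

// middle returns a byte string roughly halfway between a and b, so a slow
// prefetch job over [a, b) can be split into two jobs of similar keyspace.
func middle(a, b []byte) []byte {
	ai := new(big.Int).SetBytes(a)
	bi := new(big.Int).SetBytes(b)
	mid := new(big.Int).Add(ai, bi)
	mid.Rsh(mid, 1) // (a+b)/2
	return mid.Bytes()
}

func main() {
	fmt.Printf("%x\n", middle([]byte{0x40}, []byte{0x80})) // 60
}
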
// prefetchAccountStorage fetches the storage of one account
func (s *DB) prefetchAccountStorage(jobs chan *prefetchJob, job *prefetchJob, it *trie.Iterator) {
start := time.Now()
count := 0
Expand All @@ -152,6 +166,7 @@ func (s *DB) prefetchAccountStorage(jobs chan *prefetchJob, job *prefetchJob, it

count++

// if fetching one account's storage has taken more than 15s, split the job in two
if count%10000 == 0 && time.Now().Sub(start) > 15*time.Second {
middle := s.bytesMiddle(it.Key, job.end)
startJob := &prefetchJob{
8 changes: 8 additions & 0 deletions core/state/tikv_clean.go
@@ -12,7 +12,9 @@ import (

var secureKeyPrefix = []byte("secure-key-")

// DiffAndCleanCache cleans block trie data from Redis; used to reduce Redis storage and raise the hit rate
func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) {
// create difference iterator
it, _ := trie.NewDifferenceIterator(to.trie.NodeIterator(nil), s.trie.NodeIterator(nil))
db, err := tikv_manage.GetDefaultTiKVFactory().NewCacheStateDB(shardId)
if err != nil {
@@ -24,9 +26,12 @@ func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) {
count := uint64(0)
for it.Next(true) {
if !it.Leaf() {
// a non-leaf (internal) trie node: delete it from the cache
atomic.AddUint64(&count, 1)
_ = batch.Delete(it.Hash().Bytes())
} else {
// a leaf node: decode the account data
addrBytes := s.trie.GetKey(it.LeafKey())
addr := common.BytesToAddress(addrBytes)

@@ -41,14 +46,17 @@ func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) {
continue
}

// skip accounts whose storage root is unchanged
if bytes.Compare(fromAccount.Root.Bytes(), toAccount.Root.Bytes()) == 0 {
continue
}

// create account difference iterator
fromAccountTrie := newObject(s, addr, fromAccount).getTrie(s.db)
toAccountTrie := newObject(to, addr, toAccount).getTrie(to.db)
accountIt, _ := trie.NewDifferenceIterator(toAccountTrie.NodeIterator(nil), fromAccountTrie.NodeIterator(nil))

// delete the account's data in parallel
wg.Add(1)
go func() {
defer wg.Done()
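
DiffAndCleanCache is built on the trie difference iterator, which yields only the nodes present in the second trie but not the first. A hedged usage sketch, shown against go-ethereum's trie package, whose iterator API matches the calls visible above; obtaining the two tries is left out.

package sketch

import (
	"fmt"

	"github.com/ethereum/go-ethereum/trie"
)

// walkChanged visits every node that newIt's trie has and oldIt's trie lacks —
// the same primitive DiffAndCleanCache uses to find cache entries to delete.
func walkChanged(oldIt, newIt trie.NodeIterator) {
	it, _ := trie.NewDifferenceIterator(oldIt, newIt)
	for it.Next(true) { // true: descend into children
		if it.Leaf() {
			fmt.Printf("changed leaf: key=%x\n", it.LeafKey())
		} else {
			fmt.Printf("changed node: hash=%x\n", it.Hash())
		}
	}
}
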
1 change: 1 addition & 0 deletions core/tx_pool.go
@@ -929,6 +929,7 @@ func (pool *TxPool) pendingEpoch() *big.Int {
func (pool *TxPool) add(tx types.PoolTransaction, local bool) (replaced bool, err error) {
defer func() {
if err == nil && pool.config.AddEvent != nil {
// in TiKV mode, the writer publishes txpool changes to all readers, keeping their state consistent
pool.config.AddEvent(tx, local)
}
}()
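
The AddEvent hook itself is defined elsewhere in the pool config. A toy, self-contained illustration of the deferred-callback pattern used above; the types are stand-ins, not Harmony's real PoolTransaction or TxPoolConfig.

package main

import "fmt"

type tx struct{ hash string }

type poolConfig struct {
	AddEvent func(t tx, local bool) // nil means "no listener"
}

type pool struct{ config poolConfig }

// add mirrors the shape of TxPool.add: the hook fires only when the insert
// succeeded, because the deferred closure reads the named return value err.
func (p *pool) add(t tx, local bool) (err error) {
	defer func() {
		if err == nil && p.config.AddEvent != nil {
			p.config.AddEvent(t, local) // e.g. publish to reader nodes in TiKV mode
		}
	}()
	// ... insert into the pool ...
	return nil
}

func main() {
	p := &pool{config: poolConfig{AddEvent: func(t tx, local bool) {
		fmt.Println("publish txpool update:", t.hash, local)
	}}}
	_ = p.add(tx{hash: "0xabc"}, true)
}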