Skip to content

Commit

Permalink
blocks pool with rounds map
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Mar 15, 2024
1 parent 590d499 commit 0878889
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 47 deletions.
2 changes: 1 addition & 1 deletion factory/wsConnectorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func CreateWSConnector(cfg config.WebSocketConfig) (process.WSConnector, error)
return nil, err
}

blocksPool, err := process.NewBlocksPool(cacher)
blocksPool, err := process.NewBlocksPool(cacher, blockContainer, protoMarshaller)
if err != nil {
return nil, err
}
Expand Down
117 changes: 71 additions & 46 deletions process/blocksPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,111 @@ package process

import (
"fmt"
"math"
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-storage-go/storageUnit"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-storage-go/types"
)

type blocksData struct {
blocksCacher types.Cacher
delta uint32
}

type blocksPool struct {
cacher types.Cacher
blocksMap map[uint32]*blocksData
cacher types.Cacher
blockCreator BlockContainerHandler
marshaller marshal.Marshalizer
maxDelta uint64

roundsMap map[uint32]uint64
mutMap sync.RWMutex
}

func NewBlocksPool(cacher types.Cacher) (*blocksPool, error) {
func NewBlocksPool(
cacher types.Cacher,
blockCreator BlockContainerHandler,
marshaller marshal.Marshalizer,
) (*blocksPool, error) {
numberOfShards := uint32(3)

roundsMap := make(map[uint32]uint64)
for shardID := uint32(0); shardID < numberOfShards; shardID++ {
roundsMap[shardID] = 0
}
roundsMap[core.MetachainShardId] = 0

bp := &blocksPool{
cacher: cacher,
cacher: cacher,
blockCreator: blockCreator,
roundsMap: roundsMap,
marshaller: marshaller,
maxDelta: 10,
}

numOfShards := uint32(3)
return bp, nil
}

bp.createBlocksMap(numOfShards)
func (bp *blocksPool) UpdateMetaRound(round uint64) {
bp.mutMap.Lock()
defer bp.mutMap.Unlock()

return bp, nil
bp.roundsMap[core.MetachainShardId] = round
}

func (bp *blocksPool) createBlocksMap(numOfShards uint32) error {
blocksMap := make(map[uint32]*blocksData)
func (bp *blocksPool) PutBlock(hash []byte, outportBlock *outport.OutportBlock) error {
bp.mutMap.Lock()
defer bp.mutMap.Unlock()

for i := uint32(0); i < numOfShards; i++ {
cacher, err := bp.createCacher()
if err != nil {
return err
}
shardID := outportBlock.ShardID

blocksMap[i] = &blocksData{
blocksCacher: cacher,
delta: 0,
}
round, ok := bp.roundsMap[shardID]
if !ok {
return fmt.Errorf("did not find shard id %d in blocksMap", shardID)
}

cacher, err := bp.createCacher()
if err != nil {
return err
if round == 0 {
bp.putOutportBlock(hash, outportBlock)
}

blocksMap[core.MetachainShardId] = &blocksData{
blocksCacher: cacher,
}
metaRound := bp.roundsMap[core.MetachainShardId]

bp.blocksMap = blocksMap
if !bp.shouldPutOutportBlock(round, metaRound) {
log.Error("failed to put outport block", "hash", hash, "round", round, "metaRound", metaRound)
return fmt.Errorf("failed to put outport block", "hash", hash, "round", round, "metaRound", metaRound)
}

return nil
return bp.putOutportBlock(hash, outportBlock)
}

func (bp *blocksPool) createCacher() (types.Cacher, error) {
cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.SizeLRUCache,
SizeInBytes: 209715200, // 200MB
Capacity: 100,
}
// should be run under mutex
func (bp *blocksPool) shouldPutOutportBlock(round, metaRound uint64) bool {
diff := float64(int64(round) - int64(metaRound))
delta := math.Abs(diff)

cacher, err := storageUnit.NewCache(cacheConfig)
if err != nil {
return nil, err
if math.Abs(delta) > float64(bp.maxDelta) {
return false
}

return cacher, nil
return true
}

func (bp *blocksPool) PutBlock(hash []byte, outportBlock *outport.OutportBlock) error {
blocksData, ok := bp.blocksMap[outportBlock.ShardID]
if !ok {
return fmt.Errorf("did not find shard id %d in blocksMap", outportBlock.ShardID)
// should be run under mutex
func (bp *blocksPool) putOutportBlock(hash []byte, outportBlock *outport.OutportBlock) error {
shardID := outportBlock.ShardID

blockCreator, err := bp.blockCreator.Get(core.HeaderType(outportBlock.BlockData.HeaderType))
if err != nil {
return err
}

header, err := block.GetHeaderFromBytes(bp.marshaller, blockCreator, outportBlock.BlockData.HeaderBytes)
if err != nil {
return err
}

_ = bp.cacher.Put(hash, outportBlock, 0)
bp.roundsMap[shardID] = header.GetRound()

return nil
}

Expand Down

0 comments on commit 0878889

Please sign in to comment.