Merge pull request #23 from multiversx/data-pool-fixes
Data pool improvements
ssd04 authored Aug 7, 2024
2 parents 1544e6c + 40e9bfc commit 7564475
Showing 14 changed files with 338 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cmd/connector/config/config.toml
@@ -57,7 +57,7 @@
[OutportBlocksStorage.Cache]
Name = "OutportBlocksStorage"
# Cache capacity and size has to be higher than max allowed deltas per shards in total
Capacity = 100
Capacity = 160
Type = "SizeLRU"
SizeInBytes = 209715200 # 200MB
[OutportBlocksStorage.DB]
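Note on the capacity bump: the comment above ties the cache capacity to the maximum allowed deltas per shard, summed over all shards. The sketch below only illustrates that sizing rule — the shard count and MaxDelta value are assumptions, not values taken from this repository's defaults.

```go
// Illustrative sizing check for OutportBlocksStorage.Cache.Capacity.
// Assumptions: 4 shards (3 regular + metachain) and DataPool.MaxDelta = 30;
// the real values come from the node/connector configuration.
package main

import "fmt"

func main() {
	const (
		numShards = 4
		maxDelta  = 30
	)

	// The cache must hold at least MaxDelta blocks per shard in total.
	minCapacity := numShards * maxDelta
	fmt.Println("capacity must exceed", minCapacity) // with these numbers: 120
}
```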
7 changes: 6 additions & 1 deletion cmd/connector/flags.go
@@ -38,11 +38,16 @@ var (
dbMode = cli.StringFlag{
Name: "db-mode",
Usage: "Option for specifying db mode. Available options: `full-persister`, `import-db`, `optimized-persister`",
Value: "optimized-persister",
Value: "full-persister",
}

enableGrpcServer = cli.BoolFlag{
Name: "enable-grpc-server",
Usage: "Option for enabling grpc server",
}

resetCheckpoints = cli.BoolFlag{
Name: "reset-checkpoints",
Usage: "Option for resetting state checkpoints",
}
)
6 changes: 5 additions & 1 deletion cmd/connector/main.go
@@ -37,6 +37,7 @@ func main() {
disableAnsiColor,
dbMode,
enableGrpcServer,
resetCheckpoints,
}
app.Authors = []cli.Author{
{
@@ -82,7 +83,10 @@ func startConnector(ctx *cli.Context) error {
enableGrpcServer := ctx.GlobalBool(enableGrpcServer.Name)
log.Info("grpc server enabled", "enableGrpcServer", enableGrpcServer)

connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode, enableGrpcServer)
resetCheckpoints := ctx.GlobalBool(resetCheckpoints.Name)
log.Info("reset checkpoint at start", "resetCheckpoints", resetCheckpoints)

connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode, enableGrpcServer, resetCheckpoints)
if err != nil {
return fmt.Errorf("cannot create connector runner, error: %w", err)
}
11 changes: 11 additions & 0 deletions common/common.go
@@ -36,3 +36,14 @@ func ConvertFirstCommitableBlocks(blocks []config.FirstCommitableBlock) (map[uin

return newBlocks, nil
}

// DeepCopyNoncesMap will deep copy nonces map
func DeepCopyNoncesMap(originalMap map[uint32]uint64) map[uint32]uint64 {
newMap := make(map[uint32]uint64)

for key, value := range originalMap {
newMap[key] = value
}

return newMap
}
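The new DeepCopyNoncesMap helper gives each consumer an independent copy of the shard-to-nonce map, so one component updating its view of the first commitable blocks cannot mutate the map held by another. A minimal usage sketch (shard IDs and nonces are illustrative):

```go
// Minimal usage sketch for common.DeepCopyNoncesMap (illustrative values).
package main

import (
	"fmt"

	"github.com/multiversx/mx-chain-ws-connector-firehose-go/common"
)

func main() {
	firstCommitableBlocks := map[uint32]uint64{0: 100, 1: 100, 2: 250}

	// The data pool advances its own copy of the map...
	poolView := common.DeepCopyNoncesMap(firstCommitableBlocks)
	poolView[0] = 500

	// ...while the original map (and any other consumer's copy) stays untouched.
	fmt.Println(firstCommitableBlocks[0], poolView[0]) // 100 500
}
```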
10 changes: 7 additions & 3 deletions connector/connectorRunner.go
@@ -25,10 +25,11 @@ type connectorRunner struct {
config *config.Config
dbMode common.DBMode
enableGrpcServer bool
resetCheckpoints bool
}

// NewConnectorRunner will create a new connector runner instance
func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool) (*connectorRunner, error) {
func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool, resetCheckpoints bool) (*connectorRunner, error) {
if cfg == nil {
return nil, ErrNilConfig
}
@@ -37,6 +38,7 @@ func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool
config: cfg,
dbMode: common.DBMode(dbMode),
enableGrpcServer: enableGrpcServer,
resetCheckpoints: resetCheckpoints,
}, nil
}

@@ -70,7 +72,8 @@ func (cr *connectorRunner) Run() error {
Marshaller: protoMarshaller,
MaxDelta: cr.config.DataPool.MaxDelta,
CleanupInterval: cr.config.DataPool.PruningWindow,
FirstCommitableBlocks: firstCommitableBlocks,
FirstCommitableBlocks: common.DeepCopyNoncesMap(firstCommitableBlocks),
ResetCheckpoints: cr.resetCheckpoints,
}
dataPool, err := process.NewDataPool(argsBlocksPool)
if err != nil {
@@ -101,7 +104,7 @@ func (cr *connectorRunner) Run() error {
DataAggregator: dataAggregator,
RetryDurationInMilliseconds: cr.config.Publisher.RetryDurationInMiliseconds,
Marshalizer: protoMarshaller,
FirstCommitableBlocks: firstCommitableBlocks,
FirstCommitableBlocks: common.DeepCopyNoncesMap(firstCommitableBlocks),
}

publisherHandler, err := process.NewPublisherHandler(publisherHandlerArgs)
@@ -114,6 +117,7 @@
gogoProtoMarshaller,
blocksPool,
outportBlockConverter,
common.DeepCopyNoncesMap(firstCommitableBlocks),
)
if err != nil {
return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
4 changes: 2 additions & 2 deletions process/blocksPool.go
@@ -92,7 +92,7 @@ func (bp *blocksPool) GetMetaBlock(hash []byte) (*hyperOutportBlocks.MetaOutport
metaOutportBlock := &hyperOutportBlocks.MetaOutportBlock{}
err = bp.marshaller.Unmarshal(metaOutportBlock, marshalledData)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshall meta outport block: %w", err)
}

return metaOutportBlock, nil
@@ -108,7 +108,7 @@ func (bp *blocksPool) GetShardBlock(hash []byte) (*hyperOutportBlocks.ShardOutpo
shardOutportBlock := &hyperOutportBlocks.ShardOutportBlock{}
err = bp.marshaller.Unmarshal(shardOutportBlock, marshalledData)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshall shard outport block: %w", err)
}

return shardOutportBlock, nil
85 changes: 66 additions & 19 deletions process/dataPool.go
@@ -8,13 +8,16 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-ws-connector-firehose-go/common"
"github.com/multiversx/mx-chain-ws-connector-firehose-go/data"
)

const (
// MetaCheckpointKey defines the meta checkpoint key
MetaCheckpointKey = "lastMetaIndex"

softMetaCheckpointKey = "softLastCheckpoint"

initIndex = 0
minDelta = 3
)
@@ -25,9 +28,12 @@ type dataPool struct {
maxDelta uint64
cleanupInterval uint64
firstCommitableBlocks map[uint32]uint64
resetCheckpoints bool

previousIndexesMap map[uint32]uint64
mutMap sync.RWMutex
// soft checkpoint data is being used only at startup (without any previous data)
// until there is a valid hard checkpoint (set by publisher)
softCheckpointMap map[uint32]uint64
mutMap sync.RWMutex
}

// DataPoolArgs defines the arguments needed to create the blocks pool component
@@ -37,6 +43,7 @@ type DataPoolArgs struct {
MaxDelta uint64
CleanupInterval uint64
FirstCommitableBlocks map[uint32]uint64
ResetCheckpoints bool
}

// NewDataPool will create a new data pool instance
@@ -60,6 +67,7 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
maxDelta: args.MaxDelta,
cleanupInterval: args.CleanupInterval,
firstCommitableBlocks: args.FirstCommitableBlocks,
resetCheckpoints: args.ResetCheckpoints,
}

bp.initIndexesMap()
@@ -68,22 +76,30 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
}

func (bp *dataPool) initIndexesMap() {
lastCheckpoint, err := bp.GetLastCheckpoint()
if err != nil || lastCheckpoint == nil || lastCheckpoint.LastNonces == nil {
log.Warn("failed to get last checkpoint, will set empty indexes map", "error", err)
if bp.resetCheckpoints {
log.Warn("initializing with reset checkpoints option, will set empty soft checkpoint")
bp.softCheckpointMap = make(map[uint32]uint64)
return
}

indexesMap := make(map[uint32]uint64)
bp.previousIndexesMap = indexesMap
lastCheckpoint, err := bp.GetLastCheckpoint()
if err == nil {
log.Info("initIndexesMap", "lastCheckpoint", lastCheckpoint)
bp.softCheckpointMap = lastCheckpoint.GetLastNonces()
return
}

log.Info("initIndexesMap", "lastCheckpoint", lastCheckpoint)
log.Warn("failed to get last checkpoint, will try to set soft last checkpoint", "error", err)

indexesMap := make(map[uint32]uint64, len(lastCheckpoint.LastNonces))
for shardID, nonce := range lastCheckpoint.LastNonces {
indexesMap[shardID] = nonce
softCheckpoint, err := bp.getLastSoftCheckpoint()
if err == nil {
log.Info("initIndexesMap", "softCheckpoint", softCheckpoint)
bp.softCheckpointMap = softCheckpoint.GetLastNonces()
return
}
bp.previousIndexesMap = indexesMap

log.Warn("failed to get last soft checkpoint, will set empty soft checkpoint", "error", err)
bp.softCheckpointMap = make(map[uint32]uint64)
}

// Put will put value into storer
@@ -136,7 +152,7 @@ func (bp *dataPool) getLastIndex(shardID uint32) uint64 {
}
}

lastIndex, ok := bp.previousIndexesMap[shardID]
lastIndex, ok := bp.softCheckpointMap[shardID]
if !ok {
return initIndex
}
@@ -161,7 +177,7 @@ func (bp *dataPool) PutBlock(hash []byte, value []byte, newIndex uint64, shardID

lastIndex := bp.getLastIndex(shardID)
if lastIndex == initIndex {
bp.previousIndexesMap[shardID] = newIndex
bp.softCheckpointMap[shardID] = newIndex
return bp.storer.Put(hash, value)
}

@@ -198,27 +214,58 @@ func (bp *dataPool) setCheckpoint(checkpoint *data.BlockCheckpoint) error {

// GetLastCheckpoint returns last checkpoint data
func (bp *dataPool) GetLastCheckpoint() (*data.BlockCheckpoint, error) {
checkpointBytes, err := bp.storer.Get([]byte(MetaCheckpointKey))
return bp.getCheckpointData(MetaCheckpointKey)
}

func (bp *dataPool) getLastSoftCheckpoint() (*data.BlockCheckpoint, error) {
return bp.getCheckpointData(softMetaCheckpointKey)
}

func (bp *dataPool) getCheckpointData(checkpointKey string) (*data.BlockCheckpoint, error) {
checkpointBytes, err := bp.storer.Get([]byte(checkpointKey))
if err != nil {
return nil, fmt.Errorf("failed to get checkpoint data from storer: %w", err)
return nil, fmt.Errorf("failed to get checkpoint data (key = %s) from storer: %w", checkpointKey, err)
}

checkpoint := &data.BlockCheckpoint{}
err = bp.marshaller.Unmarshal(checkpoint, checkpointBytes)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall checkpoint data: %w", err)
return nil, fmt.Errorf("failed to unmarshall checkpoint (key = %s) data: %w", checkpointKey, err)
}

if checkpoint == nil || checkpoint.LastNonces == nil {
return nil, fmt.Errorf("nil checkpoint data has been provided")
return nil, fmt.Errorf("nil checkpoint data (key = %s) has been provided", checkpointKey)
}

return checkpoint, nil
}

func (bp *dataPool) saveLastSoftCheckpoint() error {
bp.mutMap.RLock()
softCheckpointMap := common.DeepCopyNoncesMap(bp.softCheckpointMap)
bp.mutMap.RUnlock()

softCheckpoint := &data.BlockCheckpoint{}
softCheckpoint.LastNonces = softCheckpointMap

checkpointBytes, err := bp.marshaller.Marshal(softCheckpoint)
if err != nil {
return fmt.Errorf("failed to marshall publish soft checkpoint data: %w", err)
}

log.Debug("saveLastSoftCheckpoint", "previousIndexesMap", softCheckpointMap)

return bp.storer.Put([]byte(softMetaCheckpointKey), checkpointBytes)
}

// Close will trigger close on blocks pool component
func (bp *dataPool) Close() error {
err := bp.storer.Close()
err := bp.saveLastSoftCheckpoint()
if err != nil {
return err
}

err = bp.storer.Close()
if err != nil {
return err
}
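The dataPool changes introduce a soft checkpoint that is persisted on Close and consumed only at startup, until the publisher writes a hard checkpoint. The sketch below is a condensed, illustrative restatement of the new initIndexesMap fallback order; the checkpointSource type and resolveStartupNonces helper are hypothetical stand-ins, not part of the repository:

```go
// Illustrative restatement of the startup fallback order introduced here:
// reset flag -> hard checkpoint (MetaCheckpointKey, set by the publisher)
// -> soft checkpoint (softMetaCheckpointKey, saved on Close) -> empty map.
package main

import "fmt"

type blockCheckpoint struct {
	lastNonces map[uint32]uint64
}

type checkpointSource struct {
	hard *blockCheckpoint // hypothetical stand-in for GetLastCheckpoint
	soft *blockCheckpoint // hypothetical stand-in for getLastSoftCheckpoint
}

func resolveStartupNonces(src checkpointSource, resetCheckpoints bool) map[uint32]uint64 {
	if resetCheckpoints {
		// --reset-checkpoints: ignore any stored state and start empty.
		return make(map[uint32]uint64)
	}
	if src.hard != nil && src.hard.lastNonces != nil {
		return src.hard.lastNonces
	}
	if src.soft != nil && src.soft.lastNonces != nil {
		return src.soft.lastNonces
	}
	return make(map[uint32]uint64)
}

func main() {
	src := checkpointSource{soft: &blockCheckpoint{lastNonces: map[uint32]uint64{0: 41, 1: 42}}}
	fmt.Println(resolveStartupNonces(src, false)) // map[0:41 1:42]
	fmt.Println(resolveStartupNonces(src, true))  // map[]
}
```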
7 changes: 4 additions & 3 deletions process/dataPool_test.go
@@ -18,7 +18,7 @@ import (
func createDefaultBlocksPoolArgs() process.DataPoolArgs {
return process.DataPoolArgs{
Storer: &testscommon.PruningStorerMock{},
Marshaller: gogoProtoMarshaller,
Marshaller: protoMarshaller,
MaxDelta: 10,
CleanupInterval: 100,
FirstCommitableBlocks: map[uint32]uint64{
@@ -405,9 +405,10 @@ func TestDataPool_Close(t *testing.T) {
return nil
},
}
bp, _ := process.NewDataPool(args)
bp, err := process.NewDataPool(args)
require.Nil(t, err)

err := bp.Close()
err = bp.Close()
require.Nil(t, err)

require.True(t, wasCalled)