diff --git a/cmd/connector/config/config.toml b/cmd/connector/config/config.toml
index 7431256..ed8d985 100644
--- a/cmd/connector/config/config.toml
+++ b/cmd/connector/config/config.toml
@@ -57,7 +57,7 @@
 [OutportBlocksStorage.Cache]
     Name = "OutportBlocksStorage"
     # Cache capacity and size have to be higher than the max allowed deltas per shard in total
-    Capacity = 100
+    Capacity = 160
     Type = "SizeLRU"
     SizeInBytes = 209715200 # 200MB
 [OutportBlocksStorage.DB]
diff --git a/cmd/connector/flags.go b/cmd/connector/flags.go
index 990506f..81d0607 100644
--- a/cmd/connector/flags.go
+++ b/cmd/connector/flags.go
@@ -38,11 +38,16 @@ var (
 	dbMode = cli.StringFlag{
 		Name:  "db-mode",
 		Usage: "Option for specifying db mode. Available options: `full-persister`, `import-db`, `optimized-persister`",
-		Value: "optimized-persister",
+		Value: "full-persister",
 	}
 	enableGrpcServer = cli.BoolFlag{
 		Name:  "enable-grpc-server",
 		Usage: "Option for enabling grpc server",
 	}
+
+	resetCheckpoints = cli.BoolFlag{
+		Name:  "reset-checkpoints",
+		Usage: "Option for resetting state checkpoints",
+	}
 )
diff --git a/cmd/connector/main.go b/cmd/connector/main.go
index 3a4b223..39b7ce6 100644
--- a/cmd/connector/main.go
+++ b/cmd/connector/main.go
@@ -37,6 +37,7 @@ func main() {
 		disableAnsiColor,
 		dbMode,
 		enableGrpcServer,
+		resetCheckpoints,
 	}
 	app.Authors = []cli.Author{
 		{
@@ -82,7 +83,10 @@ func startConnector(ctx *cli.Context) error {
 	enableGrpcServer := ctx.GlobalBool(enableGrpcServer.Name)
 	log.Info("grpc server enabled", "enableGrpcServer", enableGrpcServer)

-	connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode, enableGrpcServer)
+	resetCheckpoints := ctx.GlobalBool(resetCheckpoints.Name)
+	log.Info("reset checkpoint at start", "resetCheckpoints", resetCheckpoints)
+
+	connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode, enableGrpcServer, resetCheckpoints)
 	if err != nil {
 		return fmt.Errorf("cannot create connector runner, error: %w", err)
 	}
diff --git a/common/common.go b/common/common.go
index 93e6376..f4b20f4 100644
--- a/common/common.go
+++ b/common/common.go
@@ -36,3 +36,14 @@ func ConvertFirstCommitableBlocks(blocks []config.FirstCommitableBlock) (map[uin

 	return newBlocks, nil
 }
+
+// DeepCopyNoncesMap will deep copy nonces map
+func DeepCopyNoncesMap(originalMap map[uint32]uint64) map[uint32]uint64 {
+	newMap := make(map[uint32]uint64)
+
+	for key, value := range originalMap {
+		newMap[key] = value
+	}
+
+	return newMap
+}
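The DeepCopyNoncesMap helper exists because Go maps are passed around by reference: the runner below hands firstCommitableBlocks to three separate components, and without copies a write in one component would silently show up in (and possibly race with) the others. A minimal, self-contained sketch of the behavior being guarded against; the local deepCopyNoncesMap mirrors the helper above, everything else is illustrative:

package main

import "fmt"

// deepCopyNoncesMap mirrors common.DeepCopyNoncesMap above.
func deepCopyNoncesMap(originalMap map[uint32]uint64) map[uint32]uint64 {
	newMap := make(map[uint32]uint64)
	for key, value := range originalMap {
		newMap[key] = value
	}

	return newMap
}

func main() {
	original := map[uint32]uint64{0: 10, 1: 11}

	poolCopy := deepCopyNoncesMap(original)
	poolCopy[0] = 999 // mutating the copy...

	fmt.Println(original[0]) // ...leaves the source untouched: prints 10
}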
diff --git a/connector/connectorRunner.go b/connector/connectorRunner.go
index 8e89886..445daf0 100644
--- a/connector/connectorRunner.go
+++ b/connector/connectorRunner.go
@@ -25,10 +25,11 @@ type connectorRunner struct {
 	config           *config.Config
 	dbMode           common.DBMode
 	enableGrpcServer bool
+	resetCheckpoints bool
 }

 // NewConnectorRunner will create a new connector runner instance
-func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool) (*connectorRunner, error) {
+func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool, resetCheckpoints bool) (*connectorRunner, error) {
 	if cfg == nil {
 		return nil, ErrNilConfig
 	}
@@ -37,6 +38,7 @@ func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool
 		config:           cfg,
 		dbMode:           common.DBMode(dbMode),
 		enableGrpcServer: enableGrpcServer,
+		resetCheckpoints: resetCheckpoints,
 	}, nil
 }

@@ -70,7 +72,8 @@ func (cr *connectorRunner) Run() error {
 		Marshaller:            protoMarshaller,
 		MaxDelta:              cr.config.DataPool.MaxDelta,
 		CleanupInterval:       cr.config.DataPool.PruningWindow,
-		FirstCommitableBlocks: firstCommitableBlocks,
+		FirstCommitableBlocks: common.DeepCopyNoncesMap(firstCommitableBlocks),
+		ResetCheckpoints:      cr.resetCheckpoints,
 	}
 	dataPool, err := process.NewDataPool(argsBlocksPool)
 	if err != nil {
@@ -101,7 +104,7 @@ func (cr *connectorRunner) Run() error {
 		DataAggregator:              dataAggregator,
 		RetryDurationInMilliseconds: cr.config.Publisher.RetryDurationInMiliseconds,
 		Marshalizer:                 protoMarshaller,
-		FirstCommitableBlocks:       firstCommitableBlocks,
+		FirstCommitableBlocks:       common.DeepCopyNoncesMap(firstCommitableBlocks),
 	}

 	publisherHandler, err := process.NewPublisherHandler(publisherHandlerArgs)
@@ -114,6 +117,7 @@ func (cr *connectorRunner) Run() error {
 		gogoProtoMarshaller,
 		blocksPool,
 		outportBlockConverter,
+		common.DeepCopyNoncesMap(firstCommitableBlocks),
 	)
 	if err != nil {
 		return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
diff --git a/process/blocksPool.go b/process/blocksPool.go
index 8c004a6..373fe22 100644
--- a/process/blocksPool.go
+++ b/process/blocksPool.go
@@ -92,7 +92,7 @@ func (bp *blocksPool) GetMetaBlock(hash []byte) (*hyperOutportBlocks.MetaOutport
 	metaOutportBlock := &hyperOutportBlocks.MetaOutportBlock{}
 	err = bp.marshaller.Unmarshal(metaOutportBlock, marshalledData)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to unmarshall meta outport block: %w", err)
 	}

 	return metaOutportBlock, nil
@@ -108,7 +108,7 @@ func (bp *blocksPool) GetShardBlock(hash []byte) (*hyperOutportBlocks.ShardOutpo
 	shardOutportBlock := &hyperOutportBlocks.ShardOutportBlock{}
 	err = bp.marshaller.Unmarshal(shardOutportBlock, marshalledData)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to unmarshall shard outport block: %w", err)
 	}

 	return shardOutportBlock, nil
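The blocksPool changes above wrap the raw Unmarshal errors with context via %w. A short sketch of why %w matters: wrapped errors stay matchable with errors.Is/errors.As, so callers can still detect the underlying cause after the wrap (the sentinel and function names below are illustrative, not part of this patch):

package main

import (
	"errors"
	"fmt"
)

var errKeyNotFound = errors.New("key not found") // illustrative sentinel

func getBlock() error {
	// wrapping with %w, as the blocksPool changes do, keeps the cause reachable
	return fmt.Errorf("failed to unmarshall meta outport block: %w", errKeyNotFound)
}

func main() {
	err := getBlock()
	fmt.Println(errors.Is(err, errKeyNotFound)) // true: the wrap preserves the chain
}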
diff --git a/process/dataPool.go b/process/dataPool.go
index 5ac2cc9..497cc7c 100644
--- a/process/dataPool.go
+++ b/process/dataPool.go
@@ -8,6 +8,7 @@ import (
 	"github.com/multiversx/mx-chain-core-go/core"
 	"github.com/multiversx/mx-chain-core-go/core/check"
 	"github.com/multiversx/mx-chain-core-go/marshal"
+	"github.com/multiversx/mx-chain-ws-connector-firehose-go/common"
 	"github.com/multiversx/mx-chain-ws-connector-firehose-go/data"
 )

@@ -15,6 +16,8 @@ const (
 	// MetaCheckpointKey defines the meta checkpoint key
 	MetaCheckpointKey = "lastMetaIndex"

+	softMetaCheckpointKey = "softLastCheckpoint"
+
 	initIndex = 0
 	minDelta  = 3
 )
@@ -25,9 +28,12 @@ type dataPool struct {
 	maxDelta              uint64
 	cleanupInterval       uint64
 	firstCommitableBlocks map[uint32]uint64
+	resetCheckpoints      bool

-	previousIndexesMap map[uint32]uint64
-	mutMap             sync.RWMutex
+	// soft checkpoint data is used only at startup (without any previous data)
+	// until there is a valid hard checkpoint (set by publisher)
+	softCheckpointMap map[uint32]uint64
+	mutMap            sync.RWMutex
 }

 // DataPoolArgs defines the arguments needed to create the blocks pool component
@@ -37,6 +43,7 @@ type DataPoolArgs struct {
 	MaxDelta              uint64
 	CleanupInterval       uint64
 	FirstCommitableBlocks map[uint32]uint64
+	ResetCheckpoints      bool
 }

 // NewDataPool will create a new data pool instance
@@ -60,6 +67,7 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
 		maxDelta:              args.MaxDelta,
 		cleanupInterval:       args.CleanupInterval,
 		firstCommitableBlocks: args.FirstCommitableBlocks,
+		resetCheckpoints:      args.ResetCheckpoints,
 	}

 	bp.initIndexesMap()
@@ -68,22 +76,30 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
 }

 func (bp *dataPool) initIndexesMap() {
-	lastCheckpoint, err := bp.GetLastCheckpoint()
-	if err != nil || lastCheckpoint == nil || lastCheckpoint.LastNonces == nil {
-		log.Warn("failed to get last checkpoint, will set empty indexes map", "error", err)
+	if bp.resetCheckpoints {
+		log.Warn("initializing with reset checkpoints option, will set empty soft checkpoint")
+		bp.softCheckpointMap = make(map[uint32]uint64)
+		return
+	}

-		indexesMap := make(map[uint32]uint64)
-		bp.previousIndexesMap = indexesMap
+	lastCheckpoint, err := bp.GetLastCheckpoint()
+	if err == nil {
+		log.Info("initIndexesMap", "lastCheckpoint", lastCheckpoint)
+		bp.softCheckpointMap = lastCheckpoint.GetLastNonces()
 		return
 	}

-	log.Info("initIndexesMap", "lastCheckpoint", lastCheckpoint)
+	log.Warn("failed to get last checkpoint, will try to set soft last checkpoint", "error", err)

-	indexesMap := make(map[uint32]uint64, len(lastCheckpoint.LastNonces))
-	for shardID, nonce := range lastCheckpoint.LastNonces {
-		indexesMap[shardID] = nonce
+	softCheckpoint, err := bp.getLastSoftCheckpoint()
+	if err == nil {
+		log.Info("initIndexesMap", "softCheckpoint", softCheckpoint)
+		bp.softCheckpointMap = softCheckpoint.GetLastNonces()
+		return
 	}
-	bp.previousIndexesMap = indexesMap
+
+	log.Warn("failed to get last soft checkpoint, will set empty soft checkpoint", "error", err)
+	bp.softCheckpointMap = make(map[uint32]uint64)
 }

 // Put will put value into storer
@@ -136,7 +152,7 @@ func (bp *dataPool) getLastIndex(shardID uint32) uint64 {
 		}
 	}

-	lastIndex, ok := bp.previousIndexesMap[shardID]
+	lastIndex, ok := bp.softCheckpointMap[shardID]
 	if !ok {
 		return initIndex
 	}
@@ -161,7 +177,7 @@ func (bp *dataPool) PutBlock(hash []byte, value []byte, newIndex uint64, shardID
 	lastIndex := bp.getLastIndex(shardID)

 	if lastIndex == initIndex {
-		bp.previousIndexesMap[shardID] = newIndex
+		bp.softCheckpointMap[shardID] = newIndex
 		return bp.storer.Put(hash, value)
 	}

@@ -198,27 +214,58 @@ func (bp *dataPool) setCheckpoint(checkpoint *data.BlockCheckpoint) error {

 // GetLastCheckpoint returns last checkpoint data
 func (bp *dataPool) GetLastCheckpoint() (*data.BlockCheckpoint, error) {
-	checkpointBytes, err := bp.storer.Get([]byte(MetaCheckpointKey))
+	return bp.getCheckpointData(MetaCheckpointKey)
+}
+
+func (bp *dataPool) getLastSoftCheckpoint() (*data.BlockCheckpoint, error) {
+	return bp.getCheckpointData(softMetaCheckpointKey)
+}
+
+func (bp *dataPool) getCheckpointData(checkpointKey string) (*data.BlockCheckpoint, error) {
+	checkpointBytes, err := bp.storer.Get([]byte(checkpointKey))
 	if err != nil {
-		return nil, fmt.Errorf("failed to get checkpoint data from storer: %w", err)
+		return nil, fmt.Errorf("failed to get checkpoint data (key = %s) from storer: %w", checkpointKey, err)
 	}

 	checkpoint := &data.BlockCheckpoint{}
 	err = bp.marshaller.Unmarshal(checkpoint, checkpointBytes)
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshall checkpoint data: %w", err)
+		return nil, fmt.Errorf("failed to unmarshall checkpoint (key = %s) data: %w", checkpointKey, err)
 	}

 	if checkpoint == nil || checkpoint.LastNonces == nil {
-		return nil, fmt.Errorf("nil checkpoint data has been provided")
+		return nil, fmt.Errorf("nil checkpoint data (key = %s) has been provided", checkpointKey)
 	}

 	return checkpoint, nil
 }

+func (bp *dataPool) saveLastSoftCheckpoint() error {
+	bp.mutMap.RLock()
+	softCheckpointMap := common.DeepCopyNoncesMap(bp.softCheckpointMap)
+	bp.mutMap.RUnlock()
+
+	softCheckpoint := &data.BlockCheckpoint{}
+	softCheckpoint.LastNonces = softCheckpointMap
+
+	checkpointBytes, err := bp.marshaller.Marshal(softCheckpoint)
+	if err != nil {
+		return fmt.Errorf("failed to marshall soft checkpoint data: %w", err)
+	}
+
+	log.Debug("saveLastSoftCheckpoint", "softCheckpointMap", softCheckpointMap)
+
+	return bp.storer.Put([]byte(softMetaCheckpointKey), checkpointBytes)
+}
+
 // Close will trigger close on blocks pool component
 func (bp *dataPool) Close() error {
-	err := bp.storer.Close()
+	err := bp.saveLastSoftCheckpoint()
+	if err != nil {
+		return err
+	}
+
+	err = bp.storer.Close()
 	if err != nil {
 		return err
 	}
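initIndexesMap above now resolves the startup nonces in a fixed order: the reset flag wins, then the hard checkpoint written by the publisher, then the soft checkpoint persisted on Close, and finally an empty map. A condensed sketch of that decision ladder; the helper name and signature are hypothetical, not part of this patch:

package sketch

// resolveStartupNonces condenses the initIndexesMap ladder: explicit reset
// first, then the hard checkpoint (set by the publisher), then the soft
// checkpoint (saved on Close), and finally an empty map.
func resolveStartupNonces(
	reset bool,
	hardCheckpoint func() (map[uint32]uint64, error),
	softCheckpoint func() (map[uint32]uint64, error),
) map[uint32]uint64 {
	if reset {
		return make(map[uint32]uint64)
	}
	if nonces, err := hardCheckpoint(); err == nil {
		return nonces
	}
	if nonces, err := softCheckpoint(); err == nil {
		return nonces
	}

	return make(map[uint32]uint64)
}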
fmt.Errorf("failed to marshall publish soft checkpoint data: %w", err) + } + + log.Debug("saveLastSoftCheckpoint", "previousIndexesMap", softCheckpointMap) + + return bp.storer.Put([]byte(softMetaCheckpointKey), checkpointBytes) +} + // Close will trigger close on blocks pool component func (bp *dataPool) Close() error { - err := bp.storer.Close() + err := bp.saveLastSoftCheckpoint() + if err != nil { + return err + } + + err = bp.storer.Close() if err != nil { return err } diff --git a/process/dataPool_test.go b/process/dataPool_test.go index b24689d..e95aa85 100644 --- a/process/dataPool_test.go +++ b/process/dataPool_test.go @@ -18,7 +18,7 @@ import ( func createDefaultBlocksPoolArgs() process.DataPoolArgs { return process.DataPoolArgs{ Storer: &testscommon.PruningStorerMock{}, - Marshaller: gogoProtoMarshaller, + Marshaller: protoMarshaller, MaxDelta: 10, CleanupInterval: 100, FirstCommitableBlocks: map[uint32]uint64{ @@ -405,9 +405,10 @@ func TestDataPool_Close(t *testing.T) { return nil }, } - bp, _ := process.NewDataPool(args) + bp, err := process.NewDataPool(args) + require.Nil(t, err) - err := bp.Close() + err = bp.Close() require.Nil(t, err) require.True(t, wasCalled) diff --git a/process/dataProcessor.go b/process/dataProcessor.go index b0e3aca..d62fe32 100644 --- a/process/dataProcessor.go +++ b/process/dataProcessor.go @@ -15,6 +15,7 @@ type dataProcessor struct { publisher Publisher outportBlocksPool BlocksPool outportBlockConverter OutportBlockConverter + firstCommitableBlocks map[uint32]uint64 } // NewDataProcessor creates a data processor able to receive data from a ws outport driver and handle blocks @@ -23,6 +24,7 @@ func NewDataProcessor( marshaller marshal.Marshalizer, blocksPool BlocksPool, outportBlockConverter OutportBlockConverter, + firstCommitableBlocks map[uint32]uint64, ) (DataProcessor, error) { if check.IfNil(publisher) { return nil, ErrNilPublisher @@ -36,17 +38,20 @@ func NewDataProcessor( if check.IfNil(outportBlockConverter) { return nil, ErrNilOutportBlocksConverter } + if firstCommitableBlocks == nil { + return nil, ErrNilFirstCommitableBlocks + } dp := &dataProcessor{ marshaller: marshaller, publisher: publisher, outportBlocksPool: blocksPool, outportBlockConverter: outportBlockConverter, + firstCommitableBlocks: firstCommitableBlocks, } dp.operationHandlers = map[string]func(marshalledData []byte) error{ - outport.TopicSaveBlock: dp.saveBlock, - outport.TopicRevertIndexedBlock: dp.revertBlock, + outport.TopicSaveBlock: dp.saveBlock, } return dp, nil @@ -95,18 +100,32 @@ func (dp *dataProcessor) handleMetaOutportBlock(outportBlock *outport.OutportBlo return fmt.Errorf("%w for blockData header", ErrInvalidOutportBlock) } metaNonce := metaOutportBlock.BlockData.Header.GetNonce() + headerHash := metaOutportBlock.BlockData.GetHeaderHash() log.Info("saving meta outport block", - "hash", metaOutportBlock.BlockData.GetHeaderHash(), + "hash", headerHash, "nonce", metaNonce, "shardID", metaOutportBlock.ShardID) - headerHash := metaOutportBlock.BlockData.HeaderHash err = dp.outportBlocksPool.PutBlock(headerHash, metaOutportBlock) if err != nil { return fmt.Errorf("failed to put metablock: %w", err) } + shardID := metaOutportBlock.GetShardID() + firstCommitableBlock, ok := dp.firstCommitableBlocks[shardID] + if !ok { + return fmt.Errorf("failed to get first commitable block for shard %d", shardID) + } + + if metaNonce < firstCommitableBlock { + // do not try to aggregate or publish hyper outport block + + log.Trace("do not publish block", "currentNonce", 
metaNonce, "firstCommitableNonce", firstCommitableBlock) + + return nil + } + err = dp.publisher.PublishBlock(headerHash) if err != nil { return fmt.Errorf("failed to publish block: %w", err) @@ -128,31 +147,16 @@ func (dp *dataProcessor) handleShardOutportBlock(outportBlock *outport.OutportBl } nonce := shardOutportBlock.BlockData.Header.GetNonce() + headerHash := shardOutportBlock.BlockData.GetHeaderHash() + log.Info("saving shard outport block", - "hash", shardOutportBlock.BlockData.GetHeaderHash(), + "hash", headerHash, "nonce", nonce, "shardID", shardOutportBlock.ShardID) - headerHash := outportBlock.BlockData.HeaderHash - return dp.outportBlocksPool.PutBlock(headerHash, shardOutportBlock) } -func (dp *dataProcessor) revertBlock(marshalledData []byte) error { - blockData := &outport.BlockData{} - err := dp.marshaller.Unmarshal(blockData, marshalledData) - if err != nil { - return err - } - - err = dp.publisher.PublishBlock(blockData.HeaderHash) - if err != nil { - return fmt.Errorf("failed to publish block: %w", err) - } - - return nil -} - // Close will close the internal writer func (dp *dataProcessor) Close() error { return dp.publisher.Close() diff --git a/process/dataProcessor_test.go b/process/dataProcessor_test.go index be61606..861d76a 100644 --- a/process/dataProcessor_test.go +++ b/process/dataProcessor_test.go @@ -89,6 +89,7 @@ func TestNewDataProcessor(t *testing.T) { &testscommon.MarshallerStub{}, &testscommon.HyperBlocksPoolMock{}, &testscommon.OutportBlockConverterMock{}, + defaultFirstCommitableBlocks, ) require.Nil(t, dp) require.Equal(t, process.ErrNilPublisher, err) @@ -102,6 +103,7 @@ func TestNewDataProcessor(t *testing.T) { nil, &testscommon.HyperBlocksPoolMock{}, &testscommon.OutportBlockConverterMock{}, + defaultFirstCommitableBlocks, ) require.Nil(t, dp) require.Equal(t, process.ErrNilMarshaller, err) @@ -115,6 +117,7 @@ func TestNewDataProcessor(t *testing.T) { &testscommon.MarshallerMock{}, nil, &testscommon.OutportBlockConverterMock{}, + defaultFirstCommitableBlocks, ) require.Nil(t, dp) require.Equal(t, process.ErrNilBlocksPool, err) @@ -128,11 +131,26 @@ func TestNewDataProcessor(t *testing.T) { &testscommon.MarshallerStub{}, &testscommon.HyperBlocksPoolMock{}, nil, + defaultFirstCommitableBlocks, ) require.Nil(t, dp) require.Equal(t, process.ErrNilOutportBlocksConverter, err) }) + t.Run("nil first commitable blocks", func(t *testing.T) { + t.Parallel() + + dp, err := process.NewDataProcessor( + &testscommon.PublisherMock{}, + &testscommon.MarshallerStub{}, + &testscommon.HyperBlocksPoolMock{}, + &testscommon.OutportBlockConverterMock{}, + nil, + ) + require.Nil(t, dp) + require.Equal(t, process.ErrNilFirstCommitableBlocks, err) + }) + t.Run("should work", func(t *testing.T) { t.Parallel() @@ -141,6 +159,7 @@ func TestNewDataProcessor(t *testing.T) { &testscommon.MarshallerStub{}, &testscommon.HyperBlocksPoolMock{}, &testscommon.OutportBlockConverterMock{}, + defaultFirstCommitableBlocks, ) require.Nil(t, err) require.False(t, dp.IsInterfaceNil()) @@ -155,6 +174,7 @@ func TestDataProcessor_ProcessPayload_NotImplementedTopics(t *testing.T) { &testscommon.MarshallerStub{}, &testscommon.HyperBlocksPoolMock{}, &testscommon.OutportBlockConverterMock{}, + defaultFirstCommitableBlocks, ) require.Nil(t, dp.ProcessPayload([]byte("payload"), "random topic", 1)) @@ -179,6 +199,7 @@ func TestDataProcessor_ProcessPayload(t *testing.T) { gogoProtoMarshaller, &testscommon.HyperBlocksPoolMock{}, &testscommon.OutportBlockConverterMock{}, + defaultFirstCommitableBlocks, ) 
diff --git a/process/firehosePublisher_test.go b/process/firehosePublisher_test.go
index d9f394f..ab73d9e 100644
--- a/process/firehosePublisher_test.go
+++ b/process/firehosePublisher_test.go
@@ -30,7 +30,7 @@ func createHyperOutportBlock() *data.HyperOutportBlock {
 			ShardID: 1,
 			BlockData: &data.MetaBlockData{
 				Header: &data.MetaHeader{
-					Nonce:     1,
+					Nonce:     16,
 					PrevHash:  []byte("prev hash"),
 					TimeStamp: 100,
 				},
@@ -43,7 +43,41 @@ func createHyperOutportBlock() *data.HyperOutportBlock {
 			HighestFinalBlockNonce: 0,
 			HighestFinalBlockHash:  []byte{},
 		},
-		NotarizedHeadersOutportData: []*data.NotarizedHeaderOutportData{},
+		NotarizedHeadersOutportData: []*data.NotarizedHeaderOutportData{
+			&data.NotarizedHeaderOutportData{
+				OutportBlock: &data.ShardOutportBlock{
+					ShardID: 0,
+					BlockData: &data.BlockData{
+						ShardID: 0,
+						Header: &data.Header{
+							Nonce: 10,
+						},
+					},
+				},
+			},
+			&data.NotarizedHeaderOutportData{
+				OutportBlock: &data.ShardOutportBlock{
+					ShardID: 2,
+					BlockData: &data.BlockData{
+						ShardID: 2,
+						Header: &data.Header{
+							Nonce: 12,
+						},
+					},
+				},
+			},
+			&data.NotarizedHeaderOutportData{
+				OutportBlock: &data.ShardOutportBlock{
+					ShardID: 1,
+					BlockData: &data.BlockData{
+						ShardID: 1,
+						Header: &data.Header{
+							Nonce: 11,
+						},
+					},
+				},
+			},
+		},
 	}

 	return hyperOutportBlock
diff --git a/process/publisherHandler.go b/process/publisherHandler.go
index 6e6a2a1..d5fe0b6 100644
--- a/process/publisherHandler.go
+++ b/process/publisherHandler.go
@@ -26,6 +26,7 @@ type publisherHandler struct {
 	retryDuration         time.Duration
 	marshaller            marshal.Marshalizer
 	firstCommitableBlocks map[uint32]uint64
+	resetCheckpoints      bool

 	blocksChan chan []byte
 	cancelFunc func()
@@ -43,6 +44,7 @@ type PublisherHandlerArgs struct {
 	RetryDurationInMilliseconds uint64
 	Marshalizer                 marshal.Marshalizer
 	FirstCommitableBlocks       map[uint32]uint64
+	ResetCheckpoints            bool
 }

 // NewPublisherHandler creates a new publisher handler component
@@ -72,8 +74,9 @@ func NewPublisherHandler(args PublisherHandlerArgs) (*publisherHandler, error) {
 		outportBlocksPool:     args.OutportBlocksPool,
 		dataAggregator:        args.DataAggregator,
 		marshaller:            args.Marshalizer,
-		retryDuration:         time.Duration(args.RetryDurationInMilliseconds) * time.Millisecond,
 		firstCommitableBlocks: args.FirstCommitableBlocks,
+		resetCheckpoints:      args.ResetCheckpoints,
+		retryDuration:         time.Duration(args.RetryDurationInMilliseconds) * time.Millisecond,
 		checkpoint:            &data.PublishCheckpoint{},
 		blocksChan:            make(chan []byte),
 		closeChan:             make(chan struct{}),
@@ -152,6 +155,11 @@ func (ph *publisherHandler) updatePublishCheckpoint() {
 }

 func (ph *publisherHandler) handleLastCheckpointOnInit() error {
+	if ph.resetCheckpoints {
+		log.Debug("reset checkpoints enabled, skipping last publisher checkpoint check on init")
+		return nil
+	}
+
 	checkpoint, err := ph.getLastPublishCheckpoint()
 	if err != nil {
 		return err
@@ -235,7 +243,7 @@ func (ph *publisherHandler) handlerHyperOutportBlock(headerHash []byte) error {
 	if metaNonce < firstCommitableBlock {
 		// do not try to aggregate or publish hyper outport block

-		log.Trace("do not commit block", "currentNonce", metaNonce, "firstCommitableNonce", firstCommitableBlock)
+		log.Trace("do not publish block", "currentNonce", metaNonce, "firstCommitableNonce", firstCommitableBlock)

 		return nil
 	}
@@ -274,8 +282,11 @@ func (ph *publisherHandler) getLastBlockCheckpoint(hyperOutportBlock *hyperOutpo
 		return nil, err
 	}

-	checkpoint := &data.BlockCheckpoint{
-		LastNonces: make(map[uint32]uint64),
+	checkpoint, err := ph.outportBlocksPool.GetLastCheckpoint()
+	if err != nil {
+		checkpoint = &data.BlockCheckpoint{
+			LastNonces: make(map[uint32]uint64),
+		}
 	}

 	metaBlock := hyperOutportBlock.MetaOutportBlock.BlockData.Header
@@ -317,7 +328,9 @@ func (ph *publisherHandler) Close() error {

 	ph.cancelFunc()

-	close(ph.closeChan)
+	if ph.closeChan != nil {
+		close(ph.closeChan)
+	}

 	return nil
 }
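getLastBlockCheckpoint above no longer starts from an empty map: it seeds from the pool's last stored checkpoint (falling back to an empty map only when none exists) and then overwrites the entries for the shards present in the current hyper outport block. Shards absent from the block therefore keep their previously committed nonce. The merge rule as a standalone sketch (hypothetical helper, not part of this patch):

package sketch

// mergeCheckpoint sketches the new seeding behavior: start from the stored
// checkpoint (or an empty map when none is available), then overwrite only
// the shards present in the current hyper outport block.
func mergeCheckpoint(stored map[uint32]uint64, fromBlock map[uint32]uint64) map[uint32]uint64 {
	merged := make(map[uint32]uint64, len(stored)+len(fromBlock))
	for shardID, nonce := range stored {
		merged[shardID] = nonce
	}
	for shardID, nonce := range fromBlock {
		merged[shardID] = nonce
	}

	return merged
}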
diff --git a/process/publisherHandler_test.go b/process/publisherHandler_test.go
index 2b96592..037108f 100644
--- a/process/publisherHandler_test.go
+++ b/process/publisherHandler_test.go
@@ -320,6 +320,88 @@ func TestPublisherHandler_PublishBlock(t *testing.T) {
 	})
 }

+func TestPublisherHandler_GetLastBlockCheckpoint(t *testing.T) {
+	t.Parallel()
+
+	args := createDefaultPublisherHandlerArgs()
+
+	hyperOutportBlock := &hyperOutportBlocks.HyperOutportBlock{
+		MetaOutportBlock: &hyperOutportBlocks.MetaOutportBlock{
+			ShardID: 1,
+			BlockData: &hyperOutportBlocks.MetaBlockData{
+				Header: &hyperOutportBlocks.MetaHeader{
+					Nonce:     16,
+					PrevHash:  []byte("prev hash"),
+					TimeStamp: 100,
+				},
+				HeaderType: string(core.ShardHeaderV1),
+				HeaderHash: []byte("hash"),
+			},
+			NotarizedHeadersHashes: []string{},
+			NumberOfShards:         0,
+			SignersIndexes:         []uint64{},
+			HighestFinalBlockNonce: 0,
+			HighestFinalBlockHash:  []byte{},
+		},
+		NotarizedHeadersOutportData: []*hyperOutportBlocks.NotarizedHeaderOutportData{
+			&hyperOutportBlocks.NotarizedHeaderOutportData{
+				OutportBlock: &hyperOutportBlocks.ShardOutportBlock{
+					ShardID: 0,
+					BlockData: &hyperOutportBlocks.BlockData{
+						ShardID: 0,
+						Header: &hyperOutportBlocks.Header{
+							Nonce: 11,
+						},
+					},
+				},
+			},
+			&hyperOutportBlocks.NotarizedHeaderOutportData{
+				OutportBlock: &hyperOutportBlocks.ShardOutportBlock{
+					ShardID: 2,
+					BlockData: &hyperOutportBlocks.BlockData{
+						ShardID: 2,
+						Header: &hyperOutportBlocks.Header{
+							Nonce: 13,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	args.OutportBlocksPool = &testscommon.HyperBlocksPoolMock{
+		GetLastCheckpointCalled: func() (*data.BlockCheckpoint, error) {
+			return &data.BlockCheckpoint{
+				LastNonces: map[uint32]uint64{
+					core.MetachainShardId: 15,
+					0:                     10,
+					1:                     11,
+					2:                     12,
+				},
+			}, nil
+		},
+	}
+
+	ph, err := process.NewPublisherHandler(args)
+	require.Nil(t, err)
+	require.NotNil(t, ph)
+
+	// shard 1 nonce is taken from last block checkpoint since it was missing from hyper outport block
+	expectedCheckpoint := &data.BlockCheckpoint{
+		LastNonces: map[uint32]uint64{
+			core.MetachainShardId: 16,
+			0:                     11,
+			1:                     11,
+			2:                     13,
+		},
+	}
+
+	lastCheckpoint, err := ph.GetLastBlockCheckpoint(hyperOutportBlock)
+	require.Nil(t, err)
+
+	require.Equal(t, expectedCheckpoint, lastCheckpoint)
+}
+
 func TestPublisherHandler_Close(t *testing.T) {
 	t.Parallel()
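A quick check of the merge the new test asserts, using its numbers: the meta nonce advances from 15 to 16, shards 0 and 2 are taken from the notarized headers (11 and 13), and shard 1, absent from the hyper outport block, carries over 11 from the stored checkpoint. Illustrative snippet only (MetachainShardId is math.MaxUint32 in mx-chain-core-go):

package main

import "fmt"

func main() {
	const metachainShardID uint32 = 4294967295 // core.MetachainShardId

	stored := map[uint32]uint64{metachainShardID: 15, 0: 10, 1: 11, 2: 12}
	fromBlock := map[uint32]uint64{metachainShardID: 16, 0: 11, 2: 13} // shard 1 missing

	for shard, nonce := range fromBlock {
		stored[shard] = nonce
	}

	// map[0:11 1:11 2:13 4294967295:16] -- shard 1 carried over from the checkpoint
	fmt.Println(stored)
}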