Skip to content

Commit

Permalink
Merge branch 'hyperblock-processor' into implement-blocks-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Mar 19, 2024
2 parents cd715f5 + 934e2a3 commit fb80947
Show file tree
Hide file tree
Showing 16 changed files with 513 additions and 134 deletions.
1 change: 1 addition & 0 deletions factory/wsConnectorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func CreateWSConnector(cfg config.WebSocketConfig) (process.WSConnector, error)
return nil, err
}

// TODO: move cache to config
cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.SizeLRUCache,
SizeInBytes: 209715200, // 200MB
Expand Down
3 changes: 3 additions & 0 deletions process/blocksPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (bp *blocksPool) prunePersister(round uint64) error {
return nil
}

// PutBlock will put the provided outport block data to the pool
func (bp *blocksPool) PutBlock(hash []byte, outportBlock *outport.OutportBlock) error {
bp.mutMap.Lock()
defer bp.mutMap.Unlock()
Expand Down Expand Up @@ -159,6 +160,7 @@ func (bp *blocksPool) putOutportBlock(hash []byte, outportBlock *outport.Outport
return nil
}

// GetBlock will return outport block data from the pool
func (bp *blocksPool) GetBlock(hash []byte) (*outport.OutportBlock, error) {
data, err := bp.storer.Get(hash)
if err != nil {
Expand All @@ -174,6 +176,7 @@ func (bp *blocksPool) GetBlock(hash []byte) (*outport.OutportBlock, error) {
return outportBlock, nil
}

// IsInterfaceNil returns nil if there is no value under the interface
func (bp *blocksPool) IsInterfaceNil() bool {
return bp == nil
}
3 changes: 3 additions & 0 deletions process/blocksPool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package process_test

// TODO: add tests
5 changes: 4 additions & 1 deletion process/dataAggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@ type dataAggregator struct {
blocksPool BlocksPool
}

// NewDataAggregator will create a new data aggregator instance
func NewDataAggregator(
blocksPool BlocksPool,
) (*dataAggregator, error) {
if check.IfNil(blocksPool) {
return nil, errNilBlocksPool
return nil, ErrNilBlocksPool
}

return &dataAggregator{
blocksPool: blocksPool,
}, nil
}

// ProcessHyperBlock will process meta outport block. It will try to fetch and aggregate
// notarized shards data
func (da *dataAggregator) ProcessHyperBlock(outportBlock *outport.OutportBlock) (*data.HyperOutportBlock, error) {
hyperOutportBlock := &data.HyperOutportBlock{}
hyperOutportBlock.MetaOutportBlock = outportBlock
Expand Down
57 changes: 57 additions & 0 deletions process/dataAggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package process_test

import (
"encoding/hex"
"testing"

"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
"github.com/stretchr/testify/require"
)

func TestNewDataAggregator(t *testing.T) {
t.Parallel()

t.Run("nil blocks pool", func(t *testing.T) {
t.Parallel()

da, err := process.NewDataAggregator(nil)
require.Nil(t, da)
require.Equal(t, process.ErrNilBlocksPool, err)
})

t.Run("should work", func(t *testing.T) {
t.Parallel()

da, err := process.NewDataAggregator(&testscommon.BlocksPoolStub{})

Check failure on line 27 in process/dataAggregator_test.go

View workflow job for this annotation

GitHub Actions / golangci linter

cannot use &(testscommon.BlocksPoolStub literal) (value of type *testscommon.BlocksPoolStub) as process.BlocksPool value in argument to process.NewDataAggregator: *testscommon.BlocksPoolStub does not implement process.BlocksPool (missing method UpdateMetaRound) (typecheck)

Check failure on line 27 in process/dataAggregator_test.go

View workflow job for this annotation

GitHub Actions / build

cannot use &testscommon.BlocksPoolStub{} (type *testscommon.BlocksPoolStub) as type process.BlocksPool in argument to process.NewDataAggregator:
require.Nil(t, err)
require.False(t, da.IsInterfaceNil())
})
}

func TestDataAggregator_ProcessHyperBlock(t *testing.T) {
t.Parallel()

headerHash := []byte("headerHash1")

shardOutportBlock := createOutportBlock()
shardOutportBlock.BlockData.HeaderHash = headerHash

blocksPoolStub := &testscommon.BlocksPoolStub{
GetBlockCalled: func(hash []byte) (*outport.OutportBlock, error) {

return shardOutportBlock, nil
},
}

da, err := process.NewDataAggregator(blocksPoolStub)

Check failure on line 48 in process/dataAggregator_test.go

View workflow job for this annotation

GitHub Actions / golangci linter

cannot use blocksPoolStub (variable of type *testscommon.BlocksPoolStub) as process.BlocksPool value in argument to process.NewDataAggregator: *testscommon.BlocksPoolStub does not implement process.BlocksPool (missing method UpdateMetaRound) (typecheck)

Check failure on line 48 in process/dataAggregator_test.go

View workflow job for this annotation

GitHub Actions / build

cannot use blocksPoolStub (type *testscommon.BlocksPoolStub) as type process.BlocksPool in argument to process.NewDataAggregator:
require.Nil(t, err)

outportBlock := createMetaOutportBlock()
outportBlock.NotarizedHeadersHashes = []string{hex.EncodeToString(headerHash)}

hyperOutportBlock, err := da.ProcessHyperBlock(outportBlock)
require.Nil(t, err)
require.Equal(t, shardOutportBlock, hyperOutportBlock.NotarizedHeadersOutportData[0].OutportBlock)
}
10 changes: 5 additions & 5 deletions process/dataProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ func NewDataProcessor(
blockCreator BlockContainerHandler,
) (DataProcessor, error) {
if publisher == nil {
return nil, errNilWriter
return nil, ErrNilPublisher
}
if check.IfNil(blocksPool) {
return nil, errNilBlocksPool
return nil, ErrNilBlocksPool
}
if check.IfNil(marshaller) {
return nil, errNilMarshaller
return nil, ErrNilMarshaller
}
if check.IfNil(dataAggregator) {
return nil, errNilDataAggregator
return nil, ErrNilDataAggregator
}

dp := &dataProcessor{
Expand Down Expand Up @@ -71,7 +71,7 @@ func (dp *dataProcessor) saveBlock(marshalledData []byte) error {
}

if outportBlock == nil || outportBlock.BlockData == nil {
return errNilOutportBlockData
return ErrNilOutportBlockData
}

log.Info("saving block", "hash", outportBlock.BlockData.GetHeaderHash(), "shardID", outportBlock.ShardID)
Expand Down
Loading

0 comments on commit fb80947

Please sign in to comment.