diff --git a/process/blocksPool_test.go b/process/blocksPool_test.go new file mode 100644 index 0000000..af2ae3f --- /dev/null +++ b/process/blocksPool_test.go @@ -0,0 +1,3 @@ +package process_test + +// TODO: add tests diff --git a/process/dataAggregator.go b/process/dataAggregator.go index e7eb798..e477475 100644 --- a/process/dataAggregator.go +++ b/process/dataAggregator.go @@ -17,7 +17,7 @@ func NewDataAggregator( blocksPool BlocksPool, ) (*dataAggregator, error) { if check.IfNil(blocksPool) { - return nil, errNilBlocksPool + return nil, ErrNilBlocksPool } return &dataAggregator{ diff --git a/process/dataAggregator_test.go b/process/dataAggregator_test.go new file mode 100644 index 0000000..f1891dc --- /dev/null +++ b/process/dataAggregator_test.go @@ -0,0 +1,57 @@ +package process_test + +import ( + "encoding/hex" + "testing" + + "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-ws-connector-template-go/process" + "github.com/multiversx/mx-chain-ws-connector-template-go/testscommon" + "github.com/stretchr/testify/require" +) + +func TestNewDataAggregator(t *testing.T) { + t.Parallel() + + t.Run("nil blocks pool", func(t *testing.T) { + t.Parallel() + + da, err := process.NewDataAggregator(nil) + require.Nil(t, da) + require.Equal(t, process.ErrNilBlocksPool, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + da, err := process.NewDataAggregator(&testscommon.BlocksPoolStub{}) + require.Nil(t, err) + require.False(t, da.IsInterfaceNil()) + }) +} + +func TestDataAggregator_ProcessHyperBlock(t *testing.T) { + t.Parallel() + + headerHash := []byte("headerHash1") + + shardOutportBlock := createOutportBlock() + shardOutportBlock.BlockData.HeaderHash = headerHash + + blocksPoolStub := &testscommon.BlocksPoolStub{ + GetBlockCalled: func(hash []byte) (*outport.OutportBlock, error) { + + return shardOutportBlock, nil + }, + } + + da, err := process.NewDataAggregator(blocksPoolStub) + require.Nil(t, err) + + outportBlock := createMetaOutportBlock() + outportBlock.NotarizedHeadersHashes = []string{hex.EncodeToString(headerHash)} + + hyperOutportBlock, err := da.ProcessHyperBlock(outportBlock) + require.Nil(t, err) + require.Equal(t, shardOutportBlock, hyperOutportBlock.NotarizedHeadersOutportData[0].OutportBlock) +} diff --git a/process/dataProcessor.go b/process/dataProcessor.go index 2c77b2e..8922108 100644 --- a/process/dataProcessor.go +++ b/process/dataProcessor.go @@ -23,16 +23,16 @@ func NewDataProcessor( dataAggregator DataAggregator, ) (DataProcessor, error) { if publisher == nil { - return nil, errNilWriter + return nil, ErrNilPublisher } if check.IfNil(blocksPool) { - return nil, errNilBlocksPool + return nil, ErrNilBlocksPool } if check.IfNil(marshaller) { - return nil, errNilMarshaller + return nil, ErrNilMarshaller } if check.IfNil(dataAggregator) { - return nil, errNilDataAggregator + return nil, ErrNilDataAggregator } dp := &dataProcessor{ @@ -67,7 +67,7 @@ func (dp *dataProcessor) saveBlock(marshalledData []byte) error { } if outportBlock == nil || outportBlock.BlockData == nil { - return errNilOutportBlockData + return ErrNilOutportBlockData } log.Info("saving block", "hash", outportBlock.BlockData.GetHeaderHash(), "shardID", outportBlock.ShardID) diff --git a/process/dataProcessor_test.go b/process/dataProcessor_test.go new file mode 100644 index 0000000..3a617af --- /dev/null +++ b/process/dataProcessor_test.go @@ -0,0 +1,253 @@ +package process_test + +import ( + "errors" + "testing" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/data/block" + outportcore "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-ws-connector-template-go/data" + "github.com/multiversx/mx-chain-ws-connector-template-go/process" + "github.com/multiversx/mx-chain-ws-connector-template-go/testscommon" + "github.com/stretchr/testify/require" +) + +func createOutportBlock() *outportcore.OutportBlock { + header := &block.Header{ + Nonce: 1, + PrevHash: []byte("prev hash"), + TimeStamp: 100, + } + headerBytes, _ := protoMarshaller.Marshal(header) + + return &outportcore.OutportBlock{ + ShardID: 1, + BlockData: &outportcore.BlockData{ + HeaderBytes: headerBytes, + HeaderType: string(core.ShardHeaderV1), + HeaderHash: []byte("hash"), + }, + } +} + +func createMetaOutportBlock() *outportcore.OutportBlock { + header := &block.MetaBlock{ + Nonce: 1, + PrevHash: []byte("prev hash"), + TimeStamp: 100, + } + headerBytes, _ := protoMarshaller.Marshal(header) + + return &outportcore.OutportBlock{ + ShardID: core.MetachainShardId, + BlockData: &outportcore.BlockData{ + HeaderBytes: headerBytes, + HeaderType: string(core.MetaHeader), + HeaderHash: []byte("hash"), + }, + } +} + +func TestNewDataProcessor(t *testing.T) { + t.Parallel() + + t.Run("nil publisher", func(t *testing.T) { + t.Parallel() + + dp, err := process.NewDataProcessor( + nil, + &testscommon.MarshallerStub{}, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + require.Nil(t, dp) + require.Equal(t, process.ErrNilPublisher, err) + }) + + t.Run("nil marshaller", func(t *testing.T) { + t.Parallel() + + dp, err := process.NewDataProcessor( + &testscommon.PublisherStub{}, + nil, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + require.Nil(t, dp) + require.Equal(t, process.ErrNilMarshaller, err) + }) + + t.Run("nil blocks pool", func(t *testing.T) { + t.Parallel() + + dp, err := process.NewDataProcessor( + &testscommon.PublisherStub{}, + &testscommon.MarshallerStub{}, + nil, + &testscommon.DataAggregatorStub{}, + ) + require.Nil(t, dp) + require.Equal(t, process.ErrNilBlocksPool, err) + }) + + t.Run("nil data aggregator", func(t *testing.T) { + t.Parallel() + + dp, err := process.NewDataProcessor( + &testscommon.PublisherStub{}, + &testscommon.MarshallerStub{}, + &testscommon.BlocksPoolStub{}, + nil, + ) + require.Nil(t, dp) + require.Equal(t, process.ErrNilDataAggregator, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + dp, err := process.NewDataProcessor( + &testscommon.PublisherStub{}, + &testscommon.MarshallerStub{}, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + require.Nil(t, err) + require.False(t, dp.IsInterfaceNil()) + }) +} + +func TestDataProcessor_ProcessPayload_NotImplementedTopics(t *testing.T) { + t.Parallel() + + dp, _ := process.NewDataProcessor( + &testscommon.PublisherStub{}, + &testscommon.MarshallerStub{}, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + + require.Nil(t, dp.ProcessPayload([]byte("payload"), "random topic", 1)) + require.Nil(t, dp.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo, 1)) + require.Nil(t, dp.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating, 1)) + require.Nil(t, dp.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys, 1)) + require.Nil(t, dp.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts, 1)) + require.Nil(t, dp.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock, 1)) +} + +func TestDataProcessor_ProcessPayload(t *testing.T) { + t.Parallel() + + t.Run("nil outport block, should return error", func(t *testing.T) { + t.Parallel() + + dp, _ := process.NewDataProcessor( + &testscommon.PublisherStub{}, + protoMarshaller, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + + err := dp.ProcessPayload(nil, outportcore.TopicSaveBlock, 1) + require.Equal(t, process.ErrNilOutportBlockData, err) + + outportBlock := createOutportBlock() + outportBlock.BlockData = nil + outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) + + err = dp.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + require.Equal(t, process.ErrNilOutportBlockData, err) + }) + + t.Run("invalid payload, cannot unmarshall, should return error", func(t *testing.T) { + t.Parallel() + + dp, _ := process.NewDataProcessor( + &testscommon.PublisherStub{}, + protoMarshaller, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + + err := dp.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock, 1) + require.NotNil(t, err) + }) + + t.Run("shard outport block, should work", func(t *testing.T) { + t.Parallel() + + outportBlock := createOutportBlock() + outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) + + putBlockWasCalled := false + dp, _ := process.NewDataProcessor( + &testscommon.PublisherStub{}, + protoMarshaller, + &testscommon.BlocksPoolStub{ + PutBlockCalled: func(hash []byte, outportBlock *outportcore.OutportBlock) error { + putBlockWasCalled = true + return nil + }, + }, + &testscommon.DataAggregatorStub{}, + ) + + err := dp.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + require.Nil(t, err) + + require.True(t, putBlockWasCalled) + }) + + t.Run("meta outport block, should work", func(t *testing.T) { + t.Parallel() + + outportBlock := createMetaOutportBlock() + outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) + + publishWasCalled := false + dp, _ := process.NewDataProcessor( + &testscommon.PublisherStub{ + PublishHyperBlockCalled: func(hyperOutportBlock *data.HyperOutportBlock) error { + publishWasCalled = true + return nil + }, + }, + protoMarshaller, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{ + ProcessHyperBlockCalled: func(outportBlock *outportcore.OutportBlock) (*data.HyperOutportBlock, error) { + return &data.HyperOutportBlock{ + MetaOutportBlock: outportBlock, + }, nil + }, + }, + ) + + err := dp.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + require.Nil(t, err) + + require.True(t, publishWasCalled) + }) +} + +func TestDataProcessor_Close(t *testing.T) { + t.Parallel() + + expectedErr := errors.New("expected error") + + dp, err := process.NewDataProcessor( + &testscommon.PublisherStub{ + CloseCalled: func() error { + return expectedErr + }, + }, + &testscommon.MarshallerStub{}, + &testscommon.BlocksPoolStub{}, + &testscommon.DataAggregatorStub{}, + ) + require.Nil(t, err) + + err = dp.Close() + require.Equal(t, expectedErr, err) +} diff --git a/process/errors.go b/process/errors.go index d6931c1..1c02e03 100644 --- a/process/errors.go +++ b/process/errors.go @@ -2,19 +2,26 @@ package process import "errors" -var errNilMarshaller = errors.New("nil marshaller provided") +// ErrNilMarshaller signals that a nil marshaller was provided +var ErrNilMarshaller = errors.New("nil marshaller provided") -var errNilOutportBlockData = errors.New("nil outport block data") +// ErrNilOutportBlockData signals that a nil outport block data was provided +var ErrNilOutportBlockData = errors.New("nil outport block data") -var errNilWriter = errors.New("nil writer provided") +// ErrNilWriter signals that a nil write was provided +var ErrNilWriter = errors.New("nil writer provided") -var errNilBlockCreator = errors.New("nil block creator provided") +// ErrNilBlockCreator signals that a nil block creator was provided +var ErrNilBlockCreator = errors.New("nil block creator provided") -var errNilPublisher = errors.New("nil publisher provided") +// ErrNilPublisher signals that a nil publisher was provided +var ErrNilPublisher = errors.New("nil publisher provided") -var errNilBlocksPool = errors.New("nil blocks pool provided") +// ErrNilBlocksPool signals that a nil blocks pool was provided +var ErrNilBlocksPool = errors.New("nil blocks pool provided") -var errNilDataAggregator = errors.New("nil data aggregator provided") +// ErrNilDataAggregator signals that a nil data aggregator was provided +var ErrNilDataAggregator = errors.New("nil data aggregator provided") // ErrWrongTypeAssertion signals that a type assertion faled var ErrWrongTypeAssertion = errors.New("type assertion failed") diff --git a/process/export_test.go b/process/export_test.go new file mode 100644 index 0000000..6a37edf --- /dev/null +++ b/process/export_test.go @@ -0,0 +1,6 @@ +package process + +const ( + FirehosePrefix = firehosePrefix + BlockPrefix = blockPrefix +) diff --git a/process/firehosePublisher.go b/process/firehosePublisher.go index 80ce4c9..72d1c13 100644 --- a/process/firehosePublisher.go +++ b/process/firehosePublisher.go @@ -37,13 +37,13 @@ func NewFirehosePublisher( marshaller marshal.Marshalizer, ) (*firehosePublisher, error) { if writer == nil { - return nil, errNilWriter + return nil, ErrNilWriter } if check.IfNil(blockCreator) { - return nil, errNilBlockCreator + return nil, ErrNilBlockCreator } if check.IfNil(marshaller) { - return nil, errNilMarshaller + return nil, ErrNilMarshaller } fp := &firehosePublisher{ diff --git a/process/firehosePublisher_test.go b/process/firehosePublisher_test.go index 3e291f5..74f623e 100644 --- a/process/firehosePublisher_test.go +++ b/process/firehosePublisher_test.go @@ -1,4 +1,4 @@ -package process +package process_test import ( "encoding/base64" @@ -15,12 +15,14 @@ import ( "github.com/multiversx/mx-chain-core-go/marshal" "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-ws-connector-template-go/data" + "github.com/multiversx/mx-chain-ws-connector-template-go/process" "github.com/multiversx/mx-chain-ws-connector-template-go/testscommon" ) var protoMarshaller = &marshal.GogoProtoMarshalizer{} -func createOutportBlock() *outportcore.OutportBlock { +func createHyperOutportBlock() *data.HyperOutportBlock { header := &block.Header{ Nonce: 1, PrevHash: []byte("prev hash"), @@ -28,91 +30,88 @@ func createOutportBlock() *outportcore.OutportBlock { } headerBytes, _ := protoMarshaller.Marshal(header) - return &outportcore.OutportBlock{ - BlockData: &outportcore.BlockData{ - HeaderHash: []byte("hash"), - HeaderBytes: headerBytes, - HeaderType: string(core.ShardHeaderV1), + hyperOutportBlock := &data.HyperOutportBlock{ + MetaOutportBlock: &outportcore.OutportBlock{ + ShardID: 1, + BlockData: &outportcore.BlockData{ + HeaderBytes: headerBytes, + HeaderType: string(core.ShardHeaderV1), + HeaderHash: []byte("hash"), + }, + NotarizedHeadersHashes: []string{}, + NumberOfShards: 0, + SignersIndexes: []uint64{}, + HighestFinalBlockNonce: 0, + HighestFinalBlockHash: []byte{}, }, + NotarizedHeadersOutportData: []*data.NotarizedHeaderOutportData{}, } + + return hyperOutportBlock } -func createContainer() BlockContainerHandler { +func createContainer() process.BlockContainerHandler { container := block.NewEmptyBlockCreatorsContainer() _ = container.Add(core.ShardHeaderV1, block.NewEmptyHeaderCreator()) return container } -func TestNewFirehoseDataProcessor(t *testing.T) { +func TestNewFirehosePublisher(t *testing.T) { t.Parallel() t.Run("nil io writer, should return error", func(t *testing.T) { t.Parallel() - fi, err := NewFirehoseDataProcessor(nil, createContainer(), protoMarshaller) - require.Nil(t, fi) - require.Equal(t, errNilWriter, err) + fp, err := process.NewFirehosePublisher(nil, createContainer(), protoMarshaller) + require.Nil(t, fp) + require.Equal(t, process.ErrNilWriter, err) }) t.Run("nil block creator, should return error", func(t *testing.T) { t.Parallel() - fi, err := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, nil, protoMarshaller) - require.Nil(t, fi) - require.Equal(t, errNilBlockCreator, err) + fp, err := process.NewFirehosePublisher(&testscommon.IoWriterStub{}, nil, protoMarshaller) + require.Nil(t, fp) + require.Equal(t, process.ErrNilBlockCreator, err) }) t.Run("nil marshaller, should return error", func(t *testing.T) { t.Parallel() - fi, err := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), nil) - require.Nil(t, fi) - require.Equal(t, errNilMarshaller, err) + fp, err := process.NewFirehosePublisher(&testscommon.IoWriterStub{}, createContainer(), nil) + require.Nil(t, fp) + require.Equal(t, process.ErrNilMarshaller, err) }) t.Run("should work", func(t *testing.T) { t.Parallel() - fi, err := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) + fp, err := process.NewFirehosePublisher(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) require.Nil(t, err) - require.False(t, check.IfNil(fi)) + require.False(t, check.IfNil(fp)) }) } -func TestFirehoseIndexer_SaveBlock(t *testing.T) { +func TestFirehosePublisher_PublishHyperBlock(t *testing.T) { t.Parallel() - t.Run("nil outport block, should return error", func(t *testing.T) { + t.Run("should work", func(t *testing.T) { t.Parallel() - fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) - - err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock, 1) - require.Equal(t, errNilOutportBlockData, err) - - outportBlock := createOutportBlock() - outportBlock.BlockData = nil - outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) - - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) - require.Equal(t, errNilOutportBlockData, err) - }) - - t.Run("invalid payload, cannot unmarshall, should return error", func(t *testing.T) { - t.Parallel() + fp, err := process.NewFirehosePublisher(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) + require.Nil(t, err) - fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) + outportBlock := createHyperOutportBlock() - err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock, 1) - require.NotNil(t, err) + err = fp.PublishHyperBlock(outportBlock) }) t.Run("unknown block creator for header type, should return error", func(t *testing.T) { t.Parallel() - outportBlock := createOutportBlock() - outportBlock.BlockData.HeaderType = "unknown" + outportBlock := createHyperOutportBlock() + outportBlock.MetaOutportBlock.BlockData.HeaderType = "unknown" ioWriterCalledCt := 0 ioWriter := &testscommon.IoWriterStub{ @@ -122,13 +121,11 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { }, } - fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller) + fi, _ := process.NewFirehosePublisher(ioWriter, createContainer(), protoMarshaller) - outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) - err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + err := fi.PublishHyperBlock(outportBlock) require.NotNil(t, err) - // New does the first write - require.Equal(t, 1, ioWriterCalledCt) + require.Equal(t, 1, ioWriterCalledCt) // 1 write comes from constructor }) t.Run("cannot unmarshall to get header from bytes, should return error", func(t *testing.T) { @@ -142,32 +139,18 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { }, } - outportBlock := createOutportBlock() - outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) - - unmarshalCalledBefore := false - errUnmarshal := errors.New("err unmarshal") + expectedErr := errors.New("expected err") marshaller := &testscommon.MarshallerStub{ UnmarshalCalled: func(obj interface{}, buff []byte) error { - defer func() { - unmarshalCalledBefore = true - }() - - if unmarshalCalledBefore { - return errUnmarshal - } - - require.Equal(t, outportBlockBytes, buff) - err := protoMarshaller.Unmarshal(obj, buff) - require.Nil(t, err) - - return nil + return expectedErr }, } - fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), marshaller) - err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) - require.Equal(t, errUnmarshal, err) + fp, _ := process.NewFirehosePublisher(ioWriter, createContainer(), marshaller) + + outportBlock := createHyperOutportBlock() + err := fp.PublishHyperBlock(outportBlock) + require.Equal(t, expectedErr, err) }) t.Run("cannot write in console, should return error", func(t *testing.T) { @@ -197,18 +180,17 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { }, } - fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller) + fp, _ := process.NewFirehosePublisher(ioWriter, createContainer(), protoMarshaller) - outportBlock := createOutportBlock() - outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) + outportBlock := createHyperOutportBlock() - err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + err := fp.PublishHyperBlock(outportBlock) require.True(t, strings.Contains(err.Error(), err1.Error())) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + err = fp.PublishHyperBlock(outportBlock) require.Nil(t, err) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + err = fp.PublishHyperBlock(outportBlock) require.True(t, errors.Is(err, err2)) require.Equal(t, 4, ioWriterCalledCt) @@ -222,17 +204,23 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { PrevHash: []byte("prev hash"), TimeStamp: 100, } - headerBytes, err := protoMarshaller.Marshal(header) - require.Nil(t, err) - - outportBlock := &outportcore.OutportBlock{ - BlockData: &outportcore.BlockData{ - HeaderHash: []byte("hash"), - HeaderBytes: headerBytes, - HeaderType: string(core.ShardHeaderV1), + headerBytes, _ := protoMarshaller.Marshal(header) + + outportBlock := &data.HyperOutportBlock{ + MetaOutportBlock: &outportcore.OutportBlock{ + ShardID: 1, + BlockData: &outportcore.BlockData{ + HeaderBytes: headerBytes, + HeaderType: string(core.ShardHeaderV1), + HeaderHash: []byte("hash"), + }, + NotarizedHeadersHashes: []string{}, + NumberOfShards: 0, + SignersIndexes: []uint64{}, + HighestFinalBlockNonce: 0, + HighestFinalBlockHash: []byte{}, }, - - HighestFinalBlockNonce: 0, + NotarizedHeadersOutportData: []*data.NotarizedHeaderOutportData{}, } outportBlockBytes, err := protoMarshaller.Marshal(outportBlock) require.Nil(t, err) @@ -253,10 +241,10 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { encodedMvxBlock := base64.StdEncoding.EncodeToString(outportBlockBytes) require.Equal(t, []byte(fmt.Sprintf("%s %s %d %s %d %s %d %d %s\n", - firehosePrefix, - blockPrefix, + process.FirehosePrefix, + process.BlockPrefix, num, - hex.EncodeToString(outportBlock.BlockData.HeaderHash), + hex.EncodeToString(outportBlock.MetaOutportBlock.BlockData.HeaderHash), parentNum, hex.EncodeToString(header.PrevHash), libNum, @@ -269,28 +257,15 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { }, } - fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller) + fp, _ := process.NewFirehosePublisher(ioWriter, createContainer(), protoMarshaller) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) + err = fp.PublishHyperBlock(outportBlock) require.Nil(t, err) require.Equal(t, 2, ioWriterCalledCt) }) } -func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) { - t.Parallel() - - fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) - - require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic", 1)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo, 1)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating, 1)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys, 1)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts, 1)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock, 1)) -} - -func TestFirehoseIndexer_Close(t *testing.T) { +func TestFirehosePublisher_Close(t *testing.T) { t.Parallel() closeError := errors.New("error closing") @@ -300,7 +275,7 @@ func TestFirehoseIndexer_Close(t *testing.T) { }, } - fi, _ := NewFirehoseDataProcessor(writer, createContainer(), protoMarshaller) + fi, _ := process.NewFirehosePublisher(writer, createContainer(), protoMarshaller) err := fi.Close() require.Equal(t, closeError, err) } diff --git a/testscommon/PublisherStub.go b/testscommon/PublisherStub.go new file mode 100644 index 0000000..a9499ca --- /dev/null +++ b/testscommon/PublisherStub.go @@ -0,0 +1,27 @@ +package testscommon + +import "github.com/multiversx/mx-chain-ws-connector-template-go/data" + +// PublisherStub - +type PublisherStub struct { + PublishHyperBlockCalled func(hyperOutportBlock *data.HyperOutportBlock) error + CloseCalled func() error +} + +// PublishHyperBlock - +func (p *PublisherStub) PublishHyperBlock(hyperOutportBlock *data.HyperOutportBlock) error { + if p.PublishHyperBlockCalled != nil { + return p.PublishHyperBlockCalled(hyperOutportBlock) + } + + return nil +} + +// Close - +func (p *PublisherStub) Close() error { + if p.CloseCalled != nil { + return p.CloseCalled() + } + + return nil +} diff --git a/testscommon/blocksPoolStub.go b/testscommon/blocksPoolStub.go new file mode 100644 index 0000000..b195be3 --- /dev/null +++ b/testscommon/blocksPoolStub.go @@ -0,0 +1,32 @@ +package testscommon + +import "github.com/multiversx/mx-chain-core-go/data/outport" + +// BlocksPoolStub - +type BlocksPoolStub struct { + PutBlockCalled func(hash []byte, outportBlock *outport.OutportBlock) error + GetBlockCalled func(hash []byte) (*outport.OutportBlock, error) +} + +// PutBlock - +func (b *BlocksPoolStub) PutBlock(hash []byte, outportBlock *outport.OutportBlock) error { + if b.PutBlockCalled != nil { + return b.PutBlockCalled(hash, outportBlock) + } + + return nil +} + +// GetBlock - +func (b *BlocksPoolStub) GetBlock(hash []byte) (*outport.OutportBlock, error) { + if b.GetBlockCalled != nil { + return b.GetBlockCalled(hash) + } + + return &outport.OutportBlock{}, nil +} + +// IsInterfaceNil - +func (b *BlocksPoolStub) IsInterfaceNil() bool { + return b == nil +} diff --git a/testscommon/dataAggregatorStub.go b/testscommon/dataAggregatorStub.go new file mode 100644 index 0000000..6840e0a --- /dev/null +++ b/testscommon/dataAggregatorStub.go @@ -0,0 +1,25 @@ +package testscommon + +import ( + "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-ws-connector-template-go/data" +) + +// DataAggregatorStub - +type DataAggregatorStub struct { + ProcessHyperBlockCalled func(outportBlock *outport.OutportBlock) (*data.HyperOutportBlock, error) +} + +// ProcessHyperBlock - +func (d *DataAggregatorStub) ProcessHyperBlock(outportBlock *outport.OutportBlock) (*data.HyperOutportBlock, error) { + if d.ProcessHyperBlockCalled != nil { + return d.ProcessHyperBlockCalled(outportBlock) + } + + return &data.HyperOutportBlock{}, nil +} + +// IsInterfaceNil - +func (d *DataAggregatorStub) IsInterfaceNil() bool { + return d == nil +}