generated from multiversx/mx-chain-ws-connector-template-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
328 additions
and
86 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package process | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/multiversx/mx-chain-core-go/data/outport" | ||
"github.com/multiversx/mx-chain-go/storage" | ||
) | ||
|
||
type blocksPool struct { | ||
cacher storage.Cacher | ||
} | ||
|
||
func NewBlocksPool(cacher storage.Cacher) (*blocksPool, error) { | ||
return &blocksPool{cacher: cacher}, nil | ||
} | ||
|
||
func (bp *blocksPool) PutBlock(hash []byte, outportBlock *outport.OutportBlock) error { | ||
_ = bp.cacher.Put(hash, outportBlock, 0) | ||
return nil | ||
} | ||
|
||
func (bp *blocksPool) GetBlock(hash []byte) (*outport.OutportBlock, error) { | ||
data, ok := bp.cacher.Get(hash) | ||
if !ok { | ||
return nil, fmt.Errorf("failed to get data from pool") | ||
} | ||
|
||
outportBlock, ok := data.(*outport.OutportBlock) | ||
if !ok { | ||
return nil, ErrWrongTypeAssertion | ||
} | ||
|
||
return outportBlock, nil | ||
} | ||
|
||
func (bp *blocksPool) IsInterfaceNil() bool { | ||
return bp == nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package process | ||
|
||
import ( | ||
"github.com/multiversx/mx-chain-core-go/core" | ||
"github.com/multiversx/mx-chain-core-go/core/check" | ||
"github.com/multiversx/mx-chain-core-go/data" | ||
"github.com/multiversx/mx-chain-core-go/data/block" | ||
"github.com/multiversx/mx-chain-core-go/data/outport" | ||
"github.com/multiversx/mx-chain-core-go/marshal" | ||
) | ||
|
||
type dataAggregator struct { | ||
blockCreator BlockContainerHandler | ||
blocksPool BlocksPool | ||
marshaller marshal.Marshalizer | ||
} | ||
|
||
func NewDataAggregator( | ||
blockCreator BlockContainerHandler, | ||
blocksPool BlocksPool, | ||
marshaller marshal.Marshalizer, | ||
) (*dataAggregator, error) { | ||
if check.IfNil(blockCreator) { | ||
return nil, errNilBlockCreator | ||
} | ||
if check.IfNil(blocksPool) { | ||
return nil, errNilBlocksPool | ||
} | ||
if check.IfNil(marshaller) { | ||
return nil, errNilMarshaller | ||
} | ||
|
||
return &dataAggregator{ | ||
blockCreator: blockCreator, | ||
blocksPool: blocksPool, | ||
marshaller: marshaller, | ||
}, nil | ||
} | ||
|
||
func (da *dataAggregator) ProcessHyperBlock(outportBlock *outport.OutportBlock) (data.HeaderHandler, []byte, error) { | ||
blockCreator, err := da.blockCreator.Get(core.HeaderType(outportBlock.BlockData.HeaderType)) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
header, err := block.GetHeaderFromBytes(da.marshaller, blockCreator, outportBlock.BlockData.HeaderBytes) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
// dummy marshalled data | ||
marshalledData, err := da.marshaller.Marshal(outportBlock) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
return header, marshalledData, nil | ||
} | ||
|
||
// IsInterfaceNil returns true if there is no value under interface | ||
func (da *dataAggregator) IsInterfaceNil() bool { | ||
return da == nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package process | ||
|
||
import ( | ||
"github.com/multiversx/mx-chain-core-go/core" | ||
"github.com/multiversx/mx-chain-core-go/core/check" | ||
"github.com/multiversx/mx-chain-core-go/data/outport" | ||
"github.com/multiversx/mx-chain-core-go/marshal" | ||
) | ||
|
||
type dataProcessor struct { | ||
marshaller marshal.Marshalizer | ||
operationHandlers map[string]func(marshalledData []byte) error | ||
publisher Publisher | ||
blocksPool BlocksPool | ||
dataAggregator DataAggregator | ||
} | ||
|
||
// NewDataProcessor creates a data processor able to receive data from a ws outport driver and handle blocks | ||
func NewDataProcessor( | ||
publisher Publisher, | ||
marshaller marshal.Marshalizer, | ||
blocksPool BlocksPool, | ||
dataAggregator DataAggregator, | ||
) (DataProcessor, error) { | ||
if publisher == nil { | ||
return nil, errNilWriter | ||
} | ||
if check.IfNil(blocksPool) { | ||
return nil, errNilBlocksPool | ||
} | ||
if check.IfNil(marshaller) { | ||
return nil, errNilMarshaller | ||
} | ||
if check.IfNil(dataAggregator) { | ||
return nil, errNilDataAggregator | ||
} | ||
|
||
dp := &dataProcessor{ | ||
marshaller: marshaller, | ||
publisher: publisher, | ||
blocksPool: blocksPool, | ||
dataAggregator: dataAggregator, | ||
} | ||
|
||
dp.operationHandlers = map[string]func(marshalledData []byte) error{ | ||
outport.TopicSaveBlock: dp.saveBlock, | ||
} | ||
|
||
return dp, nil | ||
} | ||
|
||
// ProcessPayload will process the received payload only for TopicSaveBlock, otherwise ignores it. | ||
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string, _ uint32) error { | ||
operationHandler, found := dp.operationHandlers[topic] | ||
if !found { | ||
return nil | ||
} | ||
|
||
return operationHandler(payload) | ||
} | ||
|
||
func (dp *dataProcessor) saveBlock(marshalledData []byte) error { | ||
outportBlock := &outport.OutportBlock{} | ||
err := dp.marshaller.Unmarshal(outportBlock, marshalledData) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if outportBlock == nil || outportBlock.BlockData == nil { | ||
return errNilOutportBlockData | ||
} | ||
|
||
if outportBlock.ShardID == core.MetachainShardId { | ||
return dp.handleMetaOutportBlock(outportBlock) | ||
} | ||
|
||
return dp.handleShardOutportBlock(outportBlock) | ||
} | ||
|
||
func (dp *dataProcessor) handleMetaOutportBlock(outportBlock *outport.OutportBlock) error { | ||
header, marshalledData, err := dp.dataAggregator.ProcessHyperBlock(outportBlock) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// test | ||
headerHash := header.GetRootHash() | ||
|
||
err = dp.publisher.PublishHyperBlock(header, headerHash, marshalledData) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (dp *dataProcessor) handleShardOutportBlock(outportBlock *outport.OutportBlock) error { | ||
blockHash := outportBlock.BlockData.HeaderHash | ||
|
||
err := dp.blocksPool.PutBlock(blockHash, outportBlock) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Close will close the internal writer | ||
func (dp *dataProcessor) Close() error { | ||
return dp.publisher.Close() | ||
} | ||
|
||
// IsInterfaceNil checks if the underlying pointer is nil | ||
func (dp *dataProcessor) IsInterfaceNil() bool { | ||
return dp == nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.