diff --git a/factory/wsConnectorFactory.go b/factory/wsConnectorFactory.go index 52982a2..7fc1151 100644 --- a/factory/wsConnectorFactory.go +++ b/factory/wsConnectorFactory.go @@ -31,8 +31,12 @@ func CreateWSConnector(cfg config.WebSocketConfig) (process.WSConnector, error) return nil, err } + protoMarshaller := &marshal.GogoProtoMarshalizer{} + firehosePublisher, err := process.NewFirehosePublisher( os.Stdout, // DO NOT CHANGE + blockContainer, + protoMarshaller, ) if err != nil { return nil, err @@ -54,13 +58,12 @@ func CreateWSConnector(cfg config.WebSocketConfig) (process.WSConnector, error) return nil, err } - protoMarshaller := &marshal.GogoProtoMarshalizer{} - - dataAggregator, err := process.NewDataAggregator(blockContainer, blocksPool, protoMarshaller) + dataAggregator, err := process.NewDataAggregator(blocksPool) if err != nil { return nil, err } + // TODO: move to separate factory dataProcessor, err := process.NewDataProcessor(firehosePublisher, protoMarshaller, blocksPool, dataAggregator) if err != nil { return nil, err diff --git a/process/dataAggregator.go b/process/dataAggregator.go index 2263dfa..1823312 100644 --- a/process/dataAggregator.go +++ b/process/dataAggregator.go @@ -3,54 +3,28 @@ package process import ( "encoding/hex" - "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" - coreData "github.com/multiversx/mx-chain-core-go/data" - "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" - "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-ws-connector-template-go/data" ) type dataAggregator struct { - blockCreator BlockContainerHandler - blocksPool BlocksPool - marshaller marshal.Marshalizer + blocksPool BlocksPool } func NewDataAggregator( - blockCreator BlockContainerHandler, blocksPool BlocksPool, - marshaller marshal.Marshalizer, ) (*dataAggregator, error) { - if check.IfNil(blockCreator) { - return nil, errNilBlockCreator - } if check.IfNil(blocksPool) { return nil, errNilBlocksPool } - if check.IfNil(marshaller) { - return nil, errNilMarshaller - } return &dataAggregator{ - blockCreator: blockCreator, - blocksPool: blocksPool, - marshaller: marshaller, + blocksPool: blocksPool, }, nil } -func (da *dataAggregator) ProcessHyperBlock(outportBlock *outport.OutportBlock) (coreData.HeaderHandler, []byte, error) { - blockCreator, err := da.blockCreator.Get(core.HeaderType(outportBlock.BlockData.HeaderType)) - if err != nil { - return nil, nil, err - } - - header, err := block.GetHeaderFromBytes(da.marshaller, blockCreator, outportBlock.BlockData.HeaderBytes) - if err != nil { - return nil, nil, err - } - +func (da *dataAggregator) ProcessHyperBlock(outportBlock *outport.OutportBlock) (*data.HyperOutportBlock, error) { hyperOutportBlock := &data.HyperOutportBlock{} hyperOutportBlock.MetaOutportBlock = outportBlock @@ -58,12 +32,12 @@ func (da *dataAggregator) ProcessHyperBlock(outportBlock *outport.OutportBlock) for _, notarizedHash := range outportBlock.NotarizedHeadersHashes { hash, err := hex.DecodeString(notarizedHash) if err != nil { - return nil, nil, err + return nil, err } outportBlockShard, err := da.blocksPool.GetBlock(hash) if err != nil { - return nil, nil, err + return nil, err } notarizedShardOutportBlock := &data.NotarizedHeaderOutportData{ @@ -74,12 +48,7 @@ func (da *dataAggregator) ProcessHyperBlock(outportBlock *outport.OutportBlock) notarizedShardOutportBlocks = append(notarizedShardOutportBlocks, notarizedShardOutportBlock) } - marshalledData, err := da.marshaller.Marshal(hyperOutportBlock) - if err != nil { - return nil, nil, err - } - - return header, marshalledData, nil + return hyperOutportBlock, nil } // IsInterfaceNil returns true if there is no value under interface diff --git a/process/dataProcessor.go b/process/dataProcessor.go index 83318e9..2c77b2e 100644 --- a/process/dataProcessor.go +++ b/process/dataProcessor.go @@ -70,6 +70,8 @@ func (dp *dataProcessor) saveBlock(marshalledData []byte) error { return errNilOutportBlockData } + log.Info("saving block", "hash", outportBlock.BlockData.GetHeaderHash(), "shardID", outportBlock.ShardID) + if outportBlock.ShardID == core.MetachainShardId { return dp.handleMetaOutportBlock(outportBlock) } @@ -78,15 +80,12 @@ func (dp *dataProcessor) saveBlock(marshalledData []byte) error { } func (dp *dataProcessor) handleMetaOutportBlock(outportBlock *outport.OutportBlock) error { - header, marshalledData, err := dp.dataAggregator.ProcessHyperBlock(outportBlock) + hyperOutportBlock, err := dp.dataAggregator.ProcessHyperBlock(outportBlock) if err != nil { return err } - // test - headerHash := header.GetRootHash() - - err = dp.publisher.PublishHyperBlock(header, headerHash, marshalledData) + err = dp.publisher.PublishHyperBlock(hyperOutportBlock) if err != nil { return err } diff --git a/process/firehoseDataProcessor.go b/process/firehosePublisher.go similarity index 52% rename from process/firehoseDataProcessor.go rename to process/firehosePublisher.go index e6b1d1c..55b43df 100644 --- a/process/firehoseDataProcessor.go +++ b/process/firehosePublisher.go @@ -5,8 +5,12 @@ import ( "encoding/hex" "fmt" - "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/block" + "github.com/multiversx/mx-chain-core-go/marshal" logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-ws-connector-template-go/data" ) var log = logger.GetOrCreate("firehose") @@ -21,19 +25,31 @@ const ( ) type firehosePublisher struct { - writer Writer + marshaller marshal.Marshalizer + writer Writer + blockCreator BlockContainerHandler } // NewFirehosePublisher creates a data processor able to receive data from a ws outport driver and print saved blocks func NewFirehosePublisher( writer Writer, + blockCreator BlockContainerHandler, + marshaller marshal.Marshalizer, ) (*firehosePublisher, error) { if writer == nil { return nil, errNilWriter } + if check.IfNil(blockCreator) { + return nil, errNilBlockCreator + } + if check.IfNil(marshaller) { + return nil, errNilMarshaller + } fp := &firehosePublisher{ - writer: writer, + marshaller: marshaller, + writer: writer, + blockCreator: blockCreator, } _, err := fmt.Fprintf(fp.writer, "%s %s %s %s\n", firehosePrefix, initPrefix, protocolReaderVersion, protoMessageType) @@ -44,8 +60,18 @@ func NewFirehosePublisher( return fp, nil } -func (fp *firehosePublisher) PublishHyperBlock(header data.HeaderHandler, headerHash []byte, marshalledData []byte) error { - log.Info("saving block", "nonce", header.GetNonce(), "hash", headerHash) +func (fp *firehosePublisher) PublishHyperBlock(hyperOutportBlock *data.HyperOutportBlock) error { + outportBlock := hyperOutportBlock.MetaOutportBlock + + blockCreator, err := fp.blockCreator.Get(core.HeaderType(outportBlock.BlockData.HeaderType)) + if err != nil { + return err + } + + header, err := block.GetHeaderFromBytes(fp.marshaller, blockCreator, outportBlock.BlockData.HeaderBytes) + if err != nil { + return err + } blockNum := header.GetNonce() parentNum := blockNum - 1 @@ -53,16 +79,21 @@ func (fp *firehosePublisher) PublishHyperBlock(header data.HeaderHandler, header parentNum = 0 } + marshalledData, err := fp.marshaller.Marshal(hyperOutportBlock) + if err != nil { + return err + } + encodedMarshalledData := base64.StdEncoding.EncodeToString(marshalledData) - _, err := fmt.Fprintf(fp.writer, "%s %s %d %s %d %s %d %d %s\n", + _, err = fmt.Fprintf(fp.writer, "%s %s %d %s %d %s %d %d %s\n", firehosePrefix, blockPrefix, blockNum, - hex.EncodeToString(headerHash), + hex.EncodeToString(outportBlock.BlockData.HeaderHash), parentNum, hex.EncodeToString(header.GetPrevHash()), - // outportBlock.HighestFinalBlockNonce, + outportBlock.HighestFinalBlockNonce, header.GetTimeStamp(), encodedMarshalledData, ) diff --git a/process/firehoseDataProcessor_test.go b/process/firehosePublisher_test.go similarity index 100% rename from process/firehoseDataProcessor_test.go rename to process/firehosePublisher_test.go diff --git a/process/hyperBlockProcessor.go b/process/hyperBlockProcessor.go deleted file mode 100644 index f2ef9d2..0000000 --- a/process/hyperBlockProcessor.go +++ /dev/null @@ -1 +0,0 @@ -package process diff --git a/process/interface.go b/process/interface.go index 0a4e030..1a40bfc 100644 --- a/process/interface.go +++ b/process/interface.go @@ -4,9 +4,9 @@ import ( "io" "github.com/multiversx/mx-chain-core-go/core" - "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-ws-connector-template-go/data" ) // WSConnector defines a ws connector that receives incoming data and can be closed @@ -41,12 +41,12 @@ type Writer interface { } type Publisher interface { - PublishHyperBlock(header data.HeaderHandler, headerHash []byte, marshalledData []byte) error + PublishHyperBlock(hyperOutportBlock *data.HyperOutportBlock) error Close() error } type DataAggregator interface { - ProcessHyperBlock(outportBlock *outport.OutportBlock) (data.HeaderHandler, []byte, error) + ProcessHyperBlock(outportBlock *outport.OutportBlock) (*data.HyperOutportBlock, error) IsInterfaceNil() bool }