generated from multiversx/mx-chain-ws-connector-template-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfirehoseDataProcessor.go
124 lines (103 loc) · 3.08 KB
/
firehoseDataProcessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package process
import (
"encoding/hex"
"fmt"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/marshal"
logger "github.com/multiversx/mx-chain-logger-go"
)
// log is the package-level logger, obtained from the logger package under the "firehose" name.
var log = logger.GetOrCreate("firehose")
// Tokens used when printing a block through the writer (see saveBlock).
const (
// firehosePrefix is the first token of every line written by saveBlock.
firehosePrefix = "FIRE"
// beginBlockPrefix is the second token of the line that opens a block; it is followed by the block nonce.
beginBlockPrefix = "BLOCK_BEGIN"
// endBlockPrefix is the second token of the line that closes a block; it is followed by the nonce,
// the hex-encoded previous hash, the timestamp and the hex-encoded marshalled outport block.
endBlockPrefix = "BLOCK_END"
)
// dataProcessor handles payloads received per topic and prints saved blocks to a writer.
type dataProcessor struct {
// marshaller unmarshals the received outport block and is also passed to
// block.GetHeaderFromBytes when decoding the header bytes.
marshaller marshal.Marshalizer
// operationHandlers maps a topic name to the function that processes its payload;
// ProcessPayload ignores topics that have no entry here.
operationHandlers map[string]func(marshalledData []byte) error
// writer receives the formatted block lines and is closed by Close.
writer Writer
// blockCreator is queried by header type (via Get) for the creator that
// block.GetHeaderFromBytes needs in order to decode a header.
blockCreator BlockContainerHandler
}
// NewFirehoseDataProcessor builds a DataProcessor that handles outport.TopicSaveBlock
// payloads by decoding them and printing the saved blocks through the given writer.
//
// It returns errNilWriter, errNilBlockCreator or errNilMarshaller (checked in this order)
// when the corresponding dependency is nil.
func NewFirehoseDataProcessor(
	writer Writer,
	blockCreator BlockContainerHandler,
	marshaller marshal.Marshalizer,
) (DataProcessor, error) {
	switch {
	case writer == nil:
		return nil, errNilWriter
	case check.IfNil(blockCreator):
		return nil, errNilBlockCreator
	case check.IfNil(marshaller):
		return nil, errNilMarshaller
	}

	processor := &dataProcessor{
		writer:       writer,
		blockCreator: blockCreator,
		marshaller:   marshaller,
	}

	// The handlers are bound after construction because they are methods of processor itself.
	processor.operationHandlers = make(map[string]func(marshalledData []byte) error, 1)
	processor.operationHandlers[outport.TopicSaveBlock] = processor.saveBlock

	return processor, nil
}
// ProcessPayload dispatches payload to the handler registered for topic and returns that
// handler's error. Topics without a registered handler are ignored and yield a nil error;
// NewFirehoseDataProcessor registers a handler only for outport.TopicSaveBlock.
// The third argument is unused.
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string, _ uint32) error {
	if handle, registered := dp.operationHandlers[topic]; registered {
		return handle(payload)
	}

	return nil
}
// saveBlock unmarshals an outport block from marshalledData and prints it to the internal
// writer as two lines:
//
//	FIRE BLOCK_BEGIN <nonce>
//	FIRE BLOCK_END <nonce> <hex prev hash> <timestamp> <hex marshalledData>
//
// It returns an error if the payload cannot be unmarshalled, if the decoded block carries
// no BlockData (errNilOutportBlockData), if dp.blockCreator.Get fails for the header type,
// if the header bytes cannot be decoded, or if writing either line fails (the write error
// is wrapped with %w).
func (dp *dataProcessor) saveBlock(marshalledData []byte) error {
	outportBlock := &outport.OutportBlock{}
	err := dp.marshaller.Unmarshal(outportBlock, marshalledData)
	if err != nil {
		return err
	}

	// outportBlock is allocated above and never reassigned, so it cannot be nil here;
	// only the decoded block data needs to be validated.
	if outportBlock.BlockData == nil {
		return errNilOutportBlockData
	}

	blockCreator, err := dp.blockCreator.Get(core.HeaderType(outportBlock.BlockData.HeaderType))
	if err != nil {
		return err
	}

	header, err := block.GetHeaderFromBytes(dp.marshaller, blockCreator, outportBlock.BlockData.HeaderBytes)
	if err != nil {
		return err
	}

	nonce := header.GetNonce()
	log.Info("firehose: saving block", "nonce", nonce, "hash", outportBlock.BlockData.HeaderHash)

	_, err = fmt.Fprintf(dp.writer, "%s %s %d\n",
		firehosePrefix,
		beginBlockPrefix,
		nonce,
	)
	if err != nil {
		return fmt.Errorf("could not write %s prefix , err: %w", beginBlockPrefix, err)
	}

	// The full marshalled outport block is emitted hex-encoded (%x) as the last field.
	_, err = fmt.Fprintf(dp.writer, "%s %s %d %s %d %x\n",
		firehosePrefix,
		endBlockPrefix,
		nonce,
		hex.EncodeToString(header.GetPrevHash()),
		header.GetTimeStamp(),
		marshalledData,
	)
	if err != nil {
		return fmt.Errorf("could not write %s prefix , err: %w", endBlockPrefix, err)
	}

	return nil
}
// Close closes the internal writer and returns whatever error the writer's Close reports.
func (dp *dataProcessor) Close() error {
return dp.writer.Close()
}
// IsInterfaceNil reports whether the receiver pointer is nil. It lets callers detect a nil
// *dataProcessor even when it is stored in a non-nil interface value.
func (dp *dataProcessor) IsInterfaceNil() bool {
return dp == nil
}