-
Notifications
You must be signed in to change notification settings - Fork 1
/
forkchecker.go
176 lines (144 loc) · 4.84 KB
/
forkchecker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package main
import (
	"context"
	"errors"
	"fmt"
	"log"
	"math"
	"time"

	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
	"github.com/proximax-storage/go-xpx-chain-sdk/sdk"
	"github.com/proximax-storage/go-xpx-chain-sdk/tools/health"
	"github.com/proximax-storage/go-xpx-chain-sdk/tools/health/packets"
	crypto "github.com/proximax-storage/go-xpx-crypto"
)
// ForkChecker walks the chain checkpoint by checkpoint, comparing the block
// hash reported by every connected node and raising alerts through its
// AlertManager. Build it with NewForkChecker; the zero value has none of its
// dependencies initialized.
type ForkChecker struct {
	// cfg is the configuration passed to NewForkChecker.
	cfg Config

	// alertManager is built by initAlertManager and also holds the parsed
	// node list handed to the pool when connecting.
	alertManager *AlertManager

	// catapultClient is the SDK client bound to the first API URL that
	// could be configured successfully.
	catapultClient *sdk.Client

	// nodePool is the health checker pool used to connect to nodes, wait for
	// heights and compare block hashes.
	nodePool *health.NodeHealthCheckerPool

	// checkpoint is the block height currently being verified; Start advances
	// it by cfg.HeightCheckInterval after each completed check.
	checkpoint uint64
}
// NewForkChecker builds a ForkChecker from config and initializes its
// dependencies in a fixed order: the catapult client, the alert manager, the
// node health checker pool and, last, the starting checkpoint.
//
// The checkpoint comes last because initCheckpoint may query the catapult
// client for the current chain height. Construction stops at the first failing
// step. The returned error wraps the underlying cause with %w, so callers can
// inspect it with errors.Is / errors.As.
func NewForkChecker(config Config) (*ForkChecker, error) {
	fc := &ForkChecker{cfg: config}

	if err := fc.initCatapultClient(); err != nil {
		return nil, fmt.Errorf("failed to initialize catapult client: %w", err)
	}

	if err := fc.initAlertManager(); err != nil {
		return nil, fmt.Errorf("failed to initialize alert manager: %w", err)
	}

	if err := fc.initPool(); err != nil {
		return nil, fmt.Errorf("failed to initialize node health checker pool: %w", err)
	}

	if err := fc.initCheckpoint(); err != nil {
		return nil, fmt.Errorf("failed to initialize checkpoint: %w", err)
	}

	return fc, nil
}
// initCheckpoint chooses the height at which checking starts.
//
// A non-zero cfg.Checkpoint is used as-is. Otherwise the current blockchain
// height is fetched through the catapult client, so initCatapultClient must
// already have succeeded. The chosen height is logged.
//
// It returns an error, wrapping the underlying cause, only when the height
// lookup fails.
func (fc *ForkChecker) initCheckpoint() error {
	checkpoint := fc.cfg.Checkpoint
	if checkpoint == 0 {
		// No explicit starting point configured: begin at the chain's current height.
		height, err := fc.catapultClient.Blockchain.GetBlockchainHeight(context.Background())
		if err != nil {
			return fmt.Errorf("error getting blockchain height: %w", err)
		}
		checkpoint = uint64(height)
	}

	fc.checkpoint = checkpoint
	log.Println("Initialized checkpoint:", fc.checkpoint)
	return nil
}
// initPool creates the node health checker pool and stores it in fc.nodePool.
//
// The pool is given a freshly generated random key pair and
// packets.NoneConnectionSecurity. The third argument is math.MaxInt.
// NOTE(review): presumably this is an upper bound on pool connections, i.e.
// effectively unlimited; confirm against health.NewNodeHealthCheckerPool.
//
// It returns an error, wrapping the underlying cause, if the key pair cannot
// be generated.
func (fc *ForkChecker) initPool() error {
	clientKeyPair, err := crypto.NewRandomKeyPair()
	if err != nil {
		return fmt.Errorf("error generating random keypair: %w", err)
	}

	fc.nodePool = health.NewNodeHealthCheckerPool(
		clientKeyPair,
		packets.NoneConnectionSecurity,
		math.MaxInt,
	)

	return nil
}
// initAlertManager parses the configured node list, creates the Telegram bot
// client and assembles the AlertManager stored in fc.alertManager.
//
// The alert manager starts with empty alert-time and offline-node maps. Its
// notifier uses cfg.ChatID and is switched on or off by cfg.Notify.
//
// It returns an error, wrapping the underlying cause, if the node list cannot
// be parsed or the bot cannot be created from cfg.BotAPIKey.
func (fc *ForkChecker) initAlertManager() error {
	nodeInfos, err := parseNodes(fc.cfg.Nodes)
	if err != nil {
		return fmt.Errorf("error parsing node info: %w", err)
	}

	bot, err := tgbotapi.NewBotAPI(fc.cfg.BotAPIKey)
	if err != nil {
		return fmt.Errorf("failed to initialize telegram bot: %w", err)
	}
	// Explicitly keep the bot library's debug mode off.
	bot.Debug = false

	fc.alertManager = &AlertManager{
		config:           fc.cfg.AlertConfig,
		lastAlertTimes:   make(map[AlertType]time.Time),
		offlineNodeStats: make(map[string]NodeStatus),
		nodeInfos:        nodeInfos,
		notifier: &Notifier{
			bot:     bot,
			chatID:  fc.cfg.ChatID,
			enabled: fc.cfg.Notify,
		},
	}

	return nil
}
// initCatapultClient creates the SDK client from the first entry of
// cfg.ApiUrls for which an SDK configuration can be built.
//
// URLs are tried in order, one at a time. Each failure is logged so the reason
// for skipping a URL is not lost. On the first success the client is stored in
// fc.catapultClient and the remaining URLs are not tried.
//
// It returns an error when cfg.ApiUrls is empty, or when every URL fails; in
// the latter case the error wraps the failure of the last URL tried.
func (fc *ForkChecker) initCatapultClient() error {
	// An empty list would otherwise fall through the loop and report
	// "all provided URLs failed" with a nil cause.
	if len(fc.cfg.ApiUrls) == 0 {
		return fmt.Errorf("no API URLs provided")
	}

	var lastErr error
	for _, url := range fc.cfg.ApiUrls {
		conf, err := sdk.NewConfig(context.Background(), []string{url})
		if err != nil {
			log.Printf("Failed to initialize client on URL %s: %s", url, err)
			lastErr = err
			continue
		}

		log.Printf("Initialized client on URL: %s", url)
		fc.catapultClient = sdk.NewClient(nil, conf)
		return nil
	}

	return fmt.Errorf("all provided URLs failed: %w", lastErr)
}
// Start runs the fork-checking loop. Each iteration:
//
//  1. connects the pool to the configured nodes and reports the ones that
//     could not be reached to the alert manager;
//  2. waits for the connected nodes to reach the current checkpoint height and
//     reports sync state to the alert manager;
//  3. compares the block hash at the checkpoint across nodes and raises a hash
//     alert when they differ;
//  4. advances the checkpoint by cfg.HeightCheckInterval.
//
// The checkpoint does not advance when a step fails or when no node has reached
// it, so the same height is retried on the next iteration. A hash mismatch is
// alerted on and the checkpoint still advances.
//
// The loop has no exit: the method never returns, and the declared error
// result is never produced by this implementation.
func (fc *ForkChecker) Start() error {
	// Pause before retrying after a failed pool operation, so that a
	// persistent failure does not become a tight loop of retries and log lines.
	const retryDelay = 5 * time.Second

	for {
		failedConnectionsNodes, err := fc.nodePool.ConnectToNodes(fc.alertManager.nodeInfos, fc.cfg.Discover)
		if err != nil {
			log.Printf("error connecting to nodes: %s", err)
			time.Sleep(retryDelay)
			continue
		}

		// Trigger alert if offline nodes include bootstrap nodes or API nodes.
		fc.alertManager.handleOfflineAlert(failedConnectionsNodes)

		notReached, reached, err := fc.nodePool.WaitHeight(fc.checkpoint)
		if err != nil {
			log.Printf("error waiting for connected nodes to reach %d height: %s", fc.checkpoint, err)
			time.Sleep(retryDelay)
			continue
		}

		// Trigger alert if the following conditions are met:
		// - No nodes have synced to the checkpoint height for X minutes (stuck alert)
		// - Among the out-of-sync nodes, there are Y or more bootstrap or API nodes
		//   that are Z blocks or more behind the chain's highest height.
		// X, Y, Z values are configurable in the config.json file:
		// X - stuckDurationThreshold
		// Y - outOfSyncCriticalNodesThreshold
		// Z - outOfSyncBlocksThreshold
		fc.alertManager.handleSyncAlert(fc.checkpoint, notReached, reached)

		// Skip incrementing checkpoint if the chain is stuck.
		if len(reached) == 0 {
			log.Printf("Chain is stuck! No nodes reached height: %d", fc.checkpoint)
			continue
		}

		log.Printf("Checking block hash at %d height", fc.checkpoint)
		hashes, err := fc.nodePool.CompareHashes(fc.checkpoint)
		if err != nil {
			// errors.Is rather than == so the sentinels are still recognized
			// if the health package ever returns them wrapped.
			switch {
			case errors.Is(err, health.ErrHashesAreNotTheSame):
				// Trigger alert if the hashes of the last confirmed block are not
				// the same. This is a detection, not a failure: fall through and
				// move on to the next checkpoint.
				log.Printf("hashes are not the same at %d height: %v", fc.checkpoint, hashes)
				fc.alertManager.handleHashAlert(fc.checkpoint, hashes)
			case errors.Is(err, health.ErrNoConnectedPeers):
				log.Printf("error comparing hashes for connected nodes at %d height: %s", fc.checkpoint, err)
				time.Sleep(retryDelay)
				continue
			default:
				log.Printf("unexpected error when comparing hashes at %d height: %s", fc.checkpoint, err)
				time.Sleep(retryDelay)
				continue
			}
		}

		// Update checkpoint
		fc.checkpoint += fc.cfg.HeightCheckInterval
	}
}