forked from CrowdStrike/gotel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitoring.go
386 lines (341 loc) · 9.88 KB
/
monitoring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
package gotel
import (
"database/sql"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strings"
"time"
)
// alerter is implemented by every notification backend (e.g. SMTP, PagerDuty).
type alerter interface {
	// Alert fires a notification for the given reservation and reports
	// whether the notification was delivered successfully.
	Alert(res reservation) bool
	// Name returns an identifier for this backend, used to key the
	// alert-suppression map.
	Name() string
	// Bootstrap performs any one-time setup the backend needs.
	Bootstrap()
}
var (
	// holds the last time each (app, component, alerter) alert was sent,
	// so we don't spam alerters every n seconds
	sentAlerts = make(map[string]time.Time)
	// this designates the instance as the coordinating instance
	coordinator = false
	// stores a slice of alerter implementations to call when we have an alert
	alertFuncs = []alerter{}
	// package-wide configuration, set by InitializeMonitoring
	cfg config
	// our current IP address, resolved once in init()
	myIP string
)
// init caches this node's external IP address for later cluster-membership
// checks. On failure it falls back to the sentinel "N/A" so the rest of the
// package can keep running instead of crashing at startup.
func init() {
	var err error
	myIP, err = externalIP()
	if err != nil {
		myIP = "N/A"
		l.err("Unable to aquire own IP Address")
	}
}
// Monitor checks existing reservations for late arrivals.
// A worker node first tries to promote itself to coordinator; once promoted,
// a node keeps the role for the life of the process.
func Monitor(db *sql.DB) {
	if isWorker := !coordinator; isWorker {
		coordinator = isCoordinator(db)
	}
	printCoordinatorStatus()
	jobChecker(db)
}
// InitializeMonitoring sets up alerters based on configuration.
// Each enabled backend is registered and bootstrapped, and a daily background
// ticker is started that — on the coordinator node only — prunes old log rows
// to preserve disk space.
func InitializeMonitoring(c config, db *sql.DB) {
	cfg = c

	if !cfg.Smtp.Enabled {
		l.info("SMTP Alerting disabled")
	} else {
		alertFuncs = append(alertFuncs, &smtpAlerter{Cfg: c})
	}

	if !cfg.Pagerduty.Enabled {
		l.info("PagerDuty Alerting disabled")
	} else {
		alertFuncs = append(alertFuncs, &pagerDutyAlerter{Cfg: c})
	}

	for _, a := range alertFuncs {
		a.Bootstrap()
	}

	// Daily housekeeping: only the coordinator cleans up old logs.
	cleanupTicker := time.NewTicker(24 * time.Hour)
	go func() {
		for tick := range cleanupTicker.C {
			if !coordinator {
				continue
			}
			l.info("Running log cleanup at [%v]", tick)
			cleanUp(db, c.Main.DaysToStoreLogs)
		}
	}()
}
//--------------------- PRIVATE FUNCS ------------------------------
// hasLock tries to acquire the MySQL named lock "gotel_lock" (3 second
// timeout). The lock is held for as long as the underlying connection stays
// alive and is what designates this instance as the cluster coordinator.
// It returns true only when GET_LOCK reports the lock was obtained.
func hasLock(db *sql.DB) bool {
	var lck int
	query := "SELECT GET_LOCK('gotel_lock', 3) as lck"
	rows, err := db.Query(query)
	if err != nil {
		// BUG FIX: previously fell through to rows.Close()/rows.Next()
		// with a nil rows value, which panics. Bail out instead.
		l.warn("Unable to aquire lock\n")
		return false
	}
	defer rows.Close()
	for rows.Next() {
		if err := rows.Scan(&lck); err != nil {
			l.warn("Unable to aquire locking rows\n")
		}
		if lck == 1 {
			// GET_LOCK returned 1: the lock is ours while this
			// connection remains open.
			l.info("Lock Aquired")
			return true
		}
		l.info("Unable to aquire coordinator lock. I must be a worker [%v]", lck)
	}
	return false
}
// releaseLock releases the MySQL named lock "gotel_lock" acquired by hasLock.
// It reports success and, on failure, returns an error that includes the
// underlying cause (previously the driver error was silently discarded).
func releaseLock(db *sql.DB) (bool, error) {
	releaseQuery := "SELECT RELEASE_LOCK('gotel_lock');"
	rows, err := db.Query(releaseQuery)
	if err != nil {
		l.warn("Unable to release lock\n")
		return false, errors.New("unable to release lock: " + err.Error())
	}
	defer rows.Close()
	return true, nil
}
// isCoordinator attempts to acquire the coordinator lock to indicate this node
// should do the job checking.
// In the future a zookeeper integration would give a more "true" leader
// election scheme; this is a quickstart method so people don't also need ZK in
// their environment. Split brain is detected by the coordinator not checking
// in, which itself fires an alert.
// While holding the DB lock it probes every other registered node's
// /is-coordinator endpoint; if none claims the role, this node takes it.
func isCoordinator(db *sql.DB) bool {
	coordinatorNodeCnt := 0
	lockAquired := hasLock(db)
	if lockAquired {
		var (
			ipAddress string
			nodeID    int64
		)
		rows, err := db.Query("select ip_address, node_id from nodes")
		if err != nil {
			l.err("Unable to select nodes [%v]", err)
			return false
		}
		defer rows.Close()
		for rows.Next() {
			err := rows.Scan(&ipAddress, &nodeID)
			if err != nil {
				l.err("Unable to scan node rows [%v]", err)
				return false
			}
			if ipAddress == myIP {
				// don't probe ourselves
				continue
			}
			// check to see if we have any other coordinator nodes, or am i it?
			l.info("Checking ip [%s] for coordinator status", ipAddress)
			resp, err := http.Get(fmt.Sprintf("http://%s:8080/is-coordinator", ipAddress))
			if err != nil {
				// unreachable peers are assumed dead and pruned
				l.warn("Unable to contact node [%s] assuming offline", ipAddress)
				removeNode(db, ipAddress)
				continue
			}
			if resp.StatusCode != 200 {
				l.warn("Didn't get a 200OK reply back from ip [%s]", ipAddress)
			}
			body, err := ioutil.ReadAll(resp.Body)
			// BUG FIX: close each response body at the end of its own
			// iteration; the previous `defer` kept every body (and its
			// TCP connection) open until the whole function returned.
			resp.Body.Close()
			if err != nil {
				l.warn("Unable to read node response")
			}
			if string(body) == "true" {
				l.info("IP [%s] is reporting as a coordinator", ipAddress)
				coordinatorNodeCnt++
			}
			l.info("ip [%s] http coordinator check returned [%s]", ipAddress, body)
		}
		insertSelf(db)
		releaseLock(db)
		if coordinatorNodeCnt == 0 {
			// no other node claims the role — I'm the coordinator!
			return true
		}
	}
	return false
}
// removeNode deletes a node (by IP) from the cluster membership table.
// It refuses to remove this node's own entry. Failures are logged rather than
// returned — this is best-effort cleanup of peers presumed offline.
func removeNode(db *sql.DB, ipAddress string) {
	external, err := externalIP()
	if err != nil {
		// BUG FIX: check the error before using `external`; previously
		// the (possibly empty) value was compared first.
		l.warn("Unable to delete offline node [%v]", err)
	}
	if external == ipAddress {
		// never remove ourselves
		return
	}
	stmt, err := db.Prepare("DELETE FROM nodes WHERE ip_address=?")
	if err != nil {
		// BUG FIX: bail out — continuing with a nil statement would
		// panic on stmt.Close() and stmt.Exec().
		l.warn("Unable to prepare record %s", err)
		return
	}
	defer stmt.Close()
	if _, err = stmt.Exec(ipAddress); err != nil {
		// BUG FIX: don't claim the node was removed when the delete failed
		l.warn("Unable to run delete operation for remove node [%v]", err)
		return
	}
	l.info("Node [%s] was removed from DB", ipAddress)
}
// insertSelf registers this node in the `nodes` table so peers can discover
// it. A random node id is generated on every call; a duplicate-key error just
// means this IP is already registered and is treated as a no-op.
func insertSelf(db *sql.DB) {
	ip, err := externalIP()
	if err != nil {
		l.err("Unable to get external IP [%v]", err)
	}
	rand.Seed(time.Now().UnixNano())
	seedID := rand.Intn(10000)
	stmt, err := db.Prepare("insert into nodes(ip_address, node_id) values(?, ?)")
	if err != nil {
		l.err("Unable to prepare insertself record %s", err)
		return
	}
	defer stmt.Close()
	if _, execErr := stmt.Exec(ip, seedID); execErr != nil {
		switch {
		case strings.Contains(execErr.Error(), "Duplicate entry"):
			// already registered — not an error
			l.info("[%s] has already registered as a node", ip)
		default:
			l.warn("Unable to insert insertself record %s", execErr)
		}
	}
}
// printCoordinatorStatus logs whether this node currently holds the
// coordinator role or is acting as a plain worker.
func printCoordinatorStatus() {
	status := "I am the worker node!\n"
	if coordinator {
		status = "I am the coordinator node!\n"
	}
	l.info(status)
}
// jobChecker checks reservations and alerts on any whose last check-in has
// gone stale. On the coordinator every reservation is inspected; a worker only
// monitors the coordinator's own gotel/coordinator reservation so it can alert
// if the coordinator stops running its job checker.
func jobChecker(db *sql.DB) {
	var query string
	if coordinator {
		query = "select id, app, component, owner, notify, frequency, time_units, last_checkin_timestamp from reservations"
	} else {
		// if we're a worker we just want to monitor the co-ordinator
		query = "select id, app, component, owner, notify, frequency, time_units, last_checkin_timestamp from reservations where app='gotel' and component='coordinator'"
	}
	rows, err := db.Query(query)
	if err != nil {
		// BUG FIX: check the error before deferring rows.Close();
		// rows is nil on error and Close() would panic.
		l.err("Unable to run job checker [%v]", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		res := reservation{}
		err := rows.Scan(&res.JobID, &res.App, &res.Component, &res.Owner, &res.Notify, &res.Frequency, &res.TimeUnits, &res.LastCheckin)
		if err != nil {
			// BUG FIX: a failed scan previously yielded a zero-value
			// reservation that could trigger a bogus alert; skip it.
			l.err("Unable to scan reservation row [%v]", err)
			continue
		}
		if !FailsSLA(res) {
			continue
		}
		// fan the failure out to every registered alerter, respecting
		// the per-alerter suppression window
		for _, alerter := range alertFuncs {
			if alreadySentRecently(res, alerter.Name()) {
				l.info("Already sent alert for [%s/%s/%s]", res.App, res.Component, alerter.Name())
				continue
			}
			if alerter.Alert(res) {
				updateSentRecently(res, alerter.Name())
				storeAlert(res, db, []string{alerter.Name()})
			}
		}
	}
	storeJobRun(db)
}
// mapKey builds the sentAlerts key identifying an (app, component, alerter)
// combination.
func (r *reservation) mapKey(alerterName string) string {
	key := r.App + r.Component
	return key + alerterName
}
// alreadySentRecently reports whether an alert for this reservation/alerter
// pair already went out within the configured suppression window
// (cfg.Main.HoursBetweenAlerts), so backends aren't spammed on every check.
func alreadySentRecently(res reservation, alerterName string) bool {
	lastSent, seen := sentAlerts[res.mapKey(alerterName)]
	if !seen {
		// never alerted for this combination before
		return false
	}
	suppressFor := time.Duration(cfg.Main.HoursBetweenAlerts) * time.Hour
	elapsed := time.Now().UTC().Sub(lastSent)
	return elapsed < suppressFor
}
// updateSentRecently records the moment an alert was delivered for this
// reservation/alerter pair, starting its suppression window.
func updateSentRecently(res reservation, alerterName string) {
	key := res.mapKey(alerterName)
	sentAlerts[key] = time.Now()
}
// FailsSLA monitors the reservations and determines if a job hasn't checked in
// within its allotted timeframe (res.Frequency expressed in res.TimeUnits).
func FailsSLA(res reservation) bool {
	l.info("Working on app [%s] component [%s]", res.App, res.Component)
	lastCheckin := time.Unix(res.LastCheckin, 0)
	elapsedSecs := int(time.Now().UTC().Sub(lastCheckin).Seconds())
	allowedSecs := getSecondsFromUnits(res.Frequency, res.TimeUnits)
	if elapsedSecs <= allowedSecs {
		return false
	}
	// job is overdue — flag it for alerting
	l.info("App Failed SLA [%s/%s] that is: %d seconds old\n", res.App, res.Component, elapsedSecs)
	return true
}
// storeAlert persists a record of a fired alert (which app/component and which
// alerter backends were notified) so alert history can be queried and later
// pruned by cleanUp.
func storeAlert(res reservation, db *sql.DB, alerters []string) {
	now := time.Now().UTC().Unix()
	// typo fix: was `altertNames`
	alerterNames := strings.Join(alerters, ",")
	stmt, err := db.Prepare("insert into alerts(app, component, alert_time, alerters) values(?, ?, ?, ?)")
	if err != nil {
		// consistency fix: log at error level like the rest of the file,
		// instead of l.info with a literal "[ERROR]" prefix
		l.err("Unable to prepare storealert record %s", err)
		return
	}
	defer stmt.Close()
	if _, err = stmt.Exec(res.App, res.Component, now, alerterNames); err != nil {
		l.err("Unable to insert alert record %s", err)
		return
	}
}
// storeJobRun records a check-in for gotel itself under the component name
// "coordinator" or "worker", depending on this node's current role. Workers
// watch the coordinator's check-ins to detect a dead coordinator.
func storeJobRun(db *sql.DB) {
	mode := "worker"
	if coordinator {
		mode = "coordinator"
	}
	l.info("Storing job run, mode: [%s]\n", mode)
	ts := time.Now().UTC().Unix()
	selfCheckin := checkin{App: "gotel", Component: mode}
	storeCheckin(db, selfCheckin, ts)
}
// cleanUp should run on a scheduled ticker to allow GoTel to clean up after
// itself and prevent disk space issues in the DB, as the process is meant to
// run for years. Rows older than daysToStoreLogs days are deleted from the
// housekeeping and alerts tables.
func cleanUp(db *sql.DB, daysToStoreLogs int) {
	// unix timestamp from daysToStoreLogs ago; anything older is purged
	cutoff := time.Now().UTC().AddDate(0, 0, -daysToStoreLogs).Unix()

	// clean up housekeeping
	stmt, err := db.Prepare("DELETE FROM housekeeping WHERE last_checkin_timestamp < ?")
	if err != nil {
		// typo fix in message: was "cleaup"
		l.err("Unable to prepare cleanup housekeeping statement")
		return
	}
	defer stmt.Close()
	if _, err = stmt.Exec(cutoff); err != nil {
		l.err("Unable cleanup old housekeeping logs, this could be bad [%v]", err)
		return
	}

	// clean up alerts
	// (each defer captures its own statement value, so both close correctly)
	stmt, err = db.Prepare("DELETE FROM alerts WHERE alert_time < ?")
	if err != nil {
		l.err("Unable to prepare cleanup alerts statement")
		return
	}
	defer stmt.Close()
	if _, err = stmt.Exec(cutoff); err != nil {
		l.err("Unable cleanup old alerts logs, this could be bad [%v]", err)
		return
	}
}