-
Notifications
You must be signed in to change notification settings - Fork 4
/
redis_sentinel_k8s.go
433 lines (358 loc) · 11.9 KB
/
redis_sentinel_k8s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"net"
"os"
"sort"
"strconv"
"strings"
"sync"
"encoding/json"
"time"
log "github.com/golang/glog"
"github.com/mediocregopher/radix.v2/redis"
)
// Constants shared across the program.
const (
// COLLECTSTATS_INVALID_INPUT is the error text CollectStats returns when the
// endpoint is not of the form "host:port".
COLLECTSTATS_INVALID_INPUT = "Invalid Endpoint"
// COLLECTSTATS_SERVER_NOT_REACHABLE is the error text CollectStats returns
// when dialing the redis-server fails.
COLLECTSTATS_SERVER_NOT_REACHABLE = "Redis Server Not Reachable"
// REDIS_ROLE_MASTER is matched against the parsed "role" field to detect a master.
REDIS_ROLE_MASTER = "master"
// REDIS_ROLE_SLAVE names the slave role; it is not referenced in the code visible here.
REDIS_ROLE_SLAVE = "slave"
// LOOKUP_RETIRES is compared against the SRV lookup retry counter in main
// (the name is presumably a typo for "retries").
LOOKUP_RETIRES = 10
)
// Redis holds the replication state parsed from one redis-server's
// "INFO REPLICATION" reply, together with the connection used to reach it.
type Redis struct {
EndPoint string // "host:port" address of the redis-server
Role string // Value of the "role" field in the reply (master or slave)
LastUpdated int // Value of master_last_io_seconds_ago from the reply
MasterDownSince int // Value of master_link_down_since_seconds from the reply
SyncBytes int64 // slave_repl_offset, or master_repl_offset when that is positive
MasterHost string // master_host from the reply; main and PromoteASlave overwrite it with the chosen master's own host
MasterPort string // master_port from the reply; overwritten the same way as MasterHost
Priority int // slave_priority from the reply; stays 0 when the reply does not carry it
MasterLinkStatus bool // true when master_link_status is "on" or "up", false otherwise
Client *redis.Client // Connection opened by CollectStats
}
// RedisSlaves is a slice of redis servers. It implements sort.Interface so
// that promotion candidates can be ordered with sort.Sort.
type RedisSlaves []*Redis
// Len reports how many servers the collection holds (part of sort.Interface).
func (rs RedisSlaves) Len() int {
	count := len(rs)
	return count
}
// Swap exchanges the servers stored at positions i and j (part of sort.Interface).
func (rs RedisSlaves) Swap(i int, j int) {
	rs[i], rs[j] = rs[j], rs[i]
}
// Less reports whether the server at index i is a better promotion candidate
// than the server at index j (part of sort.Interface).
//
// Candidates are compared lexicographically, each criterion only breaking ties
// left by the previous one:
//  1. lower Priority first, compared only when both priorities are non-zero
//     (a zero Priority is skipped, exactly as the code did before);
//  2. higher SyncBytes first;
//  3. lower LastUpdated first.
//
// Returning as soon as a criterion differs keeps the result consistent:
// Less(i, j) and Less(j, i) can no longer both be true, which the previous
// fall-through implementation allowed.
func (rs RedisSlaves) Less(i int, j int) bool {
	a, b := rs[i], rs[j]

	// Prefer the lowest priority when both servers report one.
	if a.Priority != 0 && b.Priority != 0 && a.Priority != b.Priority {
		return a.Priority < b.Priority
	}

	// Then prefer the server with the most data synced.
	if a.SyncBytes != b.SyncBytes {
		return a.SyncBytes > b.SyncBytes
	}

	// Finally prefer the smaller LastUpdated value.
	return a.LastUpdated < b.LastUpdated
}
// ParseResponse populates r from the text of an "INFO REPLICATION" reply.
//
// Res may separate lines either with real CRLF bytes or with the escaped,
// four-character form `\r\n` (CollectStats passes the reply's String() form,
// which presumably quotes the payload and therefore contains the escaped
// form). Each "key:value" line is split at the first colon only, so values
// that themselves contain colons (for example an IPv6 master_host) are kept
// intact. Lines without a colon and unknown keys are ignored, as are numeric
// values that fail to parse.
//
// It returns false, leaving r untouched, when Res contains no line separator
// at all; otherwise it returns true.
func (r *Redis) ParseResponse(Res string) bool {
	// Normalise real CRLF to the escaped form so that one split handles both.
	normalized := strings.Replace(Res, "\r\n", "\\r\\n", -1)
	lines := strings.Split(normalized, "\\r\\n")
	if len(lines) == 1 {
		log.Errorf("ParseResponse(): Invalid Redis-server response. Nothing to parse")
		return false
	}

	for _, line := range lines {
		kv := strings.SplitN(line, ":", 2)
		if len(kv) != 2 {
			continue
		}
		key, value := kv[0], kv[1]

		switch key {
		case "role":
			r.Role = value
		case "master_host":
			r.MasterHost = value
		case "master_port":
			r.MasterPort = value
		case "slave_repl_offset":
			// Offsets are 64-bit; ParseInt avoids truncation on 32-bit builds.
			if n, err := strconv.ParseInt(value, 10, 64); err == nil {
				r.SyncBytes = n
			}
		case "master_repl_offset":
			// Only a positive master offset may replace an offset already recorded.
			if n, err := strconv.ParseInt(value, 10, 64); err == nil && n > 0 {
				r.SyncBytes = n
			}
		case "master_link_down_since_seconds":
			if n, err := strconv.Atoi(value); err == nil {
				r.MasterDownSince = n
			}
		case "master_link_status":
			r.MasterLinkStatus = value == "on" || value == "up"
		case "master_last_io_seconds_ago":
			if n, err := strconv.Atoi(value); err == nil {
				r.LastUpdated = n
			}
		case "slave_priority":
			if n, err := strconv.Atoi(value); err == nil {
				r.Priority = n
			}
		}
	}

	fmt.Printf("R=%+v\n", r)
	return true
}
// CollectStats connects to the redis-server at EndPoint, runs
// "INFO REPLICATION" and returns the parsed state with the open connection.
//
// An EndPoint containing "svc" is treated as a Kubernetes DNS name: it is
// cut down to its first three dot-separated labels (when it has more than
// three) and ":6379" is appended. After that the endpoint must consist of
// exactly two colon-separated parts.
//
// It returns an error, and no *Redis, when the endpoint is malformed, the
// server cannot be dialed, the INFO command fails, or the reply cannot be
// parsed. In the last two cases the connection is closed before returning,
// so a server that did not answer properly never becomes a promotion
// candidate with zero-valued statistics.
func CollectStats(EndPoint string) (*Redis, error) {
	// If this is coming from kubernetes, add the port number ourselves.
	if strings.Contains(EndPoint, "svc") {
		labels := strings.Split(EndPoint, ".")
		if len(labels) > 3 {
			// Endpoints are of the form hostname.podname.namespace.domain
			EndPoint = strings.Join(labels[:3], ".")
		}
		EndPoint += ":6379"
	}

	// Check that the supplied endpoint is valid.
	if len(strings.Split(EndPoint, ":")) != 2 {
		return nil, fmt.Errorf(COLLECTSTATS_INVALID_INPUT)
	}

	// Try to connect to the redis-server.
	client, err := redis.Dial("tcp", EndPoint)
	if err != nil {
		log.Infof("CollectStats(%s) %s Error:%v", EndPoint, COLLECTSTATS_SERVER_NOT_REACHABLE, err)
		return nil, fmt.Errorf(COLLECTSTATS_SERVER_NOT_REACHABLE)
	}

	reply := client.Cmd("INFO", "REPLICATION")
	if reply.Err != nil {
		// Best-effort close: the command error is the one worth reporting.
		client.Close()
		return nil, fmt.Errorf("CollectStats(%s): INFO REPLICATION failed: %v", EndPoint, reply.Err)
	}
	text := reply.String()
	log.Infof("CollectStats(%s)=%v", EndPoint, text)

	var server Redis
	if !server.ParseResponse(text) {
		client.Close()
		return nil, fmt.Errorf("CollectStats(%s): unable to parse INFO REPLICATION reply", EndPoint)
	}
	server.EndPoint = EndPoint
	server.Client = client
	return &server, nil
}
// CollectStatsAll queries every endpoint concurrently through CollectStats
// and returns the servers that answered. Endpoints whose collection fails
// are logged and left out. The order of the result follows completion order,
// not the order of EndPoints.
func CollectStatsAll(EndPoints []string) []*Redis {
	var (
		collected []*Redis
		guard     sync.Mutex
		pending   sync.WaitGroup
	)

	// probe gathers one endpoint and records it under the mutex on success.
	probe := func(endpoint string) {
		defer pending.Done()
		server, err := CollectStats(endpoint)
		if err != nil {
			log.Warningf("Error collecting stats for %v Error=%v", endpoint, err)
			return
		}
		guard.Lock()
		collected = append(collected, server)
		guard.Unlock()
	}

	for _, endpoint := range EndPoints {
		log.Infof("Processing %v", endpoint)
		pending.Add(1)
		go probe(endpoint)
	}

	pending.Wait()
	return collected
}
// FindNxtMaster decides whether the given servers already form a working
// master/slave setup or whether a new master has to be elected.
//
// It returns (existing master, nil) when the first server reporting the
// master role has a positive SyncBytes and every other server points at it
// with its master link up. Otherwise it returns (nil, candidate), where
// candidate is the first element after sorting all servers (a misconfigured
// master included) with RedisSlaves' ordering. For an empty input it returns
// (nil, nil). The Servers slice itself is never reordered.
func FindNxtMaster(Servers []*Redis) (*Redis, *Redis) {
	// Locate the first server that reports the master role.
	// TODO: multiple mis-configured masters are not handled yet.
	var master *Redis
	for idx := range Servers {
		if strings.Contains(Servers[idx].Role, REDIS_ROLE_MASTER) {
			master = Servers[idx]
			break
		}
	}

	// Gather everything that is not the master, counting how many of those
	// servers replicate from it over a healthy link.
	var candidates []*Redis
	following := 0
	for _, srv := range Servers {
		if master == nil {
			candidates = append(candidates, srv)
			continue
		}
		if srv.EndPoint == master.EndPoint {
			continue
		}
		candidates = append(candidates, srv)
		log.Infof("RSMaster_EP=%s available MasterEP=%v", srv.MasterHost+":"+srv.MasterPort, master.EndPoint)
		if srv.MasterLinkStatus && srv.MasterHost+":"+srv.MasterPort == master.EndPoint {
			following++
		}
	}

	if master != nil {
		if master.SyncBytes > 0 && following == len(candidates) {
			// The master is active and every other server follows it.
			log.Warningf("The redis master is already configured, dont do anything SyncBytes=%v availableMasterHits=%v len(Slaves)=%v", master.SyncBytes, following, len(candidates))
			return master, nil
		}
		log.Warningf("A Redis master is found, but misconfigured, considering it as a slave")
		candidates = append(candidates, master)
	}

	if len(candidates) == 0 {
		return master, nil
	}

	// Order the candidates and pick the best one.
	sort.Sort(RedisSlaves(candidates))
	return nil, candidates[0]
}
// PromoteASlave turns NewMaster into a master and re-points every other
// server in Servers at it.
//
// NewMaster first receives "SLAVEOF NO ONE", and its MasterHost/MasterPort
// are set from its own EndPoint. Every server with a different EndPoint then
// receives "SLAVEOF <host> <port>" and has repl-ping-slave-period set to 1
// and repl-timeout set to 5. The function logs and returns false at the
// first reply that does not contain "OK", leaving later servers untouched;
// it returns true when every command was acknowledged.
func PromoteASlave(NewMaster *Redis, Servers []*Redis) bool {
	acknowledged := func(reply string) bool { return strings.Contains(reply, "OK") }

	// Make the chosen server a master first.
	reply := NewMaster.Client.Cmd("SLAVEOF", "NO", "ONE").String()
	if !acknowledged(reply) {
		log.Errorf("Unable to make the slave as master response=%v", reply)
		return false
	}

	parts := strings.Split(NewMaster.EndPoint, ":")
	NewMaster.MasterHost = parts[0]
	NewMaster.MasterPort = parts[1]

	for _, srv := range Servers {
		if srv.EndPoint == NewMaster.EndPoint {
			continue
		}

		reply = srv.Client.Cmd("SLAVEOF", NewMaster.MasterHost, NewMaster.MasterPort).String()
		if !acknowledged(reply) {
			log.Errorf("Unable to make the slave point to new master response=%v", reply)
			return false
		}

		// Make the slave's replication timeouts as small as possible.
		reply = srv.Client.Cmd("config", "set", "repl-ping-slave-period", "1").String()
		if !acknowledged(reply) {
			log.Errorf("Unable to make slave ping frequenc to 1 second=%v", reply)
			return false
		}

		reply = srv.Client.Cmd("config", "set", "repl-timeout", "5").String()
		if !acknowledged(reply) {
			log.Errorf("Unable to make replication timout to 5 seconds = %v", reply)
			return false
		}
	}

	return true
}
// LookupSrv resolves the SRV records of svcName and returns the target host
// name of every record, without the trailing root-domain dot.
//
// The lookup is done with empty service and proto arguments, so svcName is
// queried as given. On a lookup failure it returns a nil slice and the error.
func LookupSrv(svcName string) ([]string, error) {
	log.V(2).Infof("lookup(%s)", svcName)

	_, srvRecords, err := net.LookupSRV("", "", svcName)
	if err != nil {
		return nil, err
	}

	var endpoints []string
	for _, record := range srvRecords {
		// SRV targets end in "." for the root domain. TrimSuffix removes it
		// only when present and cannot panic on an empty target.
		endpoints = append(endpoints, strings.TrimSuffix(record.Target, "."))
	}
	return endpoints, nil
}
// PrintServers is a debug helper: it logs, at verbosity level 2, a banner
// containing message followed by the JSON dump of Servers.
func PrintServers(message string, Servers []*Redis) {
	// Marshal errors are deliberately ignored; this is diagnostic output only.
	dump, _ := json.MarshalIndent(Servers, "", " ")

	header := "PrintServers()\n" + fmt.Sprintf("******%s******\n", message)
	footer := "*****************\n"

	log.V(2).Info(header + string(dump) + footer)
}
// main resolves the redis service's endpoints, collects replication
// statistics from every reachable server, keeps the existing master or
// promotes a new one, and writes "<host> <port>" of the resulting master to
// /config/master.txt.
//
// The SRV lookup is retried every 5 seconds, up to LOOKUP_RETIRES retries
// after the first attempt. The process exits with status 1 when the lookup
// keeps failing, the promotion fails, or the config file cannot be written.
// It exits with status 0 when no server is reachable.
func main() {
	svc := flag.String("service", "cache", "Provide the redis statefulset's service name")
	flag.Set("logtostderr", "true")
	flag.Parse()

	// Look up the provided service; if it is not available, retry a few times.
	lookupRetry := 0
	var ServersEndPoint []string
	var err error
	for {
		ServersEndPoint, err = LookupSrv(*svc)
		if err == nil {
			break
		}
		// Give up only once the retry budget is used up.
		if lookupRetry >= LOOKUP_RETIRES {
			log.Errorf("Unable to lookup for the service err:%v", err)
			os.Exit(1)
		}
		lookupRetry++
		log.Infof("Service not ready retrying after 5 seconds err=%v", err)
		time.Sleep(time.Second * 5)
	}
	log.Infof("Available endpoints are %v", ServersEndPoint)

	// Collect stats on all the redis-servers supplied.
	Servers := CollectStatsAll(ServersEndPoint)
	if len(Servers) == 0 {
		log.Infof("The cluster is empty or all redis-servers are not reachable")
		os.Exit(0)
	}
	PrintServers("Supplied Servers", Servers)

	// Does the cluster really need a new master?
	OldMaster, NewMaster := FindNxtMaster(Servers)
	log.Infof("OldMaster=%+v NewMaster=%+v", OldMaster, NewMaster)
	if NewMaster == nil && OldMaster != nil {
		log.Errorf("Redis Instance doesn't need a Slave Promotion")
		NewMaster = OldMaster
		// When a master is found, EndPoint is set but host and port are not.
		NewMaster.MasterHost, NewMaster.MasterPort, err = net.SplitHostPort(NewMaster.EndPoint)
		if err != nil {
			log.Infof("Error getting host and port from host (%s): err :%v", NewMaster.EndPoint, err)
			os.Exit(1)
		}
	} else if OldMaster == nil && NewMaster != nil {
		// Now we have a potential master.
		if !PromoteASlave(NewMaster, Servers) {
			PrintServers("In-consistantly configured", Servers)
			log.Errorf("Error occured in Slave Promotion")
			os.Exit(1)
		}
		log.Infof("New Master is %v, All the slaves are re-configured to replicate from this", NewMaster.EndPoint)
		PrintServers("Processed Servers", Servers)
	} else {
		// Both are nil or both are valid.
		log.Errorf("Inconsitant Redis Cluster")
		return
	}

	// At this point NewMaster is the master; write its address to /config/master.txt.
	f, err := os.Create("/config/master.txt")
	if err != nil {
		log.Errorf("Unable to open the config file err:%v", err)
		os.Exit(1)
	}
	log.Infof("OldMaster=%+v NewMaster=%+v", OldMaster, NewMaster)
	_, err = f.WriteString(fmt.Sprintf("%v %v", NewMaster.MasterHost, NewMaster.MasterPort))
	if err != nil {
		// os.Exit skips deferred calls, so close explicitly before exiting.
		f.Close()
		log.Errorf("Error writing to the config file err:%v", err)
		os.Exit(1)
	}
	// A failed close can mean the data never reached the file.
	if err = f.Close(); err != nil {
		log.Errorf("Error closing the config file err:%v", err)
		os.Exit(1)
	}
	log.Infof("Redis-Sentinal-micro Finished")
}