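// kafka-offset-lag-for-prometheus polls a Kafka cluster for consumer group
// offsets and exports per-topic/partition lag as Prometheus metrics.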
package main

import (
    "crypto/tls"
    "crypto/x509"
    "flag"
    "log"
    "math"
    "os"
    "regexp"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/IBM/sarama"
    "github.com/kouhin/envflag"
    "github.com/prometheus/client_golang/prometheus"
)

var (
    activeOnly          = flag.Bool("active-only", false, "Show only consumers with an active consumer protocol.")
    kafkaBrokers        = flag.String("kafka-brokers", "localhost:9092", "Comma-separated list of Kafka brokers.")
    prometheusAddr      = flag.String("prometheus-addr", ":7979", "Prometheus listen interface and port.")
    refreshInt          = flag.Int("refresh-interval", 15, "Time between offset refreshes in seconds.")
    saslUser            = flag.String("sasl-user", "", "SASL username.")
    saslPass            = flag.String("sasl-pass", "", "SASL password.")
    debug               = flag.Bool("debug", false, "Enable debug output.")
    algorithm           = flag.String("algorithm", "", "The SASL SCRAM mechanism to use: sha256 or sha512.")
    enableCurrentOffset = flag.Bool("enable-current-offset", false, "Enable metrics for the current offset of a consumer group.")
    enableNewAPI        = flag.Bool("enable-new-api", false, "Enable the new API, which uses optimized Kafka API calls.")
    groupPattern        = flag.String("group-pattern", "", "Regular expression to filter consumer groups.")
    certFile            = flag.String("certificate", "", "Optional certificate file for client authentication.")
    keyFile             = flag.String("key", "", "Optional key file for client authentication.")
    caFile              = flag.String("ca", "", "Optional certificate authority file for TLS client authentication.")
    skipVerifySSL       = flag.Bool("skip-verify", false, "Skip verification of the server's TLS certificate chain.")
    useTLS              = flag.Bool("tls", false, "Use TLS to communicate with the cluster.")
)
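
// TopicSet maps topic -> partition -> newest (head) offset, as gathered on
// each refresh cycle.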
type TopicSet map[string]map[int32]int64
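
// init parses flags, which envflag also maps to environment variables, and
// enables Sarama's internal logging when -debug is set.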
func init() {
    envflag.SetMinLength(2)
    if err := envflag.Parse(); err != nil {
        log.Fatal(err)
    }
    if *debug {
        sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
    }
}
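
// createTLSConfiguration builds the TLS config for broker connections. A
// client certificate is only presented when -certificate, -key, and -ca are
// all provided; otherwise only -skip-verify is honored.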
func createTLSConfiguration() (t *tls.Config) {
    t = &tls.Config{
        InsecureSkipVerify: *skipVerifySSL,
    }
    if *certFile != "" && *keyFile != "" && *caFile != "" {
        cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
        if err != nil {
            log.Fatal(err)
        }
        caCert, err := os.ReadFile(*caFile)
        if err != nil {
            log.Fatal(err)
        }
        caCertPool := x509.NewCertPool()
        caCertPool.AppendCertsFromPEM(caCert)
        t = &tls.Config{
            Certificates:       []tls.Certificate{cert},
            RootCAs:            caCertPool,
            InsecureSkipVerify: *skipVerifySSL,
        }
    }
    return t
}
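
// main starts the offset-polling loop in a background goroutine and then
// serves the Prometheus metrics endpoint.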
func main() {
    go func() {
        var cycle uint8
        var groupRegexp *regexp.Regexp
        if *groupPattern != "" {
            var err error
            groupRegexp, err = regexp.Compile(*groupPattern)
            if err != nil {
                log.Fatalf("Failed to compile group-pattern regexp: %v", err)
            }
        }
        config := sarama.NewConfig()
        config.ClientID = "kafka-offset-lag-for-prometheus"
        config.Version = sarama.V0_9_0_0
        if *enableNewAPI {
            config.Version = sarama.V1_0_0_0
        }
        if *saslUser != "" {
            config.Net.SASL.Enable = true
            config.Net.SASL.User = *saslUser
            config.Net.SASL.Password = *saslPass
        }
        if *algorithm == "sha512" {
            config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
            config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
        } else if *algorithm == "sha256" {
            config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
            config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
        }
        if *useTLS {
            config.Net.TLS.Enable = true
            config.Net.TLS.Config = createTLSConfiguration()
        }
        client, err := sarama.NewClient(strings.Split(*kafkaBrokers, ","), config)
        if err != nil {
            log.Fatalf("Unable to connect to the given brokers: %v", err)
        }
        defer client.Close()
        for {
            topicSet := make(TopicSet)
            time.Sleep(time.Duration(*refreshInt) * time.Second)
            timer := prometheus.NewTimer(LookupHist)
            if err := client.RefreshMetadata(); err != nil {
                log.Printf("Error refreshing metadata: %s", err.Error())
            }
            // First grab our topics, all partition IDs, and their offset head
            topics, err := client.Topics()
            if err != nil {
                log.Printf("Error fetching topics: %s", err.Error())
                continue
            }
            for _, topic := range topics {
                // Don't include internal topics
                if strings.HasPrefix(topic, "__") {
                    continue
                }
                partitions, err := client.Partitions(topic)
                if err != nil {
                    log.Printf("Error fetching partitions: %s", err.Error())
                    continue
                }
                if *debug {
                    log.Printf("Found topic '%s' with %d partitions", topic, len(partitions))
                }
                topicSet[topic] = make(map[int32]int64)
                for _, partition := range partitions {
                    toff, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
                    if err != nil {
                        log.Printf("Problem fetching offset for topic '%s', partition '%d': %s", topic, partition, err.Error())
                        continue
                    }
                    if *debug {
                        log.Printf("topic:partition:offset %s:%d:%d", topic, partition, toff)
                    }
                    topicSet[topic][partition] = toff
                }
            }
            // Prometheus SDK never TTLs out data points so tmp consumers last
            // forever. Ugly hack to clean up data points from time to time.
            if cycle >= 99 {
                OffsetLag.Reset()
                CurrentOffset.Reset()
                cycle = 0
            }
            cycle++
            var wg sync.WaitGroup
            // Now look up our group data using the metadata
            for _, broker := range client.Brokers() {
                // Sarama plays Russian roulette with the brokers
                broker.Open(client.Config())
                _, err := broker.Connected()
                if err != nil {
                    log.Printf("Could not speak to broker %s. Your advertised.listeners may be incorrect.", broker.Addr())
                    continue
                }
                wg.Add(1)
                go func(broker *sarama.Broker) {
                    defer wg.Done()
                    if *enableNewAPI {
                        refreshBrokerV2(broker, client, groupRegexp)
                    } else {
                        refreshBroker(broker, topicSet, groupRegexp)
                    }
                }(broker)
            }
            wg.Wait()
            timer.ObserveDuration()
        }
    }()
    prometheusListen(*prometheusAddr)
}
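
// refreshBroker lists the consumer groups hosted on broker and, for every
// group/topic/partition, records lag against the head offsets captured in
// topicSet, issuing one OffsetFetchRequest per group and topic.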
func refreshBroker(broker *sarama.Broker, topicSet TopicSet, groupRegexp *regexp.Regexp) {
    groupsRequest := new(sarama.ListGroupsRequest)
    groupsResponse, err := broker.ListGroups(groupsRequest)
    if err != nil {
        log.Printf("Could not list groups: %s\n", err.Error())
        return
    }
    for group, ptype := range groupsResponse.Groups {
        // Do we want to filter by active consumers?
        if *activeOnly && ptype != "consumer" {
            if *debug {
                log.Printf("Skipping group '%s' because it is not a consumer", group)
            }
            continue
        }
        if groupRegexp != nil && !groupRegexp.MatchString(group) {
            if *debug {
                log.Printf("Skipping group '%s' because it does not match regexp '%s'", group, groupRegexp)
            }
            continue
        }
        // This is not very efficient but the Kafka API sucks
        for topic, data := range topicSet {
            offsetsRequest := new(sarama.OffsetFetchRequest)
            offsetsRequest.Version = 1
            offsetsRequest.ConsumerGroup = group
            for partition := range data {
                offsetsRequest.AddPartition(topic, partition)
            }
            offsetsResponse, err := broker.FetchOffset(offsetsRequest)
            if err != nil {
                log.Printf("Could not get offset: %s\n", err.Error())
                continue
            }
            for _, blocks := range offsetsResponse.Blocks {
                for partition, block := range blocks {
                    if *debug {
                        log.Printf("Discovered group: %s, topic: %s, partition: %d, offset: %d\n", group, topic, partition, block.Offset)
                    }
                    // Offset will be -1 if the group isn't active on the topic
                    if block.Offset >= 0 {
                        // Because our offset operations aren't atomic we could end up with a negative lag
                        lag := math.Max(float64(data[partition]-block.Offset), 0)
                        OffsetLag.With(prometheus.Labels{
                            "topic": topic, "group": group,
                            "partition": strconv.FormatInt(int64(partition), 10),
                        }).Set(lag)
                        if *enableCurrentOffset {
                            CurrentOffset.With(prometheus.Labels{
                                "topic": topic, "group": group,
                                "partition": strconv.FormatInt(int64(partition), 10),
                            }).Set(math.Max(float64(block.Offset), 0))
                        }
                    }
                }
            }
        }
    }
}
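
// refreshBrokerV2 is the -enable-new-api variant: it relies on a v2
// OffsetFetchRequest with no explicit partitions returning all of a group's
// committed offsets in one call, then looks up head offsets per partition
// via the client.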
func refreshBrokerV2(broker *sarama.Broker, client sarama.Client, groupRegexp *regexp.Regexp) {
    groupsRequest := new(sarama.ListGroupsRequest)
    groupsResponse, err := broker.ListGroups(groupsRequest)
    if err != nil {
        log.Printf("Could not list groups: %s\n", err.Error())
        return
    }
    for group, ptype := range groupsResponse.Groups {
        // Do we want to filter by active consumers?
        if *activeOnly && ptype != "consumer" {
            continue
        }
        if groupRegexp != nil && !groupRegexp.MatchString(group) {
            continue
        }
        offsetsRequest := new(sarama.OffsetFetchRequest)
        offsetsRequest.Version = 2
        offsetsRequest.ConsumerGroup = group
        offsetsResponse, err := broker.FetchOffset(offsetsRequest)
        if err != nil {
            log.Printf("Could not get offset: %s\n", err.Error())
            continue
        }
        for topic, partitions := range offsetsResponse.Blocks {
            for partition, block := range partitions {
                if *debug {
                    log.Printf("Discovered group: %s, topic: %s, partition: %d, offset: %d\n", group, topic, partition, block.Offset)
                }
                partitionLatestOffset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
                if err != nil {
                    log.Printf("Failed to obtain the latest offset for topic: %s, partition: %d: %s", topic, partition, err.Error())
                    continue
                }
                lag := math.Max(float64(partitionLatestOffset-block.Offset), 0)
                OffsetLag.With(prometheus.Labels{
                    "topic": topic, "group": group,
                    "partition": strconv.FormatInt(int64(partition), 10),
                }).Set(lag)
                if *enableCurrentOffset {
                    CurrentOffset.With(prometheus.Labels{
                        "topic": topic, "group": group,
                        "partition": strconv.FormatInt(int64(partition), 10),
                    }).Set(math.Max(float64(block.Offset), 0))
                }
            }
        }
    }
}