forked from rewardStyle/kinetic
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer_stats.go
306 lines (262 loc) · 15 KB
/
consumer_stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package kinetic
import (
"log"
"time"
metrics "github.com/rcrowley/go-metrics"
)
// ConsumerStatsCollector is the sink for the metrics emitted by the Kinetic
// consumer. It was designed with rcrowley/go-metrics in mind: Add* methods
// take a count to add, Update* methods take the latest observed value.
type ConsumerStatsCollector interface {
	// Record flow through the consumer.
	AddConsumed(count int)
	AddDelivered(count int)
	AddProcessed(count int)
	UpdateBatchSize(count int)

	// GetRecords API calls and their failure modes.
	AddGetRecordsCalled(count int)
	AddReadProvisionedThroughputExceeded(count int)
	AddGetRecordsTimeout(count int)
	AddGetRecordsReadTimeout(count int)

	// Timings.
	UpdateProcessedDuration(duration time.Duration)
	UpdateGetRecordsDuration(duration time.Duration)
	UpdateGetRecordsReadResponseDuration(duration time.Duration)
	UpdateGetRecordsUnmarshalDuration(duration time.Duration)

	// Checkpointing.
	AddCheckpointInsert(count int)
	AddCheckpointDone(count int)
	UpdateCheckpointSize(size int)
	AddCheckpointSent(count int)
	AddCheckpointSuccess(count int)
	AddCheckpointError(count int)
}
// NilConsumerStatsCollector is a stats collector that discards every metric:
// all of its methods are no-ops.
type NilConsumerStatsCollector struct{}

// AddConsumed would count the messages the consumer received from AWS
// Kinesis; the value is ignored.
func (*NilConsumerStatsCollector) AddConsumed(int) {}

// AddDelivered would count the messages the consumer delivered to the
// application; the value is ignored.
func (*NilConsumerStatsCollector) AddDelivered(int) {}

// AddProcessed would count the messages processed by the application. The
// count is based on a WaitGroup that is sent to the RetrieveFn and Listen
// functions; Retrieve does not count processed messages. The value is ignored.
func (*NilConsumerStatsCollector) AddProcessed(int) {}

// UpdateBatchSize would record the number of messages returned by GetRecords
// in the consumer; the value is ignored.
func (*NilConsumerStatsCollector) UpdateBatchSize(int) {}

// AddGetRecordsCalled would count the calls the consumer made to the
// GetRecords API; the value is ignored.
func (*NilConsumerStatsCollector) AddGetRecordsCalled(int) {}

// AddReadProvisionedThroughputExceeded would count the times the GetRecords
// API returned an ErrCodeProvisionedThroughputExceededException to the
// consumer; the value is ignored.
func (*NilConsumerStatsCollector) AddReadProvisionedThroughputExceeded(int) {}

// AddGetRecordsTimeout would count the times the GetRecords API timed out on
// the HTTP level (influenced by the WithHTTPClientTimeout configuration); the
// value is ignored.
func (*NilConsumerStatsCollector) AddGetRecordsTimeout(int) {}

// AddGetRecordsReadTimeout would count the times the GetRecords API timed out
// while reading the response body (influenced by the
// WithGetRecordsReadTimeout configuration); the value is ignored.
func (*NilConsumerStatsCollector) AddGetRecordsReadTimeout(int) {}

// UpdateProcessedDuration would record the time taken to process a record
// (see the notes on AddProcessed); the value is ignored.
func (*NilConsumerStatsCollector) UpdateProcessedDuration(time.Duration) {}

// UpdateGetRecordsDuration would record how long a GetRecords API request
// took; only successful calls are measured. The value is ignored.
func (*NilConsumerStatsCollector) UpdateGetRecordsDuration(time.Duration) {}

// UpdateGetRecordsReadResponseDuration would record how long it took to read
// the response body of a GetRecords API request; the value is ignored.
func (*NilConsumerStatsCollector) UpdateGetRecordsReadResponseDuration(time.Duration) {}

// UpdateGetRecordsUnmarshalDuration would record how long it took to
// unmarshal the response body of a GetRecords API request; the value is
// ignored.
func (*NilConsumerStatsCollector) UpdateGetRecordsUnmarshalDuration(time.Duration) {}

// AddCheckpointInsert would count the calls to the CheckpointInsert API; the
// value is ignored.
func (*NilConsumerStatsCollector) AddCheckpointInsert(int) {}

// AddCheckpointDone would count the calls to the CheckpointDone API; the
// value is ignored.
func (*NilConsumerStatsCollector) AddCheckpointDone(int) {}

// UpdateCheckpointSize would record the current size of the checkpointer; the
// value is ignored.
func (*NilConsumerStatsCollector) UpdateCheckpointSize(int) {}

// AddCheckpointSent would count the checkpoint action messages sent to KCL;
// the value is ignored.
func (*NilConsumerStatsCollector) AddCheckpointSent(int) {}

// AddCheckpointSuccess would count the checkpoint acknowledgements from KCL
// indicating that checkpointing was successful; the value is ignored.
func (*NilConsumerStatsCollector) AddCheckpointSuccess(int) {}

// AddCheckpointError would count the checkpoint acknowledgements from KCL
// indicating that checkpointing was not successful; the value is ignored.
func (*NilConsumerStatsCollector) AddCheckpointError(int) {}
// Names under which the consumer metrics are exported.
const (
	// Record flow through the consumer.
	MetricsConsumed  = "kinetic.consumer.consumed"
	MetricsDelivered = "kinetic.consumer.delivered"
	MetricsProcessed = "kinetic.consumer.processed"
	MetricsBatchSize = "kinetic.consumer.batchsize"

	// GetRecords API calls and their failure modes.
	// NOTE(review): MetricsSent is the name NewDefaultConsumerStatsCollector
	// uses for the GetRecordsCalled counter.
	MetricsSent                              = "kinetic.consumer.sent"
	MetricsReadProvisionedThroughputExceeded = "kinetic.consumer.getrecords.provisionedthroughputexceeded"
	MetricsGetRecordsTimeout                 = "kinetic.consumer.getrecords.timeout"
	MetricsGetRecordsReadTimeout             = "kinetic.consumer.getrecords.readtimeout"

	// Timings.
	MetricsProcessedDuration              = "kinetic.consumer.processed.duration"
	MetricsGetRecordsDuration             = "kinetic.consumer.getrecords.duration"
	MetricsGetRecordsReadResponseDuration = "kinetic.consumer.getrecords.readresponse.duration"
	MetricsGetRecordsUnmarshalDuration    = "kinetic.consumer.getrecords.unmarshal.duration"

	// Checkpointing.
	MetricsCheckpointInsert  = "kinetic.consumer.checkpoint.insert"
	MetricsCheckpointDone    = "kinetic.consumer.checkpoint.done"
	MetricsCheckpointSize    = "kinetic.consumer.checkpoint.size"
	MetricsCheckpointSent    = "kinetic.consumer.checkpoint.sent"
	MetricsCheckpointSuccess = "kinetic.consumer.checkpoint.success"
	MetricsCheckpointError   = "kinetic.consumer.checkpoint.error"
)
// DefaultConsumerStatsCollector implements the ConsumerStatsCollector methods
// on top of rcrowley/go-metrics counters and gauges. Each field backs the
// Add*/Update* method that shares its name.
type DefaultConsumerStatsCollector struct {
	// Record flow through the consumer.
	Consumed  metrics.Counter
	Delivered metrics.Counter
	Processed metrics.Counter
	BatchSize metrics.Gauge

	// GetRecords API calls and their failure modes.
	GetRecordsCalled                  metrics.Counter
	ReadProvisionedThroughputExceeded metrics.Counter
	GetRecordsTimeout                 metrics.Counter
	GetRecordsReadTimeout             metrics.Counter

	// Timings; the Update*Duration methods store nanoseconds in these gauges.
	ProcessedDuration              metrics.Gauge
	GetRecordsDuration             metrics.Gauge
	GetRecordsReadResponseDuration metrics.Gauge
	GetRecordsUnmarshalDuration    metrics.Gauge

	// Checkpointing.
	CheckpointInsert  metrics.Counter
	CheckpointDone    metrics.Counter
	CheckpointSize    metrics.Gauge
	CheckpointSent    metrics.Counter
	CheckpointSuccess metrics.Counter
	CheckpointError   metrics.Counter
}
// NewDefaultConsumerStatsCollector builds a DefaultConsumerStatsCollector
// whose counters and gauges come from metrics.GetOrRegisterCounter and
// metrics.GetOrRegisterGauge, called with the registry r and the exported
// Metrics* names.
//
// NOTE(review): the GetRecordsCalled counter is registered under MetricsSent
// ("kinetic.consumer.sent"); confirm that this name is intentional before
// changing it, since exported metric names are visible to consumers.
func NewDefaultConsumerStatsCollector(r metrics.Registry) *DefaultConsumerStatsCollector {
	counter := func(name string) metrics.Counter { return metrics.GetOrRegisterCounter(name, r) }
	gauge := func(name string) metrics.Gauge { return metrics.GetOrRegisterGauge(name, r) }

	return &DefaultConsumerStatsCollector{
		Consumed:  counter(MetricsConsumed),
		Delivered: counter(MetricsDelivered),
		Processed: counter(MetricsProcessed),
		BatchSize: gauge(MetricsBatchSize),

		GetRecordsCalled:                  counter(MetricsSent),
		ReadProvisionedThroughputExceeded: counter(MetricsReadProvisionedThroughputExceeded),
		GetRecordsTimeout:                 counter(MetricsGetRecordsTimeout),
		GetRecordsReadTimeout:             counter(MetricsGetRecordsReadTimeout),

		ProcessedDuration:              gauge(MetricsProcessedDuration),
		GetRecordsDuration:             gauge(MetricsGetRecordsDuration),
		GetRecordsReadResponseDuration: gauge(MetricsGetRecordsReadResponseDuration),
		GetRecordsUnmarshalDuration:    gauge(MetricsGetRecordsUnmarshalDuration),

		CheckpointInsert:  counter(MetricsCheckpointInsert),
		CheckpointDone:    counter(MetricsCheckpointDone),
		CheckpointSize:    gauge(MetricsCheckpointSize),
		CheckpointSent:    counter(MetricsCheckpointSent),
		CheckpointSuccess: counter(MetricsCheckpointSuccess),
		CheckpointError:   counter(MetricsCheckpointError),
	}
}
// AddConsumed increments the Consumed counter by count, the number of
// messages the consumer received from AWS Kinesis.
func (dsc *DefaultConsumerStatsCollector) AddConsumed(count int) {
	received := int64(count)
	dsc.Consumed.Inc(received)
}
// AddDelivered increments the Delivered counter by count, the number of
// messages the consumer delivered to the application.
func (dsc *DefaultConsumerStatsCollector) AddDelivered(count int) {
	delivered := int64(count)
	dsc.Delivered.Inc(delivered)
}
// AddProcessed increments the Processed counter by count, the number of
// messages processed by the application. The count is based on a WaitGroup
// that is sent to the RetrieveFn and Listen functions; Retrieve does not
// count processed messages.
func (dsc *DefaultConsumerStatsCollector) AddProcessed(count int) {
	processed := int64(count)
	dsc.Processed.Inc(processed)
}
// UpdateBatchSize updates the BatchSize gauge with count, the number of
// messages returned by GetRecords in the consumer.
func (dsc *DefaultConsumerStatsCollector) UpdateBatchSize(count int) {
	batch := int64(count)
	dsc.BatchSize.Update(batch)
}
// AddGetRecordsCalled increments the GetRecordsCalled counter by count, the
// number of times the consumer called the GetRecords API.
func (dsc *DefaultConsumerStatsCollector) AddGetRecordsCalled(count int) {
	calls := int64(count)
	dsc.GetRecordsCalled.Inc(calls)
}
// AddReadProvisionedThroughputExceeded increments the
// ReadProvisionedThroughputExceeded counter by count, the number of times the
// GetRecords API returned an ErrCodeProvisionedThroughputExceededException to
// the consumer.
func (dsc *DefaultConsumerStatsCollector) AddReadProvisionedThroughputExceeded(count int) {
	throttled := int64(count)
	dsc.ReadProvisionedThroughputExceeded.Inc(throttled)
}
// AddGetRecordsTimeout increments the GetRecordsTimeout counter by count, the
// number of times the GetRecords API timed out on the HTTP level. This is
// influenced by the WithHTTPClientTimeout configuration.
func (dsc *DefaultConsumerStatsCollector) AddGetRecordsTimeout(count int) {
	timeouts := int64(count)
	dsc.GetRecordsTimeout.Inc(timeouts)
}
// AddGetRecordsReadTimeout increments the GetRecordsReadTimeout counter by
// count, the number of times the GetRecords API timed out while reading the
// response body. This is influenced by the WithGetRecordsReadTimeout
// configuration.
func (dsc *DefaultConsumerStatsCollector) AddGetRecordsReadTimeout(count int) {
	timeouts := int64(count)
	dsc.GetRecordsReadTimeout.Inc(timeouts)
}
// UpdateProcessedDuration updates the ProcessedDuration gauge with duration,
// expressed in nanoseconds: the time taken to process a record. See the notes
// on AddProcessed.
func (dsc *DefaultConsumerStatsCollector) UpdateProcessedDuration(duration time.Duration) {
	ns := duration.Nanoseconds()
	dsc.ProcessedDuration.Update(ns)
}
// UpdateGetRecordsDuration updates the GetRecordsDuration gauge with
// duration, expressed in nanoseconds: the time a GetRecords API request took.
// Only the times of successful calls are measured.
func (dsc *DefaultConsumerStatsCollector) UpdateGetRecordsDuration(duration time.Duration) {
	ns := duration.Nanoseconds()
	dsc.GetRecordsDuration.Update(ns)
}
// UpdateGetRecordsReadResponseDuration updates the
// GetRecordsReadResponseDuration gauge with duration, expressed in
// nanoseconds: the time it took to read the response body of a GetRecords API
// request.
func (dsc *DefaultConsumerStatsCollector) UpdateGetRecordsReadResponseDuration(duration time.Duration) {
	ns := duration.Nanoseconds()
	dsc.GetRecordsReadResponseDuration.Update(ns)
}
// UpdateGetRecordsUnmarshalDuration updates the GetRecordsUnmarshalDuration
// gauge with duration, expressed in nanoseconds: the time it took to
// unmarshal the response body of a GetRecords API request.
func (dsc *DefaultConsumerStatsCollector) UpdateGetRecordsUnmarshalDuration(duration time.Duration) {
	ns := duration.Nanoseconds()
	dsc.GetRecordsUnmarshalDuration.Update(ns)
}
// AddCheckpointInsert increments the CheckpointInsert counter by count, the
// number of times the CheckpointInsert API was called.
func (dsc *DefaultConsumerStatsCollector) AddCheckpointInsert(count int) {
	inserts := int64(count)
	dsc.CheckpointInsert.Inc(inserts)
}
// AddCheckpointDone increments the CheckpointDone counter by count, the
// number of times the CheckpointDone API was called.
func (dsc *DefaultConsumerStatsCollector) AddCheckpointDone(count int) {
	done := int64(count)
	dsc.CheckpointDone.Inc(done)
}
// UpdateCheckpointSize updates the CheckpointSize gauge with size, the
// current size of the checkpointer.
func (dsc *DefaultConsumerStatsCollector) UpdateCheckpointSize(size int) {
	current := int64(size)
	dsc.CheckpointSize.Update(current)
}
// AddCheckpointSent increments the CheckpointSent counter by count, the
// number of checkpoint action messages sent to KCL.
func (dsc *DefaultConsumerStatsCollector) AddCheckpointSent(count int) {
	sent := int64(count)
	dsc.CheckpointSent.Inc(sent)
}
// AddCheckpointSuccess increments the CheckpointSuccess counter by count, the
// number of checkpoint acknowledgements from KCL indicating that
// checkpointing was successful.
func (dsc *DefaultConsumerStatsCollector) AddCheckpointSuccess(count int) {
	succeeded := int64(count)
	dsc.CheckpointSuccess.Inc(succeeded)
}
// AddCheckpointError increments the CheckpointError counter by count, the
// number of checkpoint acknowledgements from KCL indicating that
// checkpointing was not successful.
func (dsc *DefaultConsumerStatsCollector) AddCheckpointError(count int) {
	failed := int64(count)
	dsc.CheckpointError.Inc(failed)
}
// PrintStats logs one line per metric through log.Printf: Count() for every
// counter field and Value() for every gauge field. The duration gauges are
// labelled as nanoseconds.
func (dsc *DefaultConsumerStatsCollector) PrintStats() {
	logf := log.Printf

	// Record flow through the consumer.
	logf("Consumer stats: Consumed: [%d]\n", dsc.Consumed.Count())
	logf("Consumer stats: Delivered: [%d]\n", dsc.Delivered.Count())
	logf("Consumer stats: Processed: [%d]\n", dsc.Processed.Count())
	logf("Consumer stats: Batch Size: [%d]\n", dsc.BatchSize.Value())

	// GetRecords API calls and their failure modes.
	logf("Consumer stats: GetRecords Called: [%d]\n", dsc.GetRecordsCalled.Count())
	logf("Consumer stats: GetRecords Timeout: [%d]\n", dsc.GetRecordsTimeout.Count())
	logf("Consumer stats: GetRecords Read Timeout: [%d]\n", dsc.GetRecordsReadTimeout.Count())
	logf("Consumer stats: GetRecords Provisioned Throughput Exceeded: [%d]\n", dsc.ReadProvisionedThroughputExceeded.Count())

	// Timings.
	logf("Consumer stats: Processed Duration (ns): [%d]\n", dsc.ProcessedDuration.Value())
	logf("Consumer stats: GetRecords Duration (ns): [%d]\n", dsc.GetRecordsDuration.Value())
	logf("Consumer stats: GetRecords Read Response Duration (ns): [%d]\n", dsc.GetRecordsReadResponseDuration.Value())
	logf("Consumer stats: GetRecords Unmarshal Duration (ns): [%d]\n", dsc.GetRecordsUnmarshalDuration.Value())

	// Checkpointing.
	logf("Consumer stats: Checkpoint Insert: [%d]\n", dsc.CheckpointInsert.Count())
	logf("Consumer stats: Checkpoint Done: [%d]\n", dsc.CheckpointDone.Count())
	logf("Consumer stats: Checkpoint Size: [%d]\n", dsc.CheckpointSize.Value())
	logf("Consumer stats: Checkpoint Sent: [%d]\n", dsc.CheckpointSent.Count())
	logf("Consumer stats: Checkpoint Success: [%d]\n", dsc.CheckpointSuccess.Count())
	logf("Consumer stats: Checkpoint Error: [%d]\n", dsc.CheckpointError.Count())
}