forked from rewardStyle/kinetic
-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.go
322 lines (285 loc) · 9.78 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
package kinetic
import (
"context"
"net"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
)
// consumerOptions groups every tunable setting of a Consumer. It is embedded
// (by pointer) in Consumer, so the functional options can assign these fields
// directly on the Consumer they receive.
type consumerOptions struct {
	// reader abstracts the GetRecord/GetRecords calls against the stream.
	reader StreamReader
	// queueDepth is the buffer size of the consumer's message channel.
	queueDepth int
	// concurrency caps how many goroutines may process messages taken off the
	// message channel at the same time.
	concurrency int
	// logLevel is copied into the consumer's LogHelper when it is constructed.
	logLevel aws.LogLevelType
	// Stats is the stats collection mechanism the consumer reports to.
	Stats ConsumerStatsCollector
}
// defaultConsumerOptions returns a freshly allocated consumerOptions holding the
// package defaults: a queue depth of 10000, a concurrency of 10, and a
// NilConsumerStatsCollector for Stats. The reader and log level are left at
// their zero values.
func defaultConsumerOptions() *consumerOptions {
	opts := new(consumerOptions)
	opts.queueDepth = 10000
	opts.concurrency = 10
	opts.Stats = &NilConsumerStatsCollector{}
	return opts
}
// ConsumerOptionsFn is the signature of a functional option used to configure a
// Consumer. An option mutates the *Consumer it is given and returns a non-nil
// error when it rejects its argument (for example, ConsumerQueueDepth with a
// non-positive depth returns ErrInvalidQueueDepth).
type ConsumerOptionsFn func(*Consumer) error
// ConsumerReader returns an option that sets the consumer's stream reader to r.
// When a reader is supplied this way, NewConsumer does not build its own Kinesis
// reader. The option never fails.
func ConsumerReader(r StreamReader) ConsumerOptionsFn {
	return func(consumer *Consumer) error {
		consumer.reader = r
		return nil
	}
}
// ConsumerQueueDepth returns an option that sets the buffer size of the
// consumer's message channel. A depth of zero or less is rejected with
// ErrInvalidQueueDepth and the existing setting is left untouched.
func ConsumerQueueDepth(depth int) ConsumerOptionsFn {
	return func(consumer *Consumer) error {
		if depth <= 0 {
			return ErrInvalidQueueDepth
		}
		consumer.queueDepth = depth
		return nil
	}
}
// ConsumerConcurrency returns an option that sets how many messages may be
// processed concurrently. A count of zero or less is rejected with
// ErrInvalidConcurrency and the existing setting is left untouched.
func ConsumerConcurrency(count int) ConsumerOptionsFn {
	return func(consumer *Consumer) error {
		if count <= 0 {
			return ErrInvalidConcurrency
		}
		consumer.concurrency = count
		return nil
	}
}
// ConsumerLogLevel returns an option that sets the consumer's log level. Only the
// upper 16 bits of ll are kept; the lower 16 bits are masked off. The option
// never fails.
// NOTE(review): presumably the low bits belong to aws-sdk-go's own levels and
// the high bits to this package's levels — confirm against LogHelper.
func ConsumerLogLevel(ll aws.LogLevelType) ConsumerOptionsFn {
	const upperHalf aws.LogLevelType = 0xffff0000
	return func(consumer *Consumer) error {
		consumer.logLevel = ll & upperHalf
		return nil
	}
}
// ConsumerStats returns an option that replaces the consumer's stats collector
// with sc. The option never fails.
func ConsumerStats(sc ConsumerStatsCollector) ConsumerOptionsFn {
	return func(consumer *Consumer) error {
		consumer.Stats = sc
		return nil
	}
}
// Consumer polls a StreamReader for messages and hands them to callers, either
// one at a time (the Retrieve family) or continuously to a callback (the Listen
// family).
type Consumer struct {
	// consumerOptions carries every configuration setting for the Consumer.
	*consumerOptions
	// LogHelper supplies the logging helpers used by the Consumer.
	*LogHelper
	// messages buffers messages retrieved from the stream. It is recreated each
	// time a consume loop starts and closed when that loop stops.
	messages chan *Message
	// concurrencySem bounds how many workers may process messages from the
	// message channel at once; its capacity is the configured concurrency.
	concurrencySem chan empty
	// consuming reports whether a consume loop is currently running.
	consuming bool
	// consumingMu guards consuming and the per-loop channels it gates.
	consumingMu sync.Mutex
	// _ marks the Consumer as not to be copied.
	_ noCopy
}
// NewConsumer creates a Consumer for retrieving and listening to messages on a
// StreamReader.
//
// The options in optionFns are applied in order on top of the package defaults.
// The first option that returns an error aborts construction, and that error is
// returned. If no option supplies a reader, a Kinesis reader is created from c,
// stream and shard, and any error from that is returned.
//
// The consumer's LogHelper is built from the configured log level and c.Logger,
// so c must be non-nil.
func NewConsumer(c *aws.Config, stream string, shard string, optionFns ...ConsumerOptionsFn) (*Consumer, error) {
	consumer := &Consumer{consumerOptions: defaultConsumerOptions()}
	for _, optionFn := range optionFns {
		// Options such as ConsumerQueueDepth and ConsumerConcurrency report
		// invalid arguments through their error. Dropping it would silently
		// fall back to defaults, so surface it to the caller.
		if err := optionFn(consumer); err != nil {
			return nil, err
		}
	}
	if consumer.reader == nil {
		r, err := NewKinesisReader(c, stream, shard)
		if err != nil {
			return nil, err
		}
		consumer.reader = r
	}
	consumer.LogHelper = &LogHelper{
		LogLevel: consumer.logLevel,
		Logger:   c.Logger,
	}
	return consumer, nil
}
// startConsuming tries to claim the consumer for a new consume loop.
//
// If no loop is running, it marks the consumer as consuming and allocates a
// fresh message channel (buffered to queueDepth) and concurrency semaphore
// (capacity concurrency), then returns true. If a loop is already running,
// nothing changes and it returns false.
func (c *Consumer) startConsuming() bool {
	c.consumingMu.Lock()
	defer c.consumingMu.Unlock()
	if c.consuming {
		return false
	}
	c.consuming = true
	c.messages = make(chan *Message, c.queueDepth)
	c.concurrencySem = make(chan empty, c.concurrency)
	return true
}
// shouldConsume lets loops check, without blocking, whether they may continue.
// It returns (true, nil) while ctx is live, and (false, ctx.Err()) once ctx has
// been cancelled or its deadline has passed.
func (c *Consumer) shouldConsume(ctx context.Context) (bool, error) {
	// Per the context contract, Err is non-nil exactly when Done is closed.
	if err := ctx.Err(); err != nil {
		return false, err
	}
	return true, nil
}
// stopConsuming cleans up after a consume loop ends. If a loop was running and
// its message channel exists, the channel is closed so that readers draining it
// terminate. The consuming flag is cleared in every case.
func (c *Consumer) stopConsuming() {
	c.consumingMu.Lock()
	defer c.consumingMu.Unlock()
	if c.consuming {
		if c.messages != nil {
			close(c.messages)
		}
	}
	c.consuming = false
}
// enqueueSingle calls the reader's GetRecord method. Each message the reader
// hands to the callback is sent on the message channel; the send blocks while
// the channel is full. A reader error is logged via handleErrorLogging and then
// returned unchanged.
func (c *Consumer) enqueueSingle(ctx context.Context) error {
	push := func(msg *Message) error {
		c.messages <- msg
		return nil
	}
	if err := c.reader.GetRecord(ctx, push); err != nil {
		c.handleErrorLogging(err)
		return err
	}
	return nil
}
// enqueueBatch calls the reader's GetRecords method. Each message the reader
// hands to the callback is sent on the message channel; the send blocks while
// the channel is full. A reader error is logged via handleErrorLogging and then
// returned unchanged.
func (c *Consumer) enqueueBatch(ctx context.Context) error {
	push := func(msg *Message) error {
		c.messages <- msg
		return nil
	}
	if err := c.reader.GetRecords(ctx, push); err != nil {
		c.handleErrorLogging(err)
		return err
	}
	return nil
}
// handleErrorLogging records stats for, and logs, an error returned by the
// reader's GetRecord or GetRecords method.
//
// A net.Error that reports a timeout increments the GetRecords timeout counter.
// ErrTimeoutReadResponseBody increments the GetRecords read-timeout counter.
// Every non-nil error is logged through LogError. A nil error is ignored.
func (c *Consumer) handleErrorLogging(err error) {
	if err == nil {
		// Nothing to report; calling err.Error() on a nil interface would panic.
		return
	}
	if netErr, ok := err.(net.Error); ok {
		if netErr.Timeout() {
			c.Stats.AddGetRecordsTimeout(1)
			c.LogError("Received net error:", netErr.Error())
		} else {
			c.LogError("Received unknown net error:", netErr.Error())
		}
		return
	}
	if err == ErrTimeoutReadResponseBody {
		c.Stats.AddGetRecordsReadTimeout(1)
	}
	c.LogError("Received error:", err.Error())
}
// RetrieveWithContext waits for a single message from the stream and returns it.
//
// It returns ErrAlreadyConsuming if another consume loop is running. ctx is
// checked between GetRecord calls; once it is done, ctx.Err() is returned.
// Whether an in-flight GetRecord call aborts early depends on the reader, which
// receives a context derived from ctx.
func (c *Consumer) RetrieveWithContext(ctx context.Context) (*Message, error) {
	if started := c.startConsuming(); !started {
		return nil, ErrAlreadyConsuming
	}
	defer c.stopConsuming()

	readCtx, cancelRead := context.WithCancel(ctx)
	defer cancelRead()

	for {
		if keepGoing, err := c.shouldConsume(ctx); !keepGoing {
			return nil, err
		}
		// Reader errors are already logged inside enqueueSingle; the loop
		// deliberately keeps polling until a message arrives.
		_ = c.enqueueSingle(readCtx)
		select {
		case msg := <-c.messages:
			return msg, nil
		default:
			// This call produced no message; poll again.
		}
	}
}
// Retrieve waits for a single message from the stream and returns it. It is
// RetrieveWithContext called with context.TODO(), which is never cancelled.
func (c *Consumer) Retrieve() (*Message, error) {
	ctx := context.TODO()
	return c.RetrieveWithContext(ctx)
}
// RetrieveFnWithContext retrieves one message (see RetrieveWithContext) and, when
// fn is non-nil, passes it to fn and waits for fn to return.
//
// Errors from retrieval or from fn are returned unchanged. The processing
// duration and processed count are recorded only when fn succeeds. ctx governs
// the retrieval step.
func (c *Consumer) RetrieveFnWithContext(ctx context.Context, fn MessageProcessor) error {
	msg, err := c.RetrieveWithContext(ctx)
	if err != nil {
		return err
	}
	if fn == nil {
		return nil
	}
	began := time.Now()
	if procErr := fn(msg); procErr != nil {
		return procErr
	}
	c.Stats.UpdateProcessedDuration(time.Since(began))
	c.Stats.AddProcessed(1)
	return nil
}
// RetrieveFn retrieves one message and passes it to fn, waiting for fn to
// return. It is RetrieveFnWithContext called with context.TODO().
func (c *Consumer) RetrieveFn(fn MessageProcessor) error {
	ctx := context.TODO()
	return c.RetrieveFnWithContext(ctx, fn)
}
// consume starts the batch consume loop in a background goroutine.
//
// startConsuming runs synchronously first, so the message channel already exists
// when the caller (ListenWithContext) starts reading it. If another loop already
// holds the consumer, consume returns without starting anything.
//
// The goroutine calls GetRecords repeatedly until ctx is done, then closes the
// message channel via stopConsuming.
func (c *Consumer) consume(ctx context.Context) {
	if !c.startConsuming() {
		return
	}
	loop := func() {
		defer c.stopConsuming()
		batchCtx, cancelBatch := context.WithCancel(ctx)
		defer cancelBatch()
		for {
			// ctx is checked between GetRecords calls; an in-flight call gets
			// batchCtx (derived from ctx), so whether it aborts early is up to
			// the reader.
			if keepGoing, _ := c.shouldConsume(ctx); !keepGoing {
				return
			}
			// Reader errors are logged inside enqueueBatch; keep polling.
			_ = c.enqueueBatch(batchCtx)
		}
	}
	go loop()
}
// ListenWithContext starts the consume loop and delivers every message it
// produces to fn, running at most `concurrency` invocations of fn at a time.
//
// When ctx is done, the consume loop stops and closes the message channel.
// ListenWithContext still delivers any messages already buffered, then waits for
// all in-flight fn calls to finish before returning. Errors returned by fn are
// ignored, and each invocation is counted as processed.
func (c *Consumer) ListenWithContext(ctx context.Context, fn MessageProcessor) {
	c.consume(ctx)

	// Snapshot this loop's channels under the lock. startConsuming reassigns
	// these fields whenever a new loop starts (e.g. a Retrieve call after this
	// loop stops). Reading them unguarded is a data race, and a worker that
	// released into a freshly created, empty semaphore would block forever,
	// hanging wg.Wait.
	c.consumingMu.Lock()
	messages, sem := c.messages, c.concurrencySem
	c.consumingMu.Unlock()

	var wg sync.WaitGroup
	defer wg.Wait()
	for msg := range messages {
		c.Stats.AddDelivered(1)
		sem <- empty{} // acquire a worker slot; blocks at the concurrency limit
		wg.Add(1)
		go func(msg *Message) {
			defer wg.Done()
			defer func() { <-sem }() // release the worker slot
			start := time.Now()
			// There is no caller to report fn's error to; it is ignored.
			_ = fn(msg)
			c.Stats.UpdateProcessedDuration(time.Since(start))
			c.Stats.AddProcessed(1)
		}(msg)
	}
}
// Listen delivers messages from the stream to fn. It is ListenWithContext called
// with context.TODO(), so it cannot be stopped through a context.
func (c *Consumer) Listen(fn MessageProcessor) {
	ctx := context.TODO()
	c.ListenWithContext(ctx, fn)
}