Skip to content

Commit d6449a1

Browse files
committed
modifies DelayingConsumer to use a message header instead of a delay
Instead of a static delay, uses a "not before" time found in a Kafka message header. Consumption of the message will not be attempted until the time has passed. This allows for more accurate delays, as the time required to process an earlier message doesn't further delay the current message's processing. BACK-2449
1 parent 2920de5 commit d6449a1

File tree

2 files changed

+216
-25
lines changed

2 files changed

+216
-25
lines changed

data/events/events.go

Lines changed: 93 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package events
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"os"
8+
"strconv"
79
"sync"
810
"time"
911

@@ -341,9 +343,8 @@ func (r *CascadingSaramaEventsRunner) buildConsumer(ctx context.Context, idx int
341343
}
342344
}
343345
if delay > 0 {
344-
consumer = &DelayingConsumer{
346+
consumer = &NotBeforeConsumer{
345347
Consumer: consumer,
346-
Delay: delay,
347348
Logger: r.Logger,
348349
}
349350
}
@@ -370,28 +371,73 @@ func (r *CascadingSaramaEventsRunner) logger(ctx context.Context) log.Logger {
370371
return r.Logger
371372
}
372373

373-
// DelayingConsumer injects a delay before consuming a message.
374-
type DelayingConsumer struct {
374+
// NotBeforeConsumer delays consumption until a specified time.
375+
type NotBeforeConsumer struct {
375376
Consumer asyncevents.SaramaMessageConsumer
376-
Delay time.Duration
377377
Logger log.Logger
378378
}
379379

380-
func (c *DelayingConsumer) Consume(ctx context.Context, session sarama.ConsumerGroupSession,
380+
func (c *NotBeforeConsumer) Consume(ctx context.Context, session sarama.ConsumerGroupSession,
381381
msg *sarama.ConsumerMessage) error {
382382

383+
notBefore, err := c.notBeforeFromMsgHeaders(msg)
384+
if err != nil {
385+
c.Logger.WithError(err).Info("Unable to parse kafka header not-before value")
386+
}
387+
delay := time.Until(notBefore)
388+
383389
select {
384390
case <-ctx.Done():
385391
if ctxErr := ctx.Err(); ctxErr != context.Canceled {
386392
return ctxErr
387393
}
388394
return nil
389-
case <-time.After(c.Delay):
390-
c.Logger.WithFields(log.Fields{"topic": msg.Topic, "delay": c.Delay}).Debugf("delayed")
395+
case <-time.After(time.Until(notBefore)):
396+
if !notBefore.IsZero() {
397+
fields := log.Fields{"topic": msg.Topic, "not-before": notBefore, "delay": delay}
398+
c.Logger.WithFields(fields).Debugf("delayed")
399+
}
391400
return c.Consumer.Consume(ctx, session, msg)
392401
}
393402
}
394403

404+
// HeaderNotBefore tells consumers not to consume a message before a certain time.
405+
var HeaderNotBefore = []byte("x-tidepool-not-before")
406+
407+
// NotBeforeTimeFormat specifies the [time.Parse] format to use for HeaderNotBefore.
408+
var NotBeforeTimeFormat = time.RFC3339Nano
409+
410+
// HeaderFailures counts the number of failures encountered trying to consume the message.
411+
var HeaderFailures = []byte("x-tidepool-failures")
412+
413+
// FailuresToDelay maps the number of consumption failures to the next delay.
414+
//
415+
// Rather than using a failures header, the name of the topic could be used as a lookup, if
416+
// so desired.
417+
var FailuresToDelay = map[int]time.Duration{
418+
0: 0,
419+
1: 1 * time.Second,
420+
2: 2 * time.Second,
421+
3: 3 * time.Second,
422+
4: 5 * time.Second,
423+
}
424+
425+
func (c *NotBeforeConsumer) notBeforeFromMsgHeaders(msg *sarama.ConsumerMessage) (
426+
time.Time, error) {
427+
428+
for _, header := range msg.Headers {
429+
if bytes.Equal(header.Key, HeaderNotBefore) {
430+
notBefore, err := time.Parse(NotBeforeTimeFormat, string(header.Value))
431+
if err != nil {
432+
return time.Time{}, fmt.Errorf("parsing not before header: %s", err)
433+
} else {
434+
return notBefore, nil
435+
}
436+
}
437+
}
438+
return time.Time{}, fmt.Errorf("header not found: x-tidepool-not-before")
439+
}
440+
395441
// CascadingConsumer cascades messages that failed to be consumed to another topic.
396442
type CascadingConsumer struct {
397443
Consumer asyncevents.SaramaMessageConsumer
@@ -444,6 +490,7 @@ func (c *CascadingConsumer) withTxn(f func() error) (err error) {
444490
return f()
445491
}
446492

493+
// cascadeMessage to the next topic.
447494
func (c *CascadingConsumer) cascadeMessage(msg *sarama.ConsumerMessage) *sarama.ProducerMessage {
448495
pHeaders := make([]sarama.RecordHeader, len(msg.Headers))
449496
for idx, header := range msg.Headers {
@@ -453,6 +500,43 @@ func (c *CascadingConsumer) cascadeMessage(msg *sarama.ConsumerMessage) *sarama.
453500
Key: sarama.ByteEncoder(msg.Key),
454501
Value: sarama.ByteEncoder(msg.Value),
455502
Topic: c.NextTopic,
456-
Headers: pHeaders,
503+
Headers: c.updateCascadeHeaders(pHeaders),
504+
}
505+
}
506+
507+
// updateCascadeHeaders calculates not before and failures header values.
508+
//
509+
// Existing not before and failures headers will be dropped in place of the new ones.
510+
func (c *CascadingConsumer) updateCascadeHeaders(headers []sarama.RecordHeader) []sarama.RecordHeader {
511+
failures := 0
512+
notBefore := time.Now()
513+
514+
keep := make([]sarama.RecordHeader, 0, len(headers))
515+
for _, header := range headers {
516+
switch {
517+
case bytes.Equal(header.Key, HeaderNotBefore):
518+
continue // Drop this header, we'll add a new version below.
519+
case bytes.Equal(header.Key, HeaderFailures):
520+
parsed, err := strconv.ParseInt(string(header.Value), 10, 32)
521+
if err != nil {
522+
c.Logger.WithError(err).Info("Unable to parse consumption failures count")
523+
} else {
524+
failures = int(parsed)
525+
notBefore = notBefore.Add(FailuresToDelay[failures])
526+
}
527+
continue // Drop this header, we'll add a new version below.
528+
}
529+
keep = append(keep, header)
457530
}
531+
532+
keep = append(keep, sarama.RecordHeader{
533+
Key: HeaderNotBefore,
534+
Value: []byte(notBefore.Format(NotBeforeTimeFormat)),
535+
})
536+
keep = append(keep, sarama.RecordHeader{
537+
Key: HeaderFailures,
538+
Value: []byte(strconv.Itoa(failures + 1)),
539+
})
540+
541+
return keep
458542
}

data/events/events_test.go

Lines changed: 123 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package events
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
7+
"strconv"
68
"sync"
79
"sync/atomic"
810
"time"
@@ -110,24 +112,32 @@ var _ = DescribeTable("CappedExponentialBinaryDelay",
110112
Entry("cap: 1m; tries: 20", time.Minute, 20, time.Minute),
111113
)
112114

113-
var _ = Describe("DelayingConsumer", func() {
115+
var _ = Describe("NotBeforeConsumer", func() {
114116
Describe("Consume", func() {
115-
var testMsg = &sarama.ConsumerMessage{
116-
Topic: "test.topic",
117+
var newTestMsg = func(notBefore time.Time) *sarama.ConsumerMessage {
118+
headers := []*sarama.RecordHeader{}
119+
if !notBefore.IsZero() {
120+
headers = append(headers, &sarama.RecordHeader{
121+
Key: HeaderNotBefore,
122+
Value: []byte(notBefore.Format(NotBeforeTimeFormat)),
123+
})
124+
}
125+
return &sarama.ConsumerMessage{Topic: "test.topic", Headers: headers}
117126
}
118127

119-
It("delays by the configured duration", func() {
128+
It("delays based on the x-tidepool-not-before header", func() {
120129
logger := newTestDevlog()
121130
testDelay := 10 * time.Millisecond
122131
ctx := context.Background()
123132
start := time.Now()
124-
dc := &DelayingConsumer{
133+
notBefore := start.Add(testDelay)
134+
msg := newTestMsg(notBefore)
135+
dc := &NotBeforeConsumer{
125136
Consumer: &mockSaramaMessageConsumer{Logger: logger},
126-
Delay: testDelay,
127137
Logger: logger,
128138
}
129139

130-
err := dc.Consume(ctx, nil, testMsg)
140+
err := dc.Consume(ctx, nil, msg)
131141

132142
Expect(err).To(BeNil())
133143
Expect(time.Since(start)).To(BeNumerically(">", testDelay))
@@ -137,9 +147,10 @@ var _ = Describe("DelayingConsumer", func() {
137147
logger := newTestDevlog()
138148
testDelay := 10 * time.Millisecond
139149
abortAfter := 1 * time.Millisecond
140-
dc := &DelayingConsumer{
150+
notBefore := time.Now().Add(testDelay)
151+
msg := newTestMsg(notBefore)
152+
dc := &NotBeforeConsumer{
141153
Consumer: &mockSaramaMessageConsumer{Delay: time.Minute, Logger: logger},
142-
Delay: testDelay,
143154
Logger: logger,
144155
}
145156
ctx, cancel := context.WithCancel(context.Background())
@@ -149,7 +160,7 @@ var _ = Describe("DelayingConsumer", func() {
149160
}()
150161
start := time.Now()
151162

152-
err := dc.Consume(ctx, nil, testMsg)
163+
err := dc.Consume(ctx, nil, msg)
153164

154165
Expect(err).To(BeNil())
155166
Expect(time.Since(start)).To(BeNumerically(">", abortAfter))
@@ -158,14 +169,14 @@ var _ = Describe("DelayingConsumer", func() {
158169
})
159170
})
160171

161-
var _ = Describe("ShiftingConsumer", func() {
172+
var _ = Describe("CascadingConsumer", func() {
162173
Describe("Consume", func() {
163174
var testMsg = &sarama.ConsumerMessage{
164175
Topic: "test.topic",
165176
}
166177

167178
Context("on failure", func() {
168-
It("shifts topics", func() {
179+
It("cascades topics", func() {
169180
t := GinkgoT()
170181
logger := newTestDevlog()
171182
ctx := context.Background()
@@ -195,6 +206,102 @@ var _ = Describe("ShiftingConsumer", func() {
195206
Expect(mockProducer.Close()).To(Succeed())
196207
Expect(err).To(BeNil())
197208
})
209+
210+
It("increments the failures header", func() {
211+
t := GinkgoT()
212+
logger := newTestDevlog()
213+
ctx := context.Background()
214+
testConfig := mocks.NewTestConfig()
215+
mockProducer := mocks.NewAsyncProducer(t, testConfig)
216+
msg := &sarama.ConsumerMessage{
217+
Headers: []*sarama.RecordHeader{
218+
{
219+
Key: HeaderFailures, Value: []byte("3"),
220+
},
221+
},
222+
}
223+
nextTopic := "text-next"
224+
sc := &CascadingConsumer{
225+
Consumer: &mockSaramaMessageConsumer{
226+
Err: fmt.Errorf("test error"),
227+
Logger: logger,
228+
},
229+
NextTopic: nextTopic,
230+
Producer: mockProducer,
231+
Logger: logger,
232+
}
233+
234+
cf := func(msg *sarama.ProducerMessage) error {
235+
failures := 0
236+
for _, header := range msg.Headers {
237+
if !bytes.Equal(header.Key, HeaderFailures) {
238+
continue
239+
}
240+
parsed, err := strconv.ParseInt(string(header.Value), 10, 32)
241+
Expect(err).To(Succeed())
242+
failures = int(parsed)
243+
if failures != 4 {
244+
return fmt.Errorf("expected failures == 4, got %d", failures)
245+
}
246+
return nil
247+
}
248+
return fmt.Errorf("expected failures header wasn't found")
249+
}
250+
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(cf)
251+
252+
err := sc.Consume(ctx, nil, msg)
253+
Expect(mockProducer.Close()).To(Succeed())
254+
Expect(err).To(BeNil())
255+
})
256+
257+
It("updates the not before header", func() {
258+
t := GinkgoT()
259+
logger := newTestDevlog()
260+
ctx := context.Background()
261+
testConfig := mocks.NewTestConfig()
262+
mockProducer := mocks.NewAsyncProducer(t, testConfig)
263+
msg := &sarama.ConsumerMessage{
264+
Headers: []*sarama.RecordHeader{
265+
{
266+
Key: HeaderFailures, Value: []byte("2"),
267+
},
268+
},
269+
}
270+
nextTopic := "text-next"
271+
sc := &CascadingConsumer{
272+
Consumer: &mockSaramaMessageConsumer{
273+
Err: fmt.Errorf("test error"),
274+
Logger: logger,
275+
},
276+
NextTopic: nextTopic,
277+
Producer: mockProducer,
278+
Logger: logger,
279+
}
280+
281+
cf := func(msg *sarama.ProducerMessage) error {
282+
for _, header := range msg.Headers {
283+
if !bytes.Equal(header.Key, HeaderNotBefore) {
284+
continue
285+
}
286+
parsed, err := time.Parse(NotBeforeTimeFormat, string(header.Value))
287+
if err != nil {
288+
return err
289+
}
290+
until := time.Until(parsed)
291+
delta := 10 * time.Millisecond
292+
if until < 2*time.Second-delta || until > 2*time.Second+delta {
293+
return fmt.Errorf("expected 2 seconds' delay, got: %s", until)
294+
}
295+
return nil
296+
}
297+
return fmt.Errorf("expected failures header wasn't found")
298+
}
299+
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(cf)
300+
301+
err := sc.Consume(ctx, nil, msg)
302+
Expect(mockProducer.Close()).To(Succeed())
303+
Expect(err).To(BeNil())
304+
})
198305
})
199306

200307
Context("on success", func() {
@@ -244,8 +351,8 @@ var _ = Describe("ShiftingConsumer", func() {
244351
})
245352
})
246353

247-
var _ = Describe("ShiftingSaramaEventsRunner", func() {
248-
It("shifts through configured delays", func() {
354+
var _ = Describe("CascadingSaramaEventsRunner", func() {
355+
It("cascades through configured delays", func() {
249356
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
250357
defer cancel()
251358
testDelays := []time.Duration{0, 1, 2, 3, 5}
@@ -256,7 +363,7 @@ var _ = Describe("ShiftingSaramaEventsRunner", func() {
256363
Logger: testLogger,
257364
}
258365
testConfig := SaramaRunnerConfig{
259-
Topics: []string{"test.shifting"},
366+
Topics: []string{"test.cascading"},
260367
MessageConsumer: testMessageConsumer,
261368
Sarama: mocks.NewTestConfig(),
262369
}
@@ -338,7 +445,7 @@ var _ = Describe("ShiftingSaramaEventsRunner", func() {
338445
})
339446
})
340447

341-
// testSaramaBuilders injects mocks into the ShiftingSaramaEventsRunner
448+
// testSaramaBuilders injects mocks into the CascadingSaramaEventsRunner
342449
type testSaramaBuilders struct {
343450
consumerGroup func([]string, string, *sarama.Config) (sarama.ConsumerGroup, error)
344451
producer func([]string, *sarama.Config) (sarama.AsyncProducer, error)

0 commit comments

Comments
 (0)