-
Notifications
You must be signed in to change notification settings - Fork 3
/
consumer.go
69 lines (53 loc) · 1.26 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package kafka
import (
"context"
"os"
"time"
"github.com/twmb/franz-go/pkg/kgo"
)
type Message []byte
type Consumer struct {
client *kgo.Client
}
func NewConsumer(groupName string, topics []string, brokers []string, logger bool) (*Consumer, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(brokers...),
kgo.ConsumerGroup(groupName),
kgo.ConsumeTopics(topics...),
kgo.DisableAutoCommit(),
kgo.GroupProtocol("roundrobin"),
kgo.Balancers(kgo.RoundRobinBalancer()),
}
if logger {
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)))
}
cl, err := kgo.NewClient(opts...)
if err != nil {
return nil, err
}
return &Consumer{
client: cl,
}, nil
}
func (c *Consumer) ConsumeBatch(ctx context.Context, batchSize int) []Message {
var messages []Message
for batchSize > 0 {
timeout, cancel := context.WithTimeout(ctx, time.Minute*1)
defer cancel()
fetches := c.client.PollRecords(timeout, batchSize)
iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
messages = append(messages, record.Value)
}
batchSize = batchSize - len(messages)
if ctx.Err() != nil {
break
}
}
c.client.CommitUncommittedOffsets(ctx)
return messages
}
func (c *Consumer) Close() {
c.client.Close()
}