-
Notifications
You must be signed in to change notification settings - Fork 15
/
config.go
67 lines (62 loc) · 2.16 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package consumergroup
import (
"errors"
"time"
"github.com/Shopify/sarama"
)
// Config is used to pass multiple configuration options to ConsumerGroup's constructors
type Config struct {
Chroot string
// ZkList is required, zookeeper address's list
ZkList []string
// Zookeeper session timeout, default is 6s
ZkSessionTimeout time.Duration
// GroupID is required, identifer to determin which ConsumerGroup would be joined
GroupID string
// ConsumerID is optional, identifer to sign partition's owner
ConsumerID string
// TopicList is required, topics that ConsumerGroup would be consumed
TopicList []string
// Just export Sarama Config
SaramaConfig *sarama.Config
// Size of error channel, default is 1024
ErrorChannelBufferSize int
// Whether auto commit the offset or not, default is true
OffsetAutoCommitEnable bool
// Offset auto commit interval, default is 10s
OffsetAutoCommitInterval time.Duration
// Where to fetch messages when offset was not found, default is newest
OffsetAutoReset int64
// Claim the partition would give up after ClaimPartitionRetryTimes(>0) retires,
// ClaimPartitionRetryTimes <= 0 would retry until success or receive stop signal
ClaimPartitionRetryTimes int
// Retry interval when fail to clain the partition
ClaimPartitionRetryInterval time.Duration
}
// NewConfig return the new config with default value.
func NewConfig() *Config {
config := new(Config)
config.SaramaConfig = sarama.NewConfig()
config.ErrorChannelBufferSize = 1024
config.OffsetAutoCommitEnable = true
config.OffsetAutoCommitInterval = 10 * time.Second
config.OffsetAutoReset = sarama.OffsetNewest
config.ClaimPartitionRetryTimes = 10
config.ClaimPartitionRetryInterval = 3 * time.Second
config.SaramaConfig.Consumer.Return.Errors = true
return config
}
func (c *Config) validate() error {
if c.ZkList == nil || len(c.ZkList) <= 0 {
return errors.New("ZkList can't be empty")
}
if c.GroupID == "" {
return errors.New("GroupID can't be empty")
}
if c.TopicList == nil || len(c.TopicList) <= 0 {
return errors.New("GroupId can't be empty")
}
c.TopicList = sliceRemoveDuplicates(c.TopicList)
c.ZkList = sliceRemoveDuplicates(c.ZkList)
return nil
}