This repository has been archived by the owner on Feb 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.go
135 lines (117 loc) · 5.03 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package kafkalock
import (
"errors"
"os"
"path"
"time"
"github.com/Shopify/sarama"
)
// TopicPolicy dictates if/how KafkaLock should "validate" the topic configuration.
type TopicPolicy uint8
const (
// TopicPolicyNoValidation means that no validation will be done at all.
// KafkaLock will misbehave if the topic is not configured properly.
TopicPolicyNoValidation TopicPolicy = iota
// TopicPolicyValidateOnly makes KafkaLocker return an error if the topic isn't configured as expected
// or doesn't exist at all.
TopicPolicyValidateOnly
// TopicPolicyCreate makes KafkaLocker return an error if the topic isn't configured as expected.
// However, if the topic doesn't exist at all then it will be created with the expected configuration.
// This is recommended (and default) configuration.
TopicPolicyCreate
// TopicPolicyDropAndCreate behaves as TopicPolicyCreate but it will re-create (drop+create) the topic if it
// isn't configured as expected. This isn't a recommended configuration as it will hide errors in the KafkaLocker
// configuration and will cause misbehaviour if multiple KafkaLocker instances try to acquire lock for the same
// topic but will use different configurations. Use this for manual "maintenance" only.
TopicPolicyDropAndCreate
)
// Config is a struct holding the KafkaLocker configuration. Use NewConfig() to create one with defaults.
type Config struct {
// Contains list of Kafka bootstrap servers (servers to which KafkaLock will connect to).
BootstrapServers []string
// Sarama's configuration struct - use it to tune timeouts to your liking. Leave nil to use the defaults.
KafkaConfig *sarama.Config
// Name of the topic you wan't to use for the lock. It serves as the lock identifier (the "thing" you are locking).
Topic string
// Maximum number of KafkaLocker instances that may hold the lock to certain topic at the same time.
// For exclusive lock use MaxHolders = 1 (default).
MaxHolders int
// Replication factor says on how many brokers we want the topic to be replicated on.
// Kafka's default is 1, recommended value is the number of your brokers.
ReplicationFactor int
// Dictates if/how KafkaLock should "validate" the topic configuration. Check TopicPolicy constants for more info.
// Default value is TopicPolicyCreate.
TopicPolicy TopicPolicy
// If true then KafkaLock won't alter Sarama's timeouts configuration to preferred/recommended values.
// Be aware that the recommended values require you to adjust Kafka's configuration (group.min.session.timeout.ms).
NoTimeoutsTweaking bool
// If true then KafkaLock won't alter your Kafka's ClientID at all. You are advised to provide your own anyway.
NoClientIDSuffix bool
// String identifier that is unique for all KafkaLocker instances. It mustn't change between application restarts.
// Currently it is used only for default FlapGuard, so if you provide your own implementation
// then you may leave it empty.
AppID string
// Utility class that protects the KafkaLocker against application "flapping". Leave nil to use the
// default implementation. Check the FlapGuard interface for more details.
FlapGuard FlapGuard
// You can set this to customize logging behaviour of KafkaLock. Default is to log nothing (NopLogger).
Logger *Logger
}
func processConfig(c *Config) (*Config, error) {
if c.MaxHolders < 1 {
return nil, errors.New("Config.MaxHolders must be greater then or equal to 1")
}
if c.ReplicationFactor < 1 {
return nil, errors.New("Config.ReplicationFactor must be greater then or equal to 1")
}
if c.Logger == nil {
c.Logger = NewNopLogger()
}
if c.KafkaConfig == nil {
c.KafkaConfig = sarama.NewConfig()
}
kafkaConfig := *c.KafkaConfig
c.KafkaConfig = &kafkaConfig
kafkaConfig.Metadata.Full = false
kafkaConfig.Metadata.RefreshFrequency = 0
kafkaConfig.Consumer.Return.Errors = false
kafkaConfig.Consumer.Group.Rebalance.Strategy = &kafkaLockBalanceStrategy{c.Logger}
if !c.NoTimeoutsTweaking {
kafkaConfig.Consumer.Group.Session.Timeout = time.Second * 3
kafkaConfig.Consumer.Group.Heartbeat.Interval = time.Second
kafkaConfig.Consumer.Group.Rebalance.Timeout = time.Second
kafkaConfig.Consumer.Retry.Backoff = time.Millisecond * 500
}
if !c.NoClientIDSuffix {
if kafkaConfig.ClientID == "" {
kafkaConfig.ClientID = "kafkalock"
} else {
kafkaConfig.ClientID = c.KafkaConfig.ClientID + ".kafkalock"
}
}
if c.FlapGuard == nil {
if c.AppID == "" {
return nil, errors.New("unique static AppID must be set to use the default FlapGuard")
}
dir := path.Join(os.TempDir(), "kafkalock")
err := os.MkdirAll(dir, 0775)
if err != nil {
return nil, err
}
c.FlapGuard, err = NewFileFlapGuard(c.Logger, path.Join(dir, c.AppID), 3, true)
if err != nil {
return nil, err
}
}
return c, nil
}
// NewConfig returns a new Config struct filled with the default values.
func NewConfig(bootstrapServers []string, topic string) *Config {
return &Config{
BootstrapServers: bootstrapServers,
Topic: topic,
MaxHolders: 1,
ReplicationFactor: 3,
TopicPolicy: TopicPolicyCreate,
}
}