forked from content-services/content-sources-backend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent.go
69 lines (62 loc) · 2.26 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package config
import (
"os"
"strings"
"github.com/content-services/content-sources-backend/pkg/event"
clowder "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/spf13/viper"
)
// addEventConfigDefaults registers the Kafka-related defaults on options and,
// when Clowder is enabled, overlays the connection settings taken from the
// loaded Clowder configuration.
//
// As a side effect it assigns event.TopicTranslationConfig: built from the
// loaded Clowder config when Clowder is enabled, and from nil otherwise.
//
// It panics if the Kafka CA certificate cannot be written.
func addEventConfigDefaults(options *viper.Viper) {
	options.SetDefault("kafka.timeout", 10000)
	options.SetDefault("kafka.group.id", "content-sources")
	options.SetDefault("kafka.auto.offset.reset", "latest")
	options.SetDefault("kafka.auto.commit.interval.ms", 5000)
	options.SetDefault("kafka.request.required.acks", -1) // -1 == "all"
	options.SetDefault("kafka.message.send.max.retries", 15)
	options.SetDefault("kafka.retry.backoff.ms", 100)

	if !clowder.IsClowderEnabled() {
		// If clowder is not present, set defaults to local configuration
		event.TopicTranslationConfig = event.NewTopicTranslationWithClowder(nil)
		options.SetDefault("kafka.bootstrap.servers", readEnv("KAFKA_BOOTSTRAP_SERVERS", ""))
		return
	}

	cfg := clowder.LoadedConfig
	event.TopicTranslationConfig = event.NewTopicTranslationWithClowder(cfg)
	options.SetDefault("kafka.bootstrap.servers", strings.Join(clowder.KafkaServers, ","))

	// Prepare topics: keep only the ones whose name mentions this service.
	var topics []string
	for _, topic := range clowder.KafkaTopics {
		if strings.Contains(topic.Name, "content-sources") {
			topics = append(topics, topic.Name)
		}
	}
	options.SetDefault("kafka.topics", strings.Join(topics, ","))

	// len() of a nil slice is 0, so a separate nil check on Brokers is not needed.
	if cfg == nil || cfg.Kafka == nil || len(cfg.Kafka.Brokers) == 0 {
		return
	}

	// Only the first broker is consulted for the CA and SASL settings.
	broker := cfg.Kafka.Brokers[0]

	if broker.Cacert != nil {
		// This method is writing only the first CA but if
		// that behavior changes in the future, nothing will
		// be changed here
		caPath, err := cfg.KafkaCa(cfg.Kafka.Brokers...)
		if err != nil {
			// Keep the underlying cause so the failure can be diagnosed.
			panic("Kafka CA failed to write: " + err.Error())
		}
		options.Set("kafka.capath", caPath)
	}

	if broker.Authtype == nil {
		return
	}

	// The SASL section and each of its fields are pointers; guard every
	// dereference so a partially filled broker entry does not crash startup.
	if sasl := broker.Sasl; sasl != nil {
		if sasl.Username != nil {
			options.Set("kafka.sasl.username", *sasl.Username)
		}
		if sasl.Password != nil {
			options.Set("kafka.sasl.password", *sasl.Password)
		}
		if sasl.SaslMechanism != nil {
			options.Set("kafka.sasl.mechanism", *sasl.SaslMechanism)
		}
	}
	if broker.SecurityProtocol != nil {
		options.Set("kafka.sasl.protocol", *broker.SecurityProtocol)
	}
}
// readEnv returns the value of the environment variable named key. When the
// variable is not set at all, def is returned instead; a variable that is set
// to the empty string yields the empty string.
func readEnv(key string, def string) string {
	if v, found := os.LookupEnv(key); found {
		return v
	}
	return def
}