Skip to content

Commit

Permalink
kafka: use clowder KafkaServers
Browse files Browse the repository at this point in the history
To allow multi kafka servers usage, use clowder KafkaServers.

FIXES: https://issues.redhat.com/browse/THEEDGE-3708
  • Loading branch information
ldjebran authored Nov 27, 2023
1 parent b3037ec commit ffb0f67
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type EdgeConfig struct {
UploadWorkers int `json:"upload_workers,omitempty"`
KafkaConfig *clowder.KafkaConfig `json:"kafka,omitempty"`
KafkaBrokers []clowder.BrokerConfig `json:"kafka_brokers,omitempty"`
KafkaServers string `json:"kafka_servers,omitempty"`
KafkaBroker *clowder.BrokerConfig `json:"kafka_broker,omitempty"`
KafkaBrokerCaCertPath string `json:"kafka_broker_ca_cert_path,omitempty"`
KafkaRequestRequiredAcks int `json:"kafka_request_required_acks,omitempty"`
Expand Down Expand Up @@ -331,6 +332,7 @@ func CreateEdgeAPIConfig() (*EdgeConfig, error) {
}

edgeConfig.KafkaConfig = cfg.Kafka
edgeConfig.KafkaServers = strings.Join(clowder.KafkaServers, ",")
}

// get edgeConfig from file if running in developer mode
Expand Down
8 changes: 7 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ func TestKafkaBroker(t *testing.T) {
testCases := []struct {
Name string
clowderConfig *clowder.AppConfig
KafkaServers []string
ExpectedKafkaBroker *clowder.BrokerConfig
ExpectedKafkaBrokerCaCert string
ExpectedKafkaServers string
}{
{

Expand All @@ -132,7 +134,9 @@ func TestKafkaBroker(t *testing.T) {
},
},

ExpectedKafkaBroker: &kafkaBroker,
ExpectedKafkaBroker: &kafkaBroker,
KafkaServers: []string{"kafka-1.example.com:9099", "kafka-2.example.com:9099"},
ExpectedKafkaServers: "kafka-1.example.com:9099,kafka-2.example.com:9099",
},

{
Expand Down Expand Up @@ -162,9 +166,11 @@ func TestKafkaBroker(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
clowder.LoadedConfig = testCase.clowderConfig
clowder.KafkaServers = testCase.KafkaServers
// init the configuration
Init()
assert.Equal(t, Config.KafkaBroker, testCase.ExpectedKafkaBroker)
assert.Equal(t, Config.KafkaServers, testCase.ExpectedKafkaServers)
if testCase.ExpectedKafkaBroker != nil {
if testCase.ExpectedKafkaBroker.Cacert != nil && *testCase.ExpectedKafkaBroker.Cacert != "" {
assert.NotEmpty(t, Config.KafkaBrokerCaCertPath)
Expand Down
4 changes: 1 addition & 3 deletions pkg/common/kafka/kafkaconfigmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package kafkacommon

import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/redhatinsights/edge-api/config"
Expand Down Expand Up @@ -33,7 +31,7 @@ func (k *KafkaConfigMapService) getKafkaCommonConfigMap() kafka.ConfigMap {

// use the first kafka broker from config
if cfg.KafkaBroker != nil {
kafkaConfigMap.SetKey("bootstrap.servers", fmt.Sprintf("%s:%d", cfg.KafkaBroker.Hostname, *cfg.KafkaBroker.Port))
kafkaConfigMap.SetKey("bootstrap.servers", cfg.KafkaServers)
var securityProtocol string
if cfg.KafkaBroker.SecurityProtocol != nil {
securityProtocol = *cfg.KafkaBroker.SecurityProtocol
Expand Down
20 changes: 14 additions & 6 deletions pkg/common/kafka/kafkaconfigmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ func TestGetKafkaProducerConfigMap(t *testing.T) {

conf := cfg.KafkaConfig
originalKafkaBroker := cfg.KafkaBroker
originalKafkaServers := cfg.KafkaServers

authType := clowder.BrokerConfigAuthtype("sasl")
dummyString := "something"
mech := "PLAIN"
proto := "SASL_SSL"
port := 80
kafkaServers := "192.168.1.7:80,192.168.1.8:90"
brokerConfig := clowder.BrokerConfig{
Authtype: &authType,
Cacert: &dummyString,
Expand All @@ -39,7 +41,7 @@ func TestGetKafkaProducerConfigMap(t *testing.T) {
},
}
kafkaConfigMap := kafka.ConfigMap{
"bootstrap.servers": "192.168.1.7:80",
"bootstrap.servers": kafkaServers,
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": "something",
Expand All @@ -51,14 +53,16 @@ func TestGetKafkaProducerConfigMap(t *testing.T) {
cfg.KafkaBroker = &brokerConfig

// Reset config.kafkaconfig back to its original value
defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig) {
defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig, kafkaServers string) {
config.Get().KafkaConfig = conf
config.Get().KafkaBroker = kafkaBroker
}(conf, originalKafkaBroker)
config.Get().KafkaServers = kafkaServers
}(conf, originalKafkaBroker, originalKafkaServers)

ctx := context.Background()
cfg.KafkaConfig = &v1.KafkaConfig{}
cfg.KafkaConfig.Brokers = []clowder.BrokerConfig{brokerConfig}
cfg.KafkaServers = kafkaServers

cases := []struct {
Name string
Expand Down Expand Up @@ -89,6 +93,7 @@ func TestGetKafkaConsumerConfigMap(t *testing.T) {

conf := cfg.KafkaConfig
originalKafkaBroker := cfg.KafkaBroker
originalKafkaServers := cfg.KafkaServers

authType := clowder.BrokerConfigAuthtype("sasl")
dummyString := "something"
Expand All @@ -107,8 +112,9 @@ func TestGetKafkaConsumerConfigMap(t *testing.T) {
Password: &dummyString,
},
}
kafkaServers := "192.168.1.7:80,192.168.1.8:90"
kafkaConfigMap := kafka.ConfigMap{
"bootstrap.servers": "192.168.1.7:80",
"bootstrap.servers": kafkaServers,
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": "something",
Expand All @@ -121,12 +127,14 @@ func TestGetKafkaConsumerConfigMap(t *testing.T) {
brokerSlice := []clowder.BrokerConfig{brokerConfig}
cfg.KafkaBrokers = brokerSlice
cfg.KafkaBroker = &brokerConfig
cfg.KafkaServers = kafkaServers

// Reset config.kafkaconfig back to its original value
defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig) {
defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig, kafkaServers string) {
config.Get().KafkaConfig = conf
config.Get().KafkaBroker = kafkaBroker
}(conf, originalKafkaBroker)
config.Get().KafkaServers = kafkaServers
}(conf, originalKafkaBroker, originalKafkaServers)

ctx := context.Background()
cfg.KafkaConfig = &v1.KafkaConfig{}
Expand Down

0 comments on commit ffb0f67

Please sign in to comment.