From 6f1bf746b590637eec67a94e53a3b31716902c48 Mon Sep 17 00:00:00 2001 From: Djebran Lezzoum Date: Fri, 24 Nov 2023 12:38:29 +0100 Subject: [PATCH] kafka: use clowder KafkaServers To allow multi kafka servers usage, use clowder KafkaServers. FIXES: https://issues.redhat.com/browse/THEEDGE-3708 --- config/config.go | 2 ++ config/config_test.go | 8 +++++++- pkg/common/kafka/kafkaconfigmap.go | 4 +--- pkg/common/kafka/kafkaconfigmap_test.go | 20 ++++++++++++++------ 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/config/config.go b/config/config.go index c8a2ff31c..cb45d2658 100644 --- a/config/config.go +++ b/config/config.go @@ -46,6 +46,7 @@ type EdgeConfig struct { UploadWorkers int `json:"upload_workers,omitempty"` KafkaConfig *clowder.KafkaConfig `json:"kafka,omitempty"` KafkaBrokers []clowder.BrokerConfig `json:"kafka_brokers,omitempty"` + KafkaServers string `json:"kafka_servers,omitempty"` KafkaBroker *clowder.BrokerConfig `json:"kafka_broker,omitempty"` KafkaBrokerCaCertPath string `json:"kafka_broker_ca_cert_path,omitempty"` KafkaRequestRequiredAcks int `json:"kafka_request_required_acks,omitempty"` @@ -331,6 +332,7 @@ func CreateEdgeAPIConfig() (*EdgeConfig, error) { } edgeConfig.KafkaConfig = cfg.Kafka + edgeConfig.KafkaServers = strings.Join(clowder.KafkaServers, ",") } // get edgeConfig from file if running in developer mode diff --git a/config/config_test.go b/config/config_test.go index c2a308bc4..5810913cd 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -117,8 +117,10 @@ func TestKafkaBroker(t *testing.T) { testCases := []struct { Name string clowderConfig *clowder.AppConfig + KafkaServers []string ExpectedKafkaBroker *clowder.BrokerConfig ExpectedKafkaBrokerCaCert string + ExpectedKafkaServers string }{ { @@ -132,7 +134,9 @@ func TestKafkaBroker(t *testing.T) { }, }, - ExpectedKafkaBroker: &kafkaBroker, + ExpectedKafkaBroker: &kafkaBroker, + KafkaServers: []string{"kafka-1.example.com:9099", "kafka-2.example.com:9099"}, + ExpectedKafkaServers: "kafka-1.example.com:9099,kafka-2.example.com:9099", }, { @@ -162,9 +166,11 @@ func TestKafkaBroker(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.Name, func(t *testing.T) { clowder.LoadedConfig = testCase.clowderConfig + clowder.KafkaServers = testCase.KafkaServers // init the configuration Init() assert.Equal(t, Config.KafkaBroker, testCase.ExpectedKafkaBroker) + assert.Equal(t, Config.KafkaServers, testCase.ExpectedKafkaServers) if testCase.ExpectedKafkaBroker != nil { if testCase.ExpectedKafkaBroker.Cacert != nil && *testCase.ExpectedKafkaBroker.Cacert != "" { assert.NotEmpty(t, Config.KafkaBrokerCaCertPath) diff --git a/pkg/common/kafka/kafkaconfigmap.go b/pkg/common/kafka/kafkaconfigmap.go index f9be65391..d8ba443b2 100644 --- a/pkg/common/kafka/kafkaconfigmap.go +++ b/pkg/common/kafka/kafkaconfigmap.go @@ -4,8 +4,6 @@ package kafkacommon import ( - "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/redhatinsights/edge-api/config" @@ -33,7 +31,7 @@ func (k *KafkaConfigMapService) getKafkaCommonConfigMap() kafka.ConfigMap { // use the first kafka broker from config if cfg.KafkaBroker != nil { - kafkaConfigMap.SetKey("bootstrap.servers", fmt.Sprintf("%s:%d", cfg.KafkaBroker.Hostname, *cfg.KafkaBroker.Port)) + kafkaConfigMap.SetKey("bootstrap.servers", cfg.KafkaServers) var securityProtocol string if cfg.KafkaBroker.SecurityProtocol != nil { securityProtocol = *cfg.KafkaBroker.SecurityProtocol diff --git a/pkg/common/kafka/kafkaconfigmap_test.go b/pkg/common/kafka/kafkaconfigmap_test.go index d05e7bea3..f91224656 100644 --- a/pkg/common/kafka/kafkaconfigmap_test.go +++ b/pkg/common/kafka/kafkaconfigmap_test.go @@ -20,12 +20,14 @@ func TestGetKafkaProducerConfigMap(t *testing.T) { conf := cfg.KafkaConfig originalKafkaBroker := cfg.KafkaBroker + originalKafkaServers := cfg.KafkaServers authType := clowder.BrokerConfigAuthtype("sasl") dummyString := "something" mech := "PLAIN" proto := "SASL_SSL" port := 80 + kafkaServers := "192.168.1.7:80,192.168.1.8:90" brokerConfig := clowder.BrokerConfig{ Authtype: &authType, Cacert: &dummyString, @@ -39,7 +41,7 @@ func TestGetKafkaProducerConfigMap(t *testing.T) { }, } kafkaConfigMap := kafka.ConfigMap{ - "bootstrap.servers": "192.168.1.7:80", + "bootstrap.servers": kafkaServers, "sasl.mechanisms": "PLAIN", "security.protocol": "SASL_SSL", "sasl.username": "something", @@ -51,14 +53,16 @@ func TestGetKafkaProducerConfigMap(t *testing.T) { cfg.KafkaBroker = &brokerConfig // Reset config.kafkaconfig back to its original value - defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig) { + defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig, kafkaServers string) { config.Get().KafkaConfig = conf config.Get().KafkaBroker = kafkaBroker - }(conf, originalKafkaBroker) + config.Get().KafkaServers = kafkaServers + }(conf, originalKafkaBroker, originalKafkaServers) ctx := context.Background() cfg.KafkaConfig = &v1.KafkaConfig{} cfg.KafkaConfig.Brokers = []clowder.BrokerConfig{brokerConfig} + cfg.KafkaServers = kafkaServers cases := []struct { Name string @@ -89,6 +93,7 @@ func TestGetKafkaConsumerConfigMap(t *testing.T) { conf := cfg.KafkaConfig originalKafkaBroker := cfg.KafkaBroker + originalKafkaServers := cfg.KafkaServers authType := clowder.BrokerConfigAuthtype("sasl") dummyString := "something" @@ -107,8 +112,9 @@ func TestGetKafkaConsumerConfigMap(t *testing.T) { Password: &dummyString, }, } + kafkaServers := "192.168.1.7:80,192.168.1.8:90" kafkaConfigMap := kafka.ConfigMap{ - "bootstrap.servers": "192.168.1.7:80", + "bootstrap.servers": kafkaServers, "sasl.mechanisms": "PLAIN", "security.protocol": "SASL_SSL", "sasl.username": "something", @@ -121,12 +127,14 @@ func TestGetKafkaConsumerConfigMap(t *testing.T) { brokerSlice := []clowder.BrokerConfig{brokerConfig} cfg.KafkaBrokers = brokerSlice cfg.KafkaBroker = &brokerConfig + cfg.KafkaServers = kafkaServers // Reset config.kafkaconfig back to its original value - defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig) { + defer func(conf *v1.KafkaConfig, kafkaBroker *clowder.BrokerConfig, kafkaServers string) { config.Get().KafkaConfig = conf config.Get().KafkaBroker = kafkaBroker - }(conf, originalKafkaBroker) + config.Get().KafkaServers = kafkaServers + }(conf, originalKafkaBroker, originalKafkaServers) ctx := context.Background() cfg.KafkaConfig = &v1.KafkaConfig{}