From d1a2269a782c7769df7715a032c1e898c7a8ae59 Mon Sep 17 00:00:00 2001 From: Ha Van Date: Wed, 22 Jan 2025 17:33:32 -0600 Subject: [PATCH 1/2] Ensure externallisteners are listed first --- pkg/resources/kafka/configmap.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index dc4c29e2d..eb35e6042 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -417,6 +417,17 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri internalListenerSSLConfig = make(map[string]string) externalListenerSSLConfig = make(map[string]string) + for _, eListener := range l.ExternalListeners { + upperedListenerType := eListener.Type.ToUpperString() + upperedListenerName := strings.ToUpper(eListener.Name) + securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType)) + listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort)) + // Add external listeners SSL configuration + if eListener.Type == v1beta1.SecurityProtocolSSL { + maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name])) + } + } + for _, iListener := range l.InternalListeners { if iListener.UsedForInnerBrokerCommunication { if interBrokerListenerName == "" { @@ -436,17 +447,6 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri } } - for _, eListener := range l.ExternalListeners { - upperedListenerType := eListener.Type.ToUpperString() - upperedListenerName := strings.ToUpper(eListener.Name) - securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType)) - listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort)) - // Add external listeners SSL configuration - if eListener.Type == v1beta1.SecurityProtocolSSL { - maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name])) - } - } - return interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig } From 11d2689a0fb6d77c125e58ada8c45609dc86775a Mon Sep 17 00:00:00 2001 From: Ha Van Date: Thu, 23 Jan 2025 08:38:08 -0600 Subject: [PATCH 2/2] Update test --- ...ter_controller_externallistenerbindings_test.go | 8 ++++---- .../tests/kafkacluster_controller_kafka_test.go | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go index 8f04adbb5..e4f0f148d 100644 --- a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go +++ b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go @@ -81,20 +81,20 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(ctx context.Context, if kafkaCluster.Spec.KRaftMode { switch broker.Id { case 0: - expectedListeners = "INTERNAL://:29092,TEST://:9094" + expectedListeners = "TEST://:9094,INTERNAL://:29092" case 1: expectedListeners = "CONTROLLER://:29093" case 2: - expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094" + expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093" } } else { - expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094" + expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093" } Expect(listeners.Value()).To(Equal(expectedListeners)) listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap) Expect(found).To(BeTrue()) - Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT")) + Expect(listenerSecMap.Value()).To(Equal("TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")) // check service service := corev1.Service{} Eventually(ctx, func() error { diff --git a/controllers/tests/kafkacluster_controller_kafka_test.go b/controllers/tests/kafkacluster_controller_kafka_test.go index 41012126e..eaac7385f 100644 --- a/controllers/tests/kafkacluster_controller_kafka_test.go +++ b/controllers/tests/kafkacluster_controller_kafka_test.go @@ -248,8 +248,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092 cruise.control.metrics.reporter.kubernetes.mode=true inter.broker.listener.name=INTERNAL -listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT -listeners=INTERNAL://:29092,TEST://:9094 +listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=TEST://:9094,INTERNAL://:29092 log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter node.id=%d @@ -265,7 +265,7 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092 cruise.control.metrics.reporter.kubernetes.mode=true inter.broker.listener.name=INTERNAL -listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT +listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT listeners=CONTROLLER://:29093 log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter @@ -283,8 +283,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092 cruise.control.metrics.reporter.kubernetes.mode=true inter.broker.listener.name=INTERNAL -listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT -listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094 +listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093 log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter node.id=%d @@ -299,8 +299,8 @@ control.plane.listener.name=CONTROLLER cruise.control.metrics.reporter.bootstrap.servers=kafkacluster-1-all-broker.kafka-1.svc.cluster.local:29092 cruise.control.metrics.reporter.kubernetes.mode=true inter.broker.listener.name=INTERNAL -listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT -listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094 +listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093 log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter zookeeper.connect=/