Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure externallisteners are listed first #97

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,20 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(ctx context.Context,
if kafkaCluster.Spec.KRaftMode {
switch broker.Id {
case 0:
expectedListeners = "INTERNAL://:29092,TEST://:9094"
expectedListeners = "TEST://:9094,INTERNAL://:29092"
case 1:
expectedListeners = "CONTROLLER://:29093"
case 2:
expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"
expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"
}
} else {
expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"
expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"
}
Expect(listeners.Value()).To(Equal(expectedListeners))

listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap)
Expect(found).To(BeTrue())
Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT"))
Expect(listenerSecMap.Value()).To(Equal("TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"))
// check service
service := corev1.Service{}
Eventually(ctx, func() error {
Expand Down
14 changes: 7 additions & 7 deletions controllers/tests/kafkacluster_controller_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
node.id=%d
Expand All @@ -265,7 +265,7 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=CONTROLLER://:29093
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
Expand All @@ -283,8 +283,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
node.id=%d
Expand All @@ -299,8 +299,8 @@ control.plane.listener.name=CONTROLLER
cruise.control.metrics.reporter.bootstrap.servers=kafkacluster-1-all-broker.kafka-1.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
zookeeper.connect=/
Expand Down
22 changes: 11 additions & 11 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,17 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri
internalListenerSSLConfig = make(map[string]string)
externalListenerSSLConfig = make(map[string]string)

for _, eListener := range l.ExternalListeners {
upperedListenerType := eListener.Type.ToUpperString()
upperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
// Add external listeners SSL configuration
if eListener.Type == v1beta1.SecurityProtocolSSL {
maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name]))
}
}

for _, iListener := range l.InternalListeners {
if iListener.UsedForInnerBrokerCommunication {
if interBrokerListenerName == "" {
Expand All @@ -436,17 +447,6 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri
}
}

for _, eListener := range l.ExternalListeners {
upperedListenerType := eListener.Type.ToUpperString()
upperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
// Add external listeners SSL configuration
if eListener.Type == v1beta1.SecurityProtocolSSL {
maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name]))
}
}

return interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig
}

Expand Down
Loading