From 987feee2ca58bf615d05f58e0e7db3d8809dcbe2 Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Mon, 13 Dec 2021 20:04:19 +0100 Subject: [PATCH] Use Broker's own configmap for Kafka (#1595) --- test/pkg/broker/broker.go | 31 +++++++++++++++++++++++++++++++ test/pkg/testing/run.go | 3 +++ 2 files changed, 34 insertions(+) diff --git a/test/pkg/broker/broker.go b/test/pkg/broker/broker.go index 9ac8e60e9b..2d7faff4ee 100644 --- a/test/pkg/broker/broker.go +++ b/test/pkg/broker/broker.go @@ -17,13 +17,20 @@ package broker import ( + "context" "fmt" "strings" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingtestlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func Creator(client *eventingtestlib.Client, version string) string { @@ -33,9 +40,33 @@ func Creator(client *eventingtestlib.Client, version string) string { switch version { case "v1": + namespace := client.Namespace + cmName := "kafka-broker-upgrade-config" + // Create Broker's own ConfigMap to prevent using defaults. + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: namespace, + }, + Data: map[string]string{ + broker.BootstrapServersConfigMapKey: testingpkg.BootstrapServersPlaintext, + broker.DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", testingpkg.NumPartitions), + broker.DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", testingpkg.ReplicationFactor), + }, + } + cm, err := client.Kube.CoreV1().ConfigMaps(namespace).Create(context.Background(), cm, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + client.T.Fatalf("Failed to create ConfigMap %s/%s: %v", namespace, cm.GetName(), err) + } client.CreateBrokerOrFail( name, resources.WithBrokerClassForBroker(kafka.BrokerClass), + resources.WithConfigForBroker(&duckv1.KReference{ + Kind: "ConfigMap", + Namespace: namespace, + Name: cmName, + APIVersion: "v1", + }), ) default: panic(fmt.Sprintf("Unsupported version of Broker: %q", version)) diff --git a/test/pkg/testing/run.go b/test/pkg/testing/run.go index f938e3b81e..9335e3d7bc 100644 --- a/test/pkg/testing/run.go +++ b/test/pkg/testing/run.go @@ -31,6 +31,9 @@ const ( BootstrapServersSaslPlaintext = "my-cluster-kafka-bootstrap.kafka:9095" BootstrapServersSslSaslScram = "my-cluster-kafka-bootstrap.kafka:9094" + NumPartitions = 10 + ReplicationFactor = 3 + KafkaClusterNamespace = "kafka" TlsUserSecretName = "my-tls-user" SaslUserSecretName = "my-sasl-user"