From 15ebc559011b54963dbef6cced830556d863497d Mon Sep 17 00:00:00 2001 From: Knative Prow Robot Date: Mon, 15 Jan 2024 15:05:34 +0000 Subject: [PATCH] [release-1.12] Test Keda Kafka Source Scaling (#3586) * test: added kafkasource keda scaling test Signed-off-by: Calum Murray * fix: fixed keda cooldown interval to be aligned with serving Signed-off-by: Calum Murray * build: fix extra args in call to GenerateScaledObject Signed-off-by: Calum Murray * build: fix lint check Signed-off-by: Calum Murray * build: try again to fix lint issues Signed-off-by: Calum Murray * address review comments Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray Co-authored-by: Calum Murray --- .../100-config-kafka-autoscaler.yaml | 3 +- .../pkg/autoscaler/autoscaler_api.go | 4 +- .../testing/objects_consumergroup.go | 3 +- test/e2e_new/kafka_source_test.go | 17 ++++ test/rekt/features/kafka_source.go | 97 +++++++++++++++++++ .../kafkafeatureflags/kafkafeatureflags.go | 55 +++++++++++ 6 files changed, 172 insertions(+), 7 deletions(-) create mode 100644 test/rekt/features/kafkafeatureflags/kafkafeatureflags.go diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-autoscaler.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-autoscaler.yaml index 572733c381..d4fe3bb41b 100644 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-autoscaler.yaml +++ b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-autoscaler.yaml @@ -24,6 +24,5 @@ data: min-scale: "0" max-scale: "50" polling-interval: "10" - cooldown-period: "300" + cooldown-period: "30" lag-threshold: "100" - activation-lag-threshold: "1" diff --git a/control-plane/pkg/autoscaler/autoscaler_api.go b/control-plane/pkg/autoscaler/autoscaler_api.go index 58486b4c25..c58b40269e 100644 --- a/control-plane/pkg/autoscaler/autoscaler_api.go +++ b/control-plane/pkg/autoscaler/autoscaler_api.go @@ -52,13 +52,11 @@ const ( // DefaultPollingInterval is the default value for AutoscalingPollingIntervalAnnotation. DefaultPollingInterval = 10 // DefaultCooldownPeriod is the default value for AutoscalingCooldownPeriodAnnotation. - DefaultCooldownPeriod = 300 + DefaultCooldownPeriod = 30 // DefaultMinReplicaCount is the default value for AutoscalingMinScaleAnnotation DefaultMinReplicaCount = 0 // DefaultMaxReplicaCount is the default value for AutoscalingMaxScaleAnnotation. DefaultMaxReplicaCount = 50 // DefaultLagThreshold is the default value for AutoscalingLagThreshold. DefaultLagThreshold = 100 - // DefaultActivationLagThreshold is the default value for AutoscalingActivationLagThreshold. - DefaultActivationLagThreshold = 1 ) diff --git a/control-plane/pkg/reconciler/testing/objects_consumergroup.go b/control-plane/pkg/reconciler/testing/objects_consumergroup.go index 075d406dd6..e762c1365f 100644 --- a/control-plane/pkg/reconciler/testing/objects_consumergroup.go +++ b/control-plane/pkg/reconciler/testing/objects_consumergroup.go @@ -76,9 +76,8 @@ var ( autoscaler.AutoscalingMinScaleAnnotation: "0", autoscaler.AutoscalingMaxScaleAnnotation: "5", autoscaler.AutoscalingPollingIntervalAnnotation: "30", - autoscaler.AutoscalingCooldownPeriodAnnotation: "300", + autoscaler.AutoscalingCooldownPeriodAnnotation: "30", autoscaler.AutoscalingLagThreshold: "10", - autoscaler.AutoscalingActivationLagThreshold: "1", } ) diff --git a/test/e2e_new/kafka_source_test.go b/test/e2e_new/kafka_source_test.go index 2a0e70342c..ca1b27b808 100644 --- a/test/e2e_new/kafka_source_test.go +++ b/test/e2e_new/kafka_source_test.go @@ -21,6 +21,7 @@ package e2e_new import ( "testing" + "time" "knative.dev/eventing-kafka-broker/test/rekt/features" "knative.dev/pkg/system" @@ -207,3 +208,19 @@ func TestKafkaSourceUpdate(t *testing.T) { // the new event is delivered properly. env.Test(ctx, t, features.KafkaSourceWithEventAfterUpdate(kafkaSource, kafkaSink, topic)) } + +func TestKafkaSourceKedaScaling(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.WithPollTimings(5*time.Second, 4*time.Minute), + environment.Managed(t), + ) + + env.Test(ctx, t, features.KafkaSourceScalesToZeroWithKeda()) + +} diff --git a/test/rekt/features/kafka_source.go b/test/rekt/features/kafka_source.go index 058778049c..a7397ee7d0 100644 --- a/test/rekt/features/kafka_source.go +++ b/test/rekt/features/kafka_source.go @@ -23,11 +23,14 @@ import ( "strings" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/test" . "github.com/cloudevents/sdk-go/v2/test" cetest "github.com/cloudevents/sdk-go/v2/test" cetypes "github.com/cloudevents/sdk-go/v2/types" + "github.com/google/uuid" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/rekt/features/featuressteps" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" @@ -48,6 +51,7 @@ import ( kafkaclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client" sourcesclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client" consumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client" + "knative.dev/eventing-kafka-broker/test/rekt/features/kafkafeatureflags" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" @@ -561,3 +565,96 @@ func KafkaSourceWithEventAfterUpdate(kafkaSource, kafkaSink, topic string) *feat return f } + +func KafkaSourceScalesToZeroWithKeda() *feature.Feature { + f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda") + + // we need to ensure that autoscaling is enabled for the rest of the feature to work + f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled()) + + kafkaSource := feature.MakeRandomK8sName("kafka-source") + topic := feature.MakeRandomK8sName("topic") + kafkaSink := feature.MakeRandomK8sName("kafkaSink") + receiver := feature.MakeRandomK8sName("eventshub-receiver") + sender := feature.MakeRandomK8sName("eventshub-sender") + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install kafka topic", kafkatopic.Install(topic)) + f.Setup("topic is ready", kafkatopic.IsReady(topic)) + + // Binary content mode is default for Kafka Sink. + f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr)) + f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink)) + + f.Setup("install eventshub receiver", eventshub.Install(receiver, eventshub.StartReceiver)) + + kafkaSourceOpts := []manifest.CfgFn{ + kafkasource.WithSink(service.AsKReference(receiver), ""), + kafkasource.WithTopics([]string{topic}), + kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr), + } + + f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...)) + f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource)) + + // check that the source initially has replicas = 0 + f.Setup("Source should start with replicas = 0", verifyConsumerGroupReplicas(kafkaSource, 0, true)) + + options := []eventshub.EventsHubOption{ + eventshub.StartSenderToResource(kafkasink.GVR(), kafkaSink), + eventshub.InputEvent(event), + } + f.Requirement("install eventshub sender", eventshub.Install(sender, options...)) + + f.Requirement("eventshub receiver gets event", assert.OnStore(receiver).MatchEvent(test.HasId(event.ID())).Exact(1)) + + // after the event is sent, the source should scale down to zero replicas + f.Alpha("KafkaSource").Must("Scale down to zero", verifyConsumerGroupReplicas(kafkaSource, 0, false)) + + return f +} + +func verifyConsumerGroupReplicas(source string, replicas int32, allowNotFound bool) feature.StepFn { + return func(ctx context.Context, t feature.T) { + var seenReplicas int32 + interval, timeout := environment.PollTimingsFromContext(ctx) + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { + ns := environment.FromContext(ctx).Namespace() + + ks, err := sourcesclient.Get(ctx). + SourcesV1beta1(). + KafkaSources(ns). + Get(ctx, source, metav1.GetOptions{}) + if err != nil { + if allowNotFound { + return false, nil + } + t.Fatal(err) + } + + InternalsClient := consumergroupclient.Get(ctx) + cg, err := InternalsClient.InternalV1alpha1(). + ConsumerGroups(ns). + Get(ctx, string(ks.UID), metav1.GetOptions{}) + + if err != nil { + if allowNotFound { + return false, nil + } + t.Fatal(err) + } + + if *cg.Spec.Replicas != replicas { + seenReplicas = *cg.Spec.Replicas + return false, nil + } + return true, nil + }) + + if err != nil { + t.Errorf("failed to verify consumergroup replicas. Expected %d, final value was %d", replicas, seenReplicas) + } + } +} diff --git a/test/rekt/features/kafkafeatureflags/kafkafeatureflags.go b/test/rekt/features/kafkafeatureflags/kafkafeatureflags.go new file mode 100644 index 0000000000..9d0b827119 --- /dev/null +++ b/test/rekt/features/kafkafeatureflags/kafkafeatureflags.go @@ -0,0 +1,55 @@ +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafkafeatureflags + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kafkafeature "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/feature" +) + +func AutoscalingEnabled() feature.ShouldRun { + return func(ctx context.Context, t feature.T) (feature.PrerequisiteResult, error) { + flags, err := getKafkaFeatureFlags(ctx, "config-kafka-features") + if err != nil { + return feature.PrerequisiteResult{}, err + } + + return feature.PrerequisiteResult{ + ShouldRun: flags.IsControllerAutoscalerEnabled(), + }, nil + + } +} + +func getKafkaFeatureFlags(ctx context.Context, cmName string) (*kafkafeature.KafkaFeatureFlags, error) { + ns := system.Namespace() + cm, err := kubeclient.Get(ctx). + CoreV1(). + ConfigMaps(ns). + Get(ctx, cmName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get cm %s/%s: %s", ns, cmName, err) + } + + return kafkafeature.NewFeaturesConfigFromMap(cm) +}