Skip to content

Commit

Permalink
[release-1.12] Test Keda Kafka Source Scaling (#3586)
Browse files Browse the repository at this point in the history
* test: added kafkasource keda scaling test

Signed-off-by: Calum Murray <[email protected]>

* fix: fixed keda cooldown interval to be aligned with serving

Signed-off-by: Calum Murray <[email protected]>

* build: fix extra args in call to GenerateScaledObject

Signed-off-by: Calum Murray <[email protected]>

* build: fix lint check

Signed-off-by: Calum Murray <[email protected]>

* build: try again to fix lint issues

Signed-off-by: Calum Murray <[email protected]>

* address review comments

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
  • Loading branch information
knative-prow-robot and Cali0707 authored Jan 15, 2024
1 parent 6ea8642 commit 15ebc55
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ data:
min-scale: "0"
max-scale: "50"
polling-interval: "10"
cooldown-period: "300"
cooldown-period: "30"
lag-threshold: "100"
activation-lag-threshold: "1"
4 changes: 1 addition & 3 deletions control-plane/pkg/autoscaler/autoscaler_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ const (
// DefaultPollingInterval is the default value for AutoscalingPollingIntervalAnnotation.
DefaultPollingInterval = 10
// DefaultCooldownPeriod is the default value for AutoscalingCooldownPeriodAnnotation.
DefaultCooldownPeriod = 300
DefaultCooldownPeriod = 30
// DefaultMinReplicaCount is the default value for AutoscalingMinScaleAnnotation
DefaultMinReplicaCount = 0
// DefaultMaxReplicaCount is the default value for AutoscalingMaxScaleAnnotation.
DefaultMaxReplicaCount = 50
// DefaultLagThreshold is the default value for AutoscalingLagThreshold.
DefaultLagThreshold = 100
// DefaultActivationLagThreshold is the default value for AutoscalingActivationLagThreshold.
DefaultActivationLagThreshold = 1
)
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ var (
autoscaler.AutoscalingMinScaleAnnotation: "0",
autoscaler.AutoscalingMaxScaleAnnotation: "5",
autoscaler.AutoscalingPollingIntervalAnnotation: "30",
autoscaler.AutoscalingCooldownPeriodAnnotation: "300",
autoscaler.AutoscalingCooldownPeriodAnnotation: "30",
autoscaler.AutoscalingLagThreshold: "10",
autoscaler.AutoscalingActivationLagThreshold: "1",
}
)

Expand Down
17 changes: 17 additions & 0 deletions test/e2e_new/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package e2e_new

import (
"testing"
"time"

"knative.dev/eventing-kafka-broker/test/rekt/features"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -207,3 +208,19 @@ func TestKafkaSourceUpdate(t *testing.T) {
// the new event is delivered properly.
env.Test(ctx, t, features.KafkaSourceWithEventAfterUpdate(kafkaSource, kafkaSink, topic))
}

func TestKafkaSourceKedaScaling(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(5*time.Second, 4*time.Minute),
environment.Managed(t),
)

env.Test(ctx, t, features.KafkaSourceScalesToZeroWithKeda())

}
97 changes: 97 additions & 0 deletions test/rekt/features/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/test"
. "github.com/cloudevents/sdk-go/v2/test"
cetest "github.com/cloudevents/sdk-go/v2/test"
cetypes "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/rekt/features/featuressteps"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
Expand All @@ -48,6 +51,7 @@ import (
kafkaclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client"
sourcesclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client"
consumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client"
"knative.dev/eventing-kafka-broker/test/rekt/features/kafkafeatureflags"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"

Expand Down Expand Up @@ -561,3 +565,96 @@ func KafkaSourceWithEventAfterUpdate(kafkaSource, kafkaSink, topic string) *feat

return f
}

// KafkaSourceScalesToZeroWithKeda returns a feature verifying that, with the
// controller autoscaler (KEDA) enabled, a KafkaSource's consumer group starts
// at zero replicas, scales up to deliver an event, and scales back down to
// zero afterwards.
func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
	f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

	// we need to ensure that autoscaling is enabled for the rest of the feature to work
	f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())

	kafkaSource := feature.MakeRandomK8sName("kafka-source")
	topic := feature.MakeRandomK8sName("topic")
	// Lowercase prefix: Kubernetes object names must be RFC 1123 compliant,
	// and MakeRandomK8sName only appends a random suffix to the prefix.
	kafkaSink := feature.MakeRandomK8sName("kafka-sink")
	receiver := feature.MakeRandomK8sName("eventshub-receiver")
	sender := feature.MakeRandomK8sName("eventshub-sender")

	event := cetest.FullEvent()
	event.SetID(uuid.New().String())

	f.Setup("install kafka topic", kafkatopic.Install(topic))
	f.Setup("topic is ready", kafkatopic.IsReady(topic))

	// Binary content mode is default for Kafka Sink.
	f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr))
	f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink))

	f.Setup("install eventshub receiver", eventshub.Install(receiver, eventshub.StartReceiver))

	kafkaSourceOpts := []manifest.CfgFn{
		kafkasource.WithSink(service.AsKReference(receiver), ""),
		kafkasource.WithTopics([]string{topic}),
		// Use the same bootstrap-servers constant (testpkg) as the sink above
		// for consistency within this feature.
		kafkasource.WithBootstrapServers(testpkg.BootstrapServersPlaintextArr),
	}

	f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...))
	f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource))

	// Before any traffic, the consumer group should have zero replicas; the
	// ConsumerGroup may not even exist yet, hence allowNotFound=true.
	f.Setup("Source should start with replicas = 0", verifyConsumerGroupReplicas(kafkaSource, 0, true))

	options := []eventshub.EventsHubOption{
		eventshub.StartSenderToResource(kafkasink.GVR(), kafkaSink),
		eventshub.InputEvent(event),
	}
	f.Requirement("install eventshub sender", eventshub.Install(sender, options...))

	f.Requirement("eventshub receiver gets event", assert.OnStore(receiver).MatchEvent(cetest.HasId(event.ID())).Exact(1))

	// after the event is sent, the source should scale down to zero replicas
	f.Alpha("KafkaSource").Must("Scale down to zero", verifyConsumerGroupReplicas(kafkaSource, 0, false))

	return f
}

// verifyConsumerGroupReplicas polls until the ConsumerGroup backing the given
// KafkaSource reports the expected replica count in its spec, failing the
// test on timeout.
//
// The ConsumerGroup is looked up by the KafkaSource's UID. When allowNotFound
// is true, a missing KafkaSource or ConsumerGroup is treated as "not ready
// yet" and polling continues; otherwise it fails the test immediately.
func verifyConsumerGroupReplicas(source string, replicas int32, allowNotFound bool) feature.StepFn {
	return func(ctx context.Context, t feature.T) {
		var seenReplicas int32
		interval, timeout := environment.PollTimingsFromContext(ctx)
		err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
			ns := environment.FromContext(ctx).Namespace()

			ks, err := sourcesclient.Get(ctx).
				SourcesV1beta1().
				KafkaSources(ns).
				Get(ctx, source, metav1.GetOptions{})
			if err != nil {
				if allowNotFound {
					return false, nil
				}
				t.Fatal(err)
			}

			// The ConsumerGroup shares the KafkaSource's UID as its name.
			internalsClient := consumergroupclient.Get(ctx)
			cg, err := internalsClient.InternalV1alpha1().
				ConsumerGroups(ns).
				Get(ctx, string(ks.UID), metav1.GetOptions{})
			if err != nil {
				if allowNotFound {
					return false, nil
				}
				t.Fatal(err)
			}

			// Spec.Replicas may not be populated yet; keep polling instead of
			// panicking on a nil dereference.
			if cg.Spec.Replicas == nil {
				return false, nil
			}

			if *cg.Spec.Replicas != replicas {
				seenReplicas = *cg.Spec.Replicas
				return false, nil
			}
			return true, nil
		})

		if err != nil {
			// Surface the poll error (typically a timeout) alongside the last
			// observed replica count so failures are diagnosable.
			t.Errorf("failed to verify consumergroup replicas. Expected %d, final value was %d: %v", replicas, seenReplicas, err)
		}
	}
}
55 changes: 55 additions & 0 deletions test/rekt/features/kafkafeatureflags/kafkafeatureflags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafkafeatureflags

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kafkafeature "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/feature"
)

// AutoscalingEnabled returns a feature.ShouldRun that reports whether the
// controller autoscaler is enabled in the "config-kafka-features" ConfigMap.
func AutoscalingEnabled() feature.ShouldRun {
	return func(ctx context.Context, t feature.T) (feature.PrerequisiteResult, error) {
		flags, err := getKafkaFeatureFlags(ctx, "config-kafka-features")
		if err != nil {
			return feature.PrerequisiteResult{}, err
		}

		result := feature.PrerequisiteResult{
			ShouldRun: flags.IsControllerAutoscalerEnabled(),
		}
		return result, nil
	}
}

// getKafkaFeatureFlags reads the Kafka feature flags from the named ConfigMap
// in the system namespace and parses them.
func getKafkaFeatureFlags(ctx context.Context, cmName string) (*kafkafeature.KafkaFeatureFlags, error) {
	ns := system.Namespace()
	cm, err := kubeclient.Get(ctx).
		CoreV1().
		ConfigMaps(ns).
		Get(ctx, cmName, metav1.GetOptions{})
	if err != nil {
		// Wrap with %w (not %s) so callers can inspect the cause with
		// errors.Is/errors.As (e.g. apierrors.IsNotFound).
		return nil, fmt.Errorf("failed to get cm %s/%s: %w", ns, cmName, err)
	}

	return kafkafeature.NewFeaturesConfigFromMap(cm)
}

0 comments on commit 15ebc55

Please sign in to comment.