Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to latest dependencies #4149

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

86 changes: 8 additions & 78 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package consumergroup

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

v1 "k8s.io/client-go/informers/core/v1"

"github.com/kelseyhightower/envconfig"
"go.uber.org/multierr"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -45,7 +43,6 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -93,11 +90,9 @@ type envConfig struct {
}

type SchedulerConfig struct {
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
SchedulerPolicy *scheduler.SchedulerPolicy
DeSchedulerPolicy *scheduler.SchedulerPolicy
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
}

func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
Expand All @@ -109,10 +104,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
SchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.SchedulerPolicyConfigMap),
DeSchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.DeSchedulerPolicyConfigMap),
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
}

dispatcherPodInformer := podinformer.Get(ctx, internalsapi.DispatcherLabelSelectorStr)
Expand Down Expand Up @@ -332,11 +325,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string,
return createStatefulSetScheduler(
ctx,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
SchedulerPolicy: c.SchedulerPolicy,
DeSchedulerPolicy: c.DeSchedulerPolicy,
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
},
func() ([]scheduler.VPod, error) {
consumerGroups, err := lister.List(labels.SelectorFromSet(getSelectorLabel(ssName)))
Expand Down Expand Up @@ -380,12 +371,8 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: statefulSetScaleCacheRefreshPeriod},
PodCapacity: c.Capacity,
RefreshPeriod: c.RefreshPeriod,
SchedulerPolicy: scheduler.MAXFILLUP,
SchedPolicy: c.SchedulerPolicy,
DeschedPolicy: c.DeSchedulerPolicy,
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: dispatcherPodInformer.Lister().Pods(system.Namespace()),
})

Expand All @@ -394,60 +381,3 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
SchedulerConfig: c,
}
}

// schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMapOrFail(ctx context.Context, configMapName string) *scheduler.SchedulerPolicy {
p, err := schedulerPolicyFromConfigMap(ctx, configMapName)
if err != nil {
logging.FromContext(ctx).Fatal(zap.Error(err))
}
return p
}

// schedulerPolicyFromConfigMap reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMap(ctx context.Context, configMapName string) (*scheduler.SchedulerPolicy, error) {
policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err)
}

logger := logging.FromContext(ctx).
Desugar().
With(zap.String("configmap", configMapName))
policy := &scheduler.SchedulerPolicy{}

preds, found := policyConfigMap.Data["predicates"]
if !found {
return nil, fmt.Errorf("missing policy config map %s/%s value at key predicates", system.Namespace(), configMapName)
}
if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

priors, found := policyConfigMap.Data["priorities"]
if !found {
return nil, fmt.Errorf("missing policy config map value at key priorities")
}
if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

if errs := validatePolicy(policy); errs != nil {
return nil, multierr.Combine(err)
}

logger.Info("Schedulers policy registration", zap.Any("policy", policy))

return policy, nil
}

func validatePolicy(policy *scheduler.SchedulerPolicy) []error {
var validationErrors []error

for _, priority := range policy.Priorities {
if priority.Weight < scheduler.MinWeight || priority.Weight > scheduler.MaxWeight {
validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name))
}
}
return validationErrors
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/apiserver v0.30.3
k8s.io/client-go v0.30.3
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7
knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358
knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60
knative.dev/pkg v0.0.0-20241026180704-25f6002b00f3
knative.dev/reconciler-test v0.0.0-20241024141702-aae114c1c0e3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1214,8 +1214,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7 h1:pYKhXbvHVOmQumyKS7vjQBaB11rXzeAjz84z2L9qrtM=
knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7/go.mod h1:2mdt9J66vQYzxizDz8I/F6IGzV1QgwCkacBR8X12Ssk=
knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358 h1:RwoyL/OsdC2xzWARfLRKWBAkqG64F1CMNHAIbWHTBAI=
knative.dev/eventing v0.43.1-0.20241029203049-7c97e6ff8358/go.mod h1:2mdt9J66vQYzxizDz8I/F6IGzV1QgwCkacBR8X12Ssk=
knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60 h1:LjBbosBvW/9/qjzIJtGpehPsbNWVvy1Fz8yZvMbFWe4=
knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY=
knative.dev/pkg v0.0.0-20241026180704-25f6002b00f3 h1:uUSDGlOIkdPT4svjlhi+JEnP2Ufw7AM/F5QDYiEL02U=
Expand Down
2 changes: 1 addition & 1 deletion test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
source $(dirname "$0")/e2e-common.sh

if ! ${SKIP_INITIALIZE}; then
initialize "$@" --num-nodes=4
initialize "$@" --num-nodes=5
save_release_artifacts || fail_test "Failed to save release artifacts"
fi

Expand Down
50 changes: 44 additions & 6 deletions test/e2e/sacura_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
Expand All @@ -31,17 +32,22 @@ import (
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/dynamic"
"k8s.io/utils/pointer"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
testlib "knative.dev/eventing/test/lib"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internalskafkaeventing/v1alpha1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

pkgtest "knative.dev/eventing-kafka-broker/test/pkg"
kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka"
pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing"
Expand All @@ -64,6 +70,8 @@ type SacuraTestConfig struct {
// Namespace is the test namespace.
Namespace string

ConsumerResourceGVR schema.GroupVersionResource

// BrokerTopic is the expected Broker topic.
// It's used to verify the committed offset.
BrokerTopic *string
Expand All @@ -79,15 +87,17 @@ type SacuraTestConfig struct {

func TestSacuraSinkSourceJob(t *testing.T) {
runSacuraTest(t, SacuraTestConfig{
Namespace: "sacura-sink-source",
SourceTopic: pointer.String("sacura-sink-source-topic"),
Namespace: "sacura-sink-source",
ConsumerResourceGVR: sources.SchemeGroupVersion.WithResource("kafkasources"),
SourceTopic: pointer.String("sacura-sink-source-topic"),
})
}

func TestSacuraBrokerJob(t *testing.T) {
runSacuraTest(t, SacuraTestConfig{
Namespace: "sacura",
BrokerTopic: pointer.String("knative-broker-sacura-sink-source-broker"),
Namespace: "sacura",
ConsumerResourceGVR: eventing.SchemeGroupVersion.WithResource("triggers"),
BrokerTopic: pointer.String("knative-broker-sacura-sink-source-broker"),
})
}

Expand All @@ -98,6 +108,15 @@ func runSacuraTest(t *testing.T, config SacuraTestConfig) {

ctx := context.Background()

watchUserFacingResource := watchResource(t, ctx, c.Dynamic, config.Namespace, config.ConsumerResourceGVR)
t.Cleanup(watchUserFacingResource.Stop)

watchConsumerGroups := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumergroups"))
t.Cleanup(watchConsumerGroups.Stop)

watchConsumer := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumers"))
t.Cleanup(watchConsumer.Stop)

jobPollError := wait.Poll(pollInterval, pollTimeout, func() (done bool, err error) {
job, err := c.Kube.BatchV1().Jobs(config.Namespace).Get(ctx, app, metav1.GetOptions{})
assert.Nil(t, err)
Expand Down Expand Up @@ -195,3 +214,22 @@ func getKafkaSubscriptionConsumerGroup(ctx context.Context, c dynamic.Interface,
return fmt.Sprintf("kafka.%s.%s.%s", c, sacuraChannelName, string(sub.UID))
}
}

func watchResource(t *testing.T, ctx context.Context, dynamic dynamic.Interface, ns string, gvr schema.GroupVersionResource) watch.Interface {

w, err := dynamic.Resource(gvr).
Namespace(ns).
Watch(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal("Failed to watch resource", gvr, err)
}

go func() {
for e := range w.ResultChan() {
bytes, _ := json.MarshalIndent(e, "", " ")
t.Logf("Resource %q changed:\n%s\n\n", gvr.String(), string(bytes))
}
}()

return w
}
2 changes: 1 addition & 1 deletion test/keda-reconciler-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ source $(dirname "$0")/e2e-common.sh
export BROKER_TEMPLATES=./templates/kafka-broker

if ! ${SKIP_INITIALIZE}; then
initialize "$@" --num-nodes=4
initialize "$@" --num-nodes=5
save_release_artifacts || fail_test "Failed to save release artifacts"
fi

Expand Down
2 changes: 1 addition & 1 deletion test/reconciler-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ source $(dirname "$0")/e2e-common.sh
export BROKER_TEMPLATES=./templates/kafka-broker

if ! ${SKIP_INITIALIZE}; then
initialize "$@" --num-nodes=4
initialize "$@" --num-nodes=5
save_release_artifacts || fail_test "Failed to save release artifacts"
fi

Expand Down
2 changes: 1 addition & 1 deletion test/upgrade-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function test_setup() {
}

if ! ${SKIP_INITIALIZE}; then
initialize "$@"
initialize "$@" --num-nodes=5
save_release_artifacts || fail_test "Failed to save release artifacts"
fi

Expand Down
Loading
Loading