Skip to content

Commit

Permalink
Remove scheduler config
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Nov 26, 2024
1 parent 5903ff9 commit 98e887b
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 222 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,6 @@ spec:
- name: POD_CAPACITY
value: '20'

- name: SCHEDULER_CONFIG
value: 'config-kafka-scheduler'

- name: DESCHEDULER_CONFIG
value: 'config-kafka-descheduler'

- name: AUTOSCALER_CONFIG
value: 'config-kafka-autoscaler'

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ spec:
- name: POD_CAPACITY
value: '20'

- name: SCHEDULER_CONFIG
value: 'config-kafka-scheduler'

- name: DESCHEDULER_CONFIG
value: 'config-kafka-descheduler'

- name: AUTOSCALER_CONFIG
value: 'config-kafka-autoscaler'

- name: CONFIG_LEADERELECTION_NAME
value: config-kafka-leader-election
- name: CONFIG_LOGGING_NAME
Expand Down
94 changes: 11 additions & 83 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package consumergroup

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

v1 "k8s.io/client-go/informers/core/v1"

"github.com/kelseyhightower/envconfig"
"go.uber.org/multierr"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -45,7 +43,6 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -85,19 +82,15 @@ var (
)

type envConfig struct {
SchedulerRefreshPeriod int64 `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"`
PodCapacity int32 `envconfig:"POD_CAPACITY" required:"true"`
SchedulerPolicyConfigMap string `envconfig:"SCHEDULER_CONFIG" required:"true"`
DeSchedulerPolicyConfigMap string `envconfig:"DESCHEDULER_CONFIG" required:"true"`
AutoscalerConfigMap string `envconfig:"AUTOSCALER_CONFIG" required:"true"`
SchedulerRefreshPeriod int64 `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"`
PodCapacity int32 `envconfig:"POD_CAPACITY" required:"true"`
AutoscalerConfigMap string `envconfig:"AUTOSCALER_CONFIG" required:"true"`
}

type SchedulerConfig struct {
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
SchedulerPolicy *scheduler.SchedulerPolicy
DeSchedulerPolicy *scheduler.SchedulerPolicy
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
}

func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
Expand All @@ -109,10 +102,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
SchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.SchedulerPolicyConfigMap),
DeSchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.DeSchedulerPolicyConfigMap),
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
}

dispatcherPodInformer := podinformer.Get(ctx, internalsapi.DispatcherLabelSelectorStr)
Expand Down Expand Up @@ -332,11 +323,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string,
return createStatefulSetScheduler(
ctx,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
SchedulerPolicy: c.SchedulerPolicy,
DeSchedulerPolicy: c.DeSchedulerPolicy,
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
},
func() ([]scheduler.VPod, error) {
consumerGroups, err := lister.List(labels.SelectorFromSet(getSelectorLabel(ssName)))
Expand Down Expand Up @@ -380,12 +369,8 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: statefulSetScaleCacheRefreshPeriod},
PodCapacity: c.Capacity,
RefreshPeriod: c.RefreshPeriod,
SchedulerPolicy: scheduler.MAXFILLUP,
SchedPolicy: c.SchedulerPolicy,
DeschedPolicy: c.DeSchedulerPolicy,
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: dispatcherPodInformer.Lister().Pods(system.Namespace()),
})

Expand All @@ -394,60 +379,3 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
SchedulerConfig: c,
}
}

// schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMapOrFail(ctx context.Context, configMapName string) *scheduler.SchedulerPolicy {
p, err := schedulerPolicyFromConfigMap(ctx, configMapName)
if err != nil {
logging.FromContext(ctx).Fatal(zap.Error(err))
}
return p
}

// schedulerPolicyFromConfigMap reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMap(ctx context.Context, configMapName string) (*scheduler.SchedulerPolicy, error) {
policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err)
}

logger := logging.FromContext(ctx).
Desugar().
With(zap.String("configmap", configMapName))
policy := &scheduler.SchedulerPolicy{}

preds, found := policyConfigMap.Data["predicates"]
if !found {
return nil, fmt.Errorf("missing policy config map %s/%s value at key predicates", system.Namespace(), configMapName)
}
if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

priors, found := policyConfigMap.Data["priorities"]
if !found {
return nil, fmt.Errorf("missing policy config map value at key priorities")
}
if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

if errs := validatePolicy(policy); errs != nil {
return nil, multierr.Combine(err)
}

logger.Info("Schedulers policy registration", zap.Any("policy", policy))

return policy, nil
}

func validatePolicy(policy *scheduler.SchedulerPolicy) []error {
var validationErrors []error

for _, priority := range policy.Priorities {
if priority.Weight < scheduler.MinWeight || priority.Weight > scheduler.MaxWeight {
validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name))
}
}
return validationErrors
}
49 changes: 1 addition & 48 deletions control-plane/pkg/reconciler/consumergroup/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/node/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
Expand All @@ -52,10 +50,6 @@ import (
const (
RefreshPeriod = "100"
PodCapacity = "20"
// ConfigKafkaSchedulerName is the name of the ConfigMap to configure the scheduler.
ConfigKafkaSchedulerName = "config-kafka-scheduler"
// ConfigKafkaDeSchedulerName is the name of the ConfigMap to configure the descheduler.
ConfigKafkaDeSchedulerName = "config-kafka-descheduler"
// ConfigKafkaAutoscalerName is the name of the ConfigMap to configure the autoscaler.
ConfigKafkaAutoscalerName = "config-kafka-autoscaler"
)
Expand All @@ -67,52 +61,11 @@ func TestNewController(t *testing.T) {
)
})
ctx, _ = kedaclient.With(ctx)

t.Setenv("SYSTEM_NAMESPACE", systemNamespace)

ctx, _ = kubeclient.With(ctx,
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: ConfigKafkaSchedulerName,
Namespace: systemNamespace,
},
Data: map[string]string{
"predicates": `
[
{"Name": "PodFitsResources"},
{"Name": "NoMaxResourceCount", "Args": "{\"NumPartitions\": 100}"},
{"Name": "EvenPodSpread", "Args": "{\"MaxSkew\": 2}"}
]`,
"priorities": `
[
{"Name": "AvailabilityZonePriority", "Weight": 10, "Args": "{\"MaxSkew\": 2}"},
{"Name": "LowestOrdinalPriority", "Weight": 2}
]`,
},
},
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: ConfigKafkaDeSchedulerName,
Namespace: systemNamespace,
},
Data: map[string]string{
"predicates": `[]`,
"priorities": `
[
{"Name": "RemoveWithEvenPodSpreadPriority", "Weight": 10, "Args": "{\"MaxSkew\": 2}"},
{"Name": "RemoveWithAvailabilityZonePriority", "Weight": 10, "Args": "{\"MaxSkew\": 2}"},
{"Name": "RemoveWithHighestOrdinalPriority", "Weight": 2}
]`,
},
},
)

ctx = clientpool.WithKafkaClientPool(ctx)

t.Setenv("SYSTEM_NAMESPACE", systemNamespace)
t.Setenv("AUTOSCALER_REFRESH_PERIOD", RefreshPeriod)
t.Setenv("POD_CAPACITY", PodCapacity)
t.Setenv("SCHEDULER_CONFIG", ConfigKafkaSchedulerName)
t.Setenv("DESCHEDULER_CONFIG", ConfigKafkaDeSchedulerName)
t.Setenv("AUTOSCALER_CONFIG", ConfigKafkaAutoscalerName)
controller := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 98e887b

Please sign in to comment.