From cbc5d7cbea324db278cc512ff9853179615cb8cb Mon Sep 17 00:00:00 2001
From: Ryan Zhang
Date: Mon, 11 Mar 2024 23:51:26 -0700
Subject: [PATCH] perf: increase the number of scheduler workers (#691)

Co-authored-by: Ryan Zhang
---
 cmd/hubagent/workload/setup.go              | 10 +++---
 pkg/controllers/rollout/controller.go       |  2 +-
 pkg/controllers/workgenerator/controller.go |  2 +-
 pkg/metrics/metrics.go                      |  6 ++++
 pkg/scheduler/scheduler.go                  | 37 +++++++++++++++++----
 test/e2e/README.md                          |  4 +--
 test/scheduler/suite_test.go                |  2 +-
 7 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go
index fd8228b7b..6c2030e54 100644
--- a/cmd/hubagent/workload/setup.go
+++ b/cmd/hubagent/workload/setup.go
@@ -207,7 +207,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 	if err := (&rollout.Reconciler{
 		Client:                  mgr.GetClient(),
 		UncachedReader:          mgr.GetAPIReader(),
-		MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported) / 34)), //3 rollout reconciler routine per 100 member clusters
+		MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/30) * math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs)/10)), // one rollout routine per 30 member clusters, scaled by the concurrent placement syncs (one per 10)
 	}).SetupWithManager(mgr); err != nil {
 		klog.ErrorS(err, "Unable to set up rollout controller")
 		return err
@@ -217,7 +217,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 	klog.Info("Setting up work generator")
 	if err := (&workgenerator.Reconciler{
 		Client:                  mgr.GetClient(),
-		MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported) / 10)), //one work generator reconciler routine per 10 member clusters,
+		MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/10) * math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs)/10)), // one work generator routine per 10 member clusters, scaled by the concurrent placement syncs (one per 10)
 	}).SetupWithManager(mgr); err != nil {
 		klog.ErrorS(err, "Unable to set up work generator")
 		return err
@@ -230,7 +230,9 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 	defaultSchedulingQueue := queue.NewSimpleClusterResourcePlacementSchedulingQueue(
 		queue.WithName(schedulerQueueName),
 	)
-	defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr)
+	// one scheduler worker per 50 member clusters, scaled by the concurrent placement syncs (one per 10)
+	defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr,
+		int(math.Ceil(float64(opts.MaxFleetSizeSupported)/50)*math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs)/10)))
 	klog.Info("Starting the scheduler")
 	// Scheduler must run in a separate goroutine as Run() is a blocking call.
 	wg.Add(1)
@@ -284,7 +286,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 		InformerManager:                  dynamicInformerManager,
 		ResourceConfig:                   resourceConfig,
 		SkippedNamespaces:                skippedNamespaces,
-		ConcurrentClusterPlacementWorker: opts.ConcurrentClusterPlacementSyncs,
+		ConcurrentClusterPlacementWorker: int(math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs) / 10)),
 		ConcurrentResourceChangeWorker:   opts.ConcurrentResourceChangeSyncs,
 	}

diff --git a/pkg/controllers/rollout/controller.go b/pkg/controllers/rollout/controller.go
index 7d71cf5f0..3a80226fa 100644
--- a/pkg/controllers/rollout/controller.go
+++ b/pkg/controllers/rollout/controller.go
@@ -466,7 +466,7 @@ func (r *Reconciler) updateBindings(ctx context.Context, latestResourceSnapshotN
 // It reconciles on the CRP when a new resource resourceBinding is created or an existing resource binding is created/updated.
 func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
 	r.recorder = mgr.GetEventRecorderFor("rollout-controller")
-	return runtime.NewControllerManagedBy(mgr).Named("rollout_controller").
+	return runtime.NewControllerManagedBy(mgr).Named("rollout-controller").
 		WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
 		Watches(&fleetv1beta1.ClusterResourceSnapshot{}, handler.Funcs{
 			CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
diff --git a/pkg/controllers/workgenerator/controller.go b/pkg/controllers/workgenerator/controller.go
index 94b1a45e3..02eba931e 100644
--- a/pkg/controllers/workgenerator/controller.go
+++ b/pkg/controllers/workgenerator/controller.go
@@ -566,7 +566,7 @@ func extractResFromConfigMap(uConfigMap *unstructured.Unstructured) ([]fleetv1be
 // It watches binding events and also update/delete events for work.
 func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
 	r.recorder = mgr.GetEventRecorderFor("work generator")
-	return controllerruntime.NewControllerManagedBy(mgr).
+	return controllerruntime.NewControllerManagedBy(mgr).Named("work-generator").
 		WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
 		For(&fleetv1beta1.ClusterResourceBinding{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
 		Watches(&fleetv1beta1.Work{}, &handler.Funcs{
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 52b214d3f..8a1a80cd9 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -68,4 +68,10 @@ var (
 			"needs_requeue",
 		},
 	)
+
+	// SchedulerActiveWorkers is a prometheus metric that holds the number of active scheduling loops.
+	SchedulerActiveWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "scheduling_active_workers",
+		Help: "Number of currently running scheduling loops",
+	}, []string{})
 )
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 69fa8f510..cb4f50bc5 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -10,12 +10,13 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
 	"time"

 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/wait"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -58,6 +59,9 @@ type Scheduler struct {
 	// manager is the controller manager in use by the scheduler.
 	manager ctrl.Manager

+	// workerNumber is the number of scheduling loops that the scheduler runs concurrently.
+	workerNumber int
+
 	// eventRecorder is the event recorder in use by the scheduler.
 	eventRecorder record.EventRecorder
 }
@@ -68,6 +72,7 @@ func NewScheduler(
 	framework framework.Framework,
 	queue queue.ClusterResourcePlacementSchedulingQueue,
 	manager ctrl.Manager,
+	workerNumber int,
 ) *Scheduler {
 	return &Scheduler{
 		name:      name,
@@ -76,11 +81,13 @@ func NewScheduler(
 		client:         manager.GetClient(),
 		uncachedReader: manager.GetAPIReader(),
 		manager:        manager,
+		workerNumber:   workerNumber,
 		eventRecorder:  manager.GetEventRecorderFor(name),
 	}
 }

 // ScheduleOnce performs scheduling for one single item pulled from the work queue.
+// It is called repeatedly by each scheduler worker until the context is canceled.
 func (s *Scheduler) scheduleOnce(ctx context.Context) {
 	// Retrieve the next item (name of a CRP) from the work queue.
 	//
@@ -100,6 +107,10 @@ func (s *Scheduler) scheduleOnce(ctx context.Context) {
 		s.queue.Done(crpName)
 	}()

+	// keep track of the number of active scheduling loops
+	metrics.SchedulerActiveWorkers.WithLabelValues().Add(1)
+	defer metrics.SchedulerActiveWorkers.WithLabelValues().Add(-1)
+
 	startTime := time.Now()
 	crpRef := klog.KRef("", string(crpName))
 	klog.V(2).InfoS("Schedule once", "clusterResourcePlacement", crpRef)
@@ -221,14 +232,28 @@ func (s *Scheduler) Run(ctx context.Context) {
 	// Starting the scheduling queue.
 	s.queue.Run()

-	// Run scheduleOnce forever.
-	//
-	// The loop starts in a dedicated goroutine; it exits when the context is canceled.
-	go wait.UntilWithContext(ctx, s.scheduleOnce, 0)
+	wg := &sync.WaitGroup{}
+	wg.Add(s.workerNumber)
+	for i := 0; i < s.workerNumber; i++ {
+		go func() {
+			defer wg.Done()
+			defer utilruntime.HandleCrash()
+			// Run scheduleOnce repeatedly until the context is canceled.
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					s.scheduleOnce(ctx)
+				}
+			}
+		}()
+	}

 	// Wait for the context to be canceled.
 	<-ctx.Done()
-
+	// Wait for all the scheduling workers to exit.
+	wg.Wait()
 	// Stopping the scheduling queue; drain if necessary.
 	//
 	// Note that if a scheduling cycle is in progress; this will only return when the
diff --git a/test/e2e/README.md b/test/e2e/README.md
index 0b81c1ada..8e8be2484 100644
--- a/test/e2e/README.md
+++ b/test/e2e/README.md
@@ -14,7 +14,7 @@ test suites, follow the steps below:
     # Use a different path if the local set up is different.
    export KUBECONFIG=~/.kube/config
    export OUTPUT_TYPE=type=docker
-    ./setup.sh
+    ./setup.sh ${number of member clusters}
    ```

    The setup script will perform the following tasks:
@@ -65,5 +65,5 @@ To stop the `Kind` clusters, run the script `stop.sh`:

 ```sh
 chmod +x ./stop.sh
-./stop.sh
+./stop.sh ${number of member clusters}
 ```
\ No newline at end of file
diff --git a/test/scheduler/suite_test.go b/test/scheduler/suite_test.go
index c78b002b8..b3174ca1b 100644
--- a/test/scheduler/suite_test.go
+++ b/test/scheduler/suite_test.go
@@ -335,7 +335,7 @@ func beforeSuiteForProcess1() []byte {
 	// Set up the scheduler.
 	fw := buildSchedulerFramework(ctrlMgr, clusterEligibilityChecker)
-	sched := scheduler.NewScheduler(defaultSchedulerName, fw, schedulerWorkQueue, ctrlMgr)
+	sched := scheduler.NewScheduler(defaultSchedulerName, fw, schedulerWorkQueue, ctrlMgr, 3)

 	// Run the controller manager.
 	go func() {
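
For reference, the standalone sketch below (not part of the patch) works through the worker-sizing formulas introduced above in cmd/hubagent/workload/setup.go; the helper name and the sample option values are illustrative only.

package main

import (
	"fmt"
	"math"
)

// workerCounts mirrors the sizing formulas from the patch: each controller scales
// with the supported fleet size and with the configured number of concurrent
// cluster placement syncs (one extra factor per 10 syncs).
func workerCounts(maxFleetSize, concurrentPlacementSyncs int) (rollout, workGenerator, scheduler int) {
	placementFactor := math.Ceil(float64(concurrentPlacementSyncs) / 10)
	rollout = int(math.Ceil(float64(maxFleetSize)/30) * placementFactor)       // rollout reconcilers
	workGenerator = int(math.Ceil(float64(maxFleetSize)/10) * placementFactor) // work generator reconcilers
	scheduler = int(math.Ceil(float64(maxFleetSize)/50) * placementFactor)     // scheduler workers
	return rollout, workGenerator, scheduler
}

func main() {
	// Example: 100 supported member clusters and 10 concurrent placement syncs.
	r, w, s := workerCounts(100, 10)
	fmt.Printf("rollout=%d workGenerator=%d scheduler=%d\n", r, w, s) // rollout=4 workGenerator=10 scheduler=2
}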