perf: increase the number of scheduler workers (#691)
Co-authored-by: Ryan Zhang <[email protected]>
ryanzhang-oss and Ryan Zhang authored Mar 12, 2024
1 parent 6db7ef7 commit cbc5d7c
Showing 7 changed files with 48 additions and 15 deletions.
10 changes: 6 additions & 4 deletions cmd/hubagent/workload/setup.go
@@ -207,7 +207,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
if err := (&rollout.Reconciler{
Client: mgr.GetClient(),
UncachedReader: mgr.GetAPIReader(),
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported) / 34)), //3 rollout reconciler routine per 100 member clusters
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/30) * math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs)/10)),
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up rollout controller")
return err
@@ -217,7 +217,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
klog.Info("Setting up work generator")
if err := (&workgenerator.Reconciler{
Client: mgr.GetClient(),
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported) / 10)), //one work generator reconciler routine per 10 member clusters,
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/10) * math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs)/10)),
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up work generator")
return err
@@ -230,7 +230,9 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
defaultSchedulingQueue := queue.NewSimpleClusterResourcePlacementSchedulingQueue(
queue.WithName(schedulerQueueName),
)
defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr)
// scale the number of scheduler workers: one per 50 supported member clusters, multiplied by one per 10 concurrent placement syncs
defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr,
int(math.Ceil(float64(opts.MaxFleetSizeSupported)/50)*math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs)/10)))
klog.Info("Starting the scheduler")
// Scheduler must run in a separate goroutine as Run() is a blocking call.
wg.Add(1)
@@ -284,7 +286,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
InformerManager: dynamicInformerManager,
ResourceConfig: resourceConfig,
SkippedNamespaces: skippedNamespaces,
ConcurrentClusterPlacementWorker: opts.ConcurrentClusterPlacementSyncs,
ConcurrentClusterPlacementWorker: int(math.Ceil(float64(opts.ConcurrentClusterPlacementSyncs) / 10)),
ConcurrentResourceChangeWorker: opts.ConcurrentResourceChangeSyncs,
}

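The worker counts above are now functions of both MaxFleetSizeSupported and ConcurrentClusterPlacementSyncs. A minimal sketch of the arithmetic, using illustrative option values (100 supported member clusters and 10 concurrent placement syncs are assumptions for this example, not the hub agent's defaults):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Illustrative values only; the real ones come from the hub agent options.
	maxFleetSizeSupported := 100.0
	concurrentClusterPlacementSyncs := 10.0

	// Same formulas as in setup.go above.
	rolloutWorkers := int(math.Ceil(maxFleetSizeSupported/30) * math.Ceil(concurrentClusterPlacementSyncs/10))
	workGeneratorWorkers := int(math.Ceil(maxFleetSizeSupported/10) * math.Ceil(concurrentClusterPlacementSyncs/10))
	schedulerWorkers := int(math.Ceil(maxFleetSizeSupported/50) * math.Ceil(concurrentClusterPlacementSyncs/10))

	// Prints: rollout=4 workgen=10 scheduler=2
	fmt.Printf("rollout=%d workgen=%d scheduler=%d\n", rolloutWorkers, workGeneratorWorkers, schedulerWorkers)
}
```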
2 changes: 1 addition & 1 deletion pkg/controllers/rollout/controller.go
@@ -466,7 +466,7 @@ func (r *Reconciler) updateBindings(ctx context.Context, latestResourceSnapshotN
// It reconciles on the CRP when a new resource snapshot is created or an existing resource binding is created/updated.
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
r.recorder = mgr.GetEventRecorderFor("rollout-controller")
return runtime.NewControllerManagedBy(mgr).Named("rollout_controller").
return runtime.NewControllerManagedBy(mgr).Named("rollout-controller").
WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
Watches(&fleetv1beta1.ClusterResourceSnapshot{}, handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
2 changes: 1 addition & 1 deletion pkg/controllers/workgenerator/controller.go
@@ -566,7 +566,7 @@ func extractResFromConfigMap(uConfigMap *unstructured.Unstructured) ([]fleetv1be
// It watches binding events and also update/delete events for work.
func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
r.recorder = mgr.GetEventRecorderFor("work generator")
return controllerruntime.NewControllerManagedBy(mgr).
return controllerruntime.NewControllerManagedBy(mgr).Named("work-generator").
WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
For(&fleetv1beta1.ClusterResourceBinding{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&fleetv1beta1.Work{}, &handler.Funcs{
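Explicitly naming the work generator controller ("work-generator") matches the rollout controller above and gives it a stable identifier in controller-runtime's logs and workqueue metrics; without Named(), controller-runtime derives the name from the reconciled kind.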
6 changes: 6 additions & 0 deletions pkg/metrics/metrics.go
@@ -68,4 +68,10 @@ var (
"needs_requeue",
},
)

// SchedulerActiveWorkers is a prometheus metric which holds the number of active scheduling loops.
SchedulerActiveWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "scheduling_active_workers",
Help: "Number of currently running scheduling loop",
}, []string{})
)
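The new gauge is label-free, so it is read and updated through WithLabelValues() with no arguments, as scheduleOnce does below. A self-contained sketch of how such a gauge behaves (the private registry here is an assumption made for the example; the hub agent presumably registers the metric against its own metrics registry):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Mirrors the gauge added above; a private registry is used here only for the example.
	activeWorkers := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "scheduling_active_workers",
		Help: "Number of currently running scheduling loops",
	}, []string{})

	reg := prometheus.NewRegistry()
	reg.MustRegister(activeWorkers)

	// A scheduling attempt brackets its work the same way scheduleOnce does below.
	activeWorkers.WithLabelValues().Add(1)

	mfs, _ := reg.Gather()
	fmt.Println(*mfs[0].Metric[0].Gauge.Value) // 1

	activeWorkers.WithLabelValues().Add(-1) // decremented once the attempt finishes
}
```

On a live hub, the resulting scheduling_active_workers series can be compared with the configured worker count to spot a saturated scheduler.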
37 changes: 31 additions & 6 deletions pkg/scheduler/scheduler.go
@@ -10,12 +10,13 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -58,6 +59,9 @@ type Scheduler struct {
// manager is the controller manager in use by the scheduler.
manager ctrl.Manager

// workerNumber is the number of scheduling loops that run concurrently
workerNumber int

// eventRecorder is the event recorder in use by the scheduler.
eventRecorder record.EventRecorder
}
@@ -68,6 +72,7 @@ func NewScheduler(
framework framework.Framework,
queue queue.ClusterResourcePlacementSchedulingQueue,
manager ctrl.Manager,
workerNumber int,
) *Scheduler {
return &Scheduler{
name: name,
@@ -76,11 +81,13 @@
client: manager.GetClient(),
uncachedReader: manager.GetAPIReader(),
manager: manager,
workerNumber: workerNumber,
eventRecorder: manager.GetEventRecorderFor(name),
}
}

// scheduleOnce performs scheduling for a single item pulled from the work queue; the item is always marked done in the queue when this function returns.
func (s *Scheduler) scheduleOnce(ctx context.Context) {
// Retrieve the next item (name of a CRP) from the work queue.
//
@@ -100,6 +107,10 @@ func (s *Scheduler) scheduleOnce(ctx context.Context) {
s.queue.Done(crpName)
}()

// keep track of the number of active scheduling loops
metrics.SchedulerActiveWorkers.WithLabelValues().Add(1)
defer metrics.SchedulerActiveWorkers.WithLabelValues().Add(-1)

startTime := time.Now()
crpRef := klog.KRef("", string(crpName))
klog.V(2).InfoS("Schedule once", "clusterResourcePlacement", crpRef)
@@ -221,14 +232,28 @@ func (s *Scheduler) Run(ctx context.Context) {
// Starting the scheduling queue.
s.queue.Run()

// Run scheduleOnce forever.
//
// The loop starts in a dedicated goroutine; it exits when the context is canceled.
go wait.UntilWithContext(ctx, s.scheduleOnce, 0)
wg := &sync.WaitGroup{}
wg.Add(s.workerNumber)
for i := 0; i < s.workerNumber; i++ {
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
// Run scheduleOnce forever until the context is canceled
for {
select {
case <-ctx.Done():
return
default:
s.scheduleOnce(ctx)
}
}
}()
}

// Wait for the context to be canceled.
<-ctx.Done()

// The worker loops exit when the context is canceled; wait for all of them to return.
wg.Wait()
// Stopping the scheduling queue; drain if necessary.
//
// Note that if a scheduling cycle is in progress; this will only return when the
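Run now fans the scheduling work out to workerNumber goroutines instead of a single wait.UntilWithContext loop. The sketch below shows the same pattern in isolation; it substitutes a buffered channel for the scheduling queue (the real loop uses a default branch, presumably because the queue pop inside scheduleOnce blocks until an item is available), so it illustrates the shape of the change rather than the scheduler's API:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	work := make(chan string, 8)
	for _, name := range []string{"crp-1", "crp-2", "crp-3"} {
		work <- name
	}

	const workerNumber = 3 // analogous to Scheduler.workerNumber
	var wg sync.WaitGroup
	wg.Add(workerNumber)
	for i := 0; i < workerNumber; i++ {
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case name := <-work:
					// Stand-in for scheduleOnce: handle one placement key at a time.
					fmt.Printf("worker %d scheduled %s\n", id, name)
				}
			}
		}(i)
	}

	// Wait for the context to be canceled, then for all workers to return.
	<-ctx.Done()
	wg.Wait()
}
```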
4 changes: 2 additions & 2 deletions test/e2e/README.md
@@ -14,7 +14,7 @@ test suites, follow the steps below:
# Use a different path if the local set up is different.
export KUBECONFIG=~/.kube/config
export OUTPUT_TYPE=type=docker
./setup.sh
./setup.sh ${number of member clusters}
```
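For example, `./setup.sh 3` would provision three member clusters (the positional argument is taken to be the member cluster count, per the placeholder above).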

The setup script will perform the following tasks:
@@ -65,5 +65,5 @@ To stop the `Kind` clusters, run the script `stop.sh`:

```sh
chmod +x ./stop.sh
./stop.sh
./stop.sh ${number of member clusters}
```
2 changes: 1 addition & 1 deletion test/scheduler/suite_test.go
@@ -335,7 +335,7 @@ func beforeSuiteForProcess1() []byte {

// Set up the scheduler.
fw := buildSchedulerFramework(ctrlMgr, clusterEligibilityChecker)
sched := scheduler.NewScheduler(defaultSchedulerName, fw, schedulerWorkQueue, ctrlMgr)
sched := scheduler.NewScheduler(defaultSchedulerName, fw, schedulerWorkQueue, ctrlMgr, 3)

// Run the controller manager.
go func() {
