Skip to content

Commit

Permalink
Wait for initial cache to sync before starting autoscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
domenicbozzuto committed Oct 22, 2024
1 parent 7d051cc commit 450d6e6
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -608,10 +609,54 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
// additional informers might have been registered in the factory during NewAutoscaler.
stop := make(chan struct{})
informerFactory.Start(stop)

klog.V(1).Info("Started shared informer factory, waiting for initial cache sync")

syncStart := time.Now()
allSynced := false
for !allSynced {
syncStatus := waitForCacheSyncWithTimeout(informerFactory, time.Second*10)
var missing []string
for t, synced := range syncStatus {
if !synced {
missing = append(missing, t.String())
}
}
if len(missing) > 0 {
klog.V(4).Infof("Still waiting to sync the following caches: %s", strings.Join(missing, ","))
} else {
allSynced = true
}
}
klog.V(1).Infof("Shared informer factory initialized, took %v", time.Since(syncStart))
return autoscaler, nil
}

// waitForCacheSyncWithTimeout waits up to the specified time for the informerFactory's caches to be synced;
// Returns before the timeout if the caches are synced sooner.
func waitForCacheSyncWithTimeout(informerFactory informers.SharedInformerFactory, timeout time.Duration) map[reflect.Type]bool {
interruptCh := make(chan struct{})
defer close(interruptCh)

doneCh := make(chan map[reflect.Type]bool)
defer close(doneCh)

go func() {
syncStatus := informerFactory.WaitForCacheSync(interruptCh)
doneCh <- syncStatus
}()

for {
select {
// WaitForCacheSync has returned; return the resulting status
case syncStatus := <-doneCh:
return syncStatus
// The timeout has expired, signal the interrupt channel and then read the result from the done channel
case <-time.After(timeout):
interruptCh <- struct{}{}
}
}
}

func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
metrics.RegisterAll(*emitPerNodeGroupMetrics)

Expand Down

0 comments on commit 450d6e6

Please sign in to comment.