Skip to content

Commit

Permalink
Optimize spiderpool leader lease election
Browse files Browse the repository at this point in the history
Signed-off-by: ty-dc <[email protected]>
  • Loading branch information
ty-dc committed Sep 12, 2024
1 parent 9e2c46b commit 88c5c96
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 261 deletions.
36 changes: 29 additions & 7 deletions cmd/spiderpool-controller/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/spidernet-io/spiderpool/pkg/statefulsetmanager"
"github.com/spidernet-io/spiderpool/pkg/subnetmanager"
"github.com/spidernet-io/spiderpool/pkg/workloadendpointmanager"
leaderelection "k8s.io/client-go/tools/leaderelection"
)

// DaemonMain runs controllerContext handlers.
Expand Down Expand Up @@ -202,8 +203,6 @@ func DaemonMain() {
// disturbed by an abnormal webhook.
checkWebhookReady()

setupInformers(controllerContext.ClientSet)

monitorMetrics(controllerContext.InnerCtx)

sigCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -427,11 +426,13 @@ func initGCManager(ctx context.Context) {
}

func initSpiderControllerLeaderElect(ctx context.Context) {
// Extract configuration values
leaseDuration := time.Duration(controllerContext.Cfg.LeaseDuration) * time.Second
renewDeadline := time.Duration(controllerContext.Cfg.LeaseRenewDeadline) * time.Second
leaseRetryPeriod := time.Duration(controllerContext.Cfg.LeaseRetryPeriod) * time.Second
leaderRetryElectGap := time.Duration(controllerContext.Cfg.LeaseRetryGap) * time.Second

// Initialize a new lease elector with provided configuration
leaderElector, err := election.NewLeaseElector(
controllerContext.Cfg.ControllerPodNamespace,
constant.SpiderControllerElectorLockName,
Expand All @@ -441,14 +442,35 @@ func initSpiderControllerLeaderElect(ctx context.Context) {
&leaseRetryPeriod,
&leaderRetryElectGap,
)
if nil != err {
logger.Fatal(err.Error())
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to create lease elector: %v", err))
}

err = leaderElector.Run(ctx, controllerContext.ClientSet)
if nil != err {
logger.Fatal(err.Error())
// Define leader election callbacks
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
logger.Info("Acquired leadership, starting to set up the Leader informer")
setupInformers(controllerContext.ClientSet)
},
OnStoppedLeading: func() {
logger.Warn("Lost leadership, stopping controller")
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity == controllerContext.Leader.GetLeader() {
logger.Sugar().Infof("New leader elected: %s", identity)
}
},
}

// Run the leader elector with provided context and callbacks
go func() {
if err := leaderElector.Run(ctx, controllerContext.ClientSet, callbacks); err != nil {
logger.Fatal(fmt.Sprintf("Leader election failed: %v", err))
}
}()

// Set the leaderElector object in the controller context
controllerContext.Leader = leaderElector
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/spiderpool-controller/cmd/runtime_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func (g *_httpGetControllerReadiness) Handle(params runtime.GetRuntimeReadinessP
return runtime.NewGetRuntimeReadinessInternalServerError()
}

if len(g.Leader.GetLeader()) == 0 {
logger.Warn("failed to check spiderpool-controller readiness probe: there's no leader in the current cluster, please wait for a while")
return runtime.NewGetRuntimeReadinessInternalServerError()
}
// if len(g.Leader.GetLeader()) == 0 {
// logger.Warn("failed to check spiderpool-controller readiness probe: there's no leader in the current cluster, please wait for a while")
// return runtime.NewGetRuntimeReadinessInternalServerError()
// }

if g.Leader.IsElected() {
if gcIPConfig.EnableGCIP && !g.GCManager.Health() {
Expand Down
25 changes: 1 addition & 24 deletions pkg/applicationcontroller/app_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,30 +111,7 @@ func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubern
return
default:
}

if !leader.IsElected() {
time.Sleep(sac.LeaderRetryElectGap)
continue
}

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
logger.Warn("Leader lost, stop Subnet App informer")
innerCancel()
return
}
time.Sleep(sac.LeaderRetryElectGap)
}
}()

innerCtx, _ := context.WithCancel(ctx)

Check failure on line 114 in pkg/applicationcontroller/app_controller.go

View workflow job for this annotation

GitHub Actions / lint-golang

lostcancel: the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak (govet)
logger.Info("create SpiderSubnet App informer")
factory := kubeinformers.NewSharedInformerFactory(client, 0)
err := sac.addEventHandlers(factory)
Expand Down
81 changes: 25 additions & 56 deletions pkg/coordinatormanager/coordinator_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,66 +127,35 @@ func (cc *CoordinatorController) SetupInformer(

InformerLogger = logutils.Logger.Named("Coordinator-Informer")

go func() {
for {
select {
case <-ctx.Done():
return
default:
}
innerCtx, innerCancel := context.WithCancel(ctx)

Check failure on line 130 in pkg/coordinatormanager/coordinator_informer.go

View workflow job for this annotation

GitHub Actions / lint-golang

lostcancel: the innerCancel function is not used on all paths (possible context leak) (govet)

if !leader.IsElected() {
time.Sleep(cc.LeaderRetryElectGap)
continue
}
cc.Workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), constant.KindSpiderCoordinator)

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
InformerLogger.Warn("Leader lost, stop Coordinator informer")
innerCancel()
return
}
time.Sleep(cc.LeaderRetryElectGap)
}
}()

cc.Workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), constant.KindSpiderCoordinator)

if err := cc.StartWatchPodCIDR(innerCtx, InformerLogger); err != nil {
InformerLogger.Error(err.Error())
continue
}
if err := cc.StartWatchPodCIDR(innerCtx, InformerLogger); err != nil {
InformerLogger.Error(err.Error())
return err

Check failure on line 136 in pkg/coordinatormanager/coordinator_informer.go

View workflow job for this annotation

GitHub Actions / lint-golang

lostcancel: this return statement may be reached without using the innerCancel var defined on line 130 (govet)
}

InformerLogger.Info("Initialize Coordinator informer")
k8sInformerFactory := informers.NewSharedInformerFactory(k8sClientset, cc.ResyncPeriod)
spiderInformerFactory := externalversions.NewSharedInformerFactory(spiderClientset, cc.ResyncPeriod)
err := cc.addEventHandlers(
spiderInformerFactory.Spiderpool().V2beta1().SpiderCoordinators(),
k8sInformerFactory.Core().V1().ConfigMaps(),
k8sInformerFactory.Networking().V1alpha1().ServiceCIDRs(),
)
if err != nil {
InformerLogger.Error(err.Error())
continue
}
InformerLogger.Info("Initialize Coordinator informer")
k8sInformerFactory := informers.NewSharedInformerFactory(k8sClientset, cc.ResyncPeriod)
spiderInformerFactory := externalversions.NewSharedInformerFactory(spiderClientset, cc.ResyncPeriod)
err := cc.addEventHandlers(
spiderInformerFactory.Spiderpool().V2beta1().SpiderCoordinators(),
k8sInformerFactory.Core().V1().ConfigMaps(),
k8sInformerFactory.Networking().V1alpha1().ServiceCIDRs(),
)
if err != nil {
InformerLogger.Error(err.Error())
return err
}

k8sInformerFactory.Start(innerCtx.Done())
spiderInformerFactory.Start(innerCtx.Done())
if err := cc.run(logutils.IntoContext(innerCtx, InformerLogger), 1); err != nil {
InformerLogger.Sugar().Errorf("failed to run Coordinator informer: %v", err)
innerCancel()
}
InformerLogger.Info("Coordinator informer down")
}
}()
k8sInformerFactory.Start(innerCtx.Done())
spiderInformerFactory.Start(innerCtx.Done())
if err := cc.run(logutils.IntoContext(innerCtx, InformerLogger), 1); err != nil {
InformerLogger.Sugar().Errorf("failed to run Coordinator informer: %v", err)
innerCancel()
}
InformerLogger.Info("Coordinator informer down")

return nil
}
Expand Down
24 changes: 1 addition & 23 deletions pkg/dra/dra-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,7 @@ func StartController(ctx context.Context,
return
default:
}

if !leader.IsElected() {
time.Sleep(leaderRetryElectGap)
continue
}

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
innerCancel()
return
}
time.Sleep(leaderRetryElectGap)
}
}()

innerCtx, _ := context.WithCancel(ctx)

Check failure on line 35 in pkg/dra/dra-controller/controller.go

View workflow job for this annotation

GitHub Actions / lint-golang

lostcancel: the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak (govet)
informerFactory.Start(innerCtx.Done())
controller.Run(1)
}
Expand Down
Loading

0 comments on commit 88c5c96

Please sign in to comment.