Skip to content

Commit

Permalink
Optimize spiderpool leader lease election
Browse files Browse the repository at this point in the history
Signed-off-by: ty-dc <[email protected]>
  • Loading branch information
ty-dc committed Sep 13, 2024
1 parent 9e2c46b commit 529da30
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 273 deletions.
63 changes: 43 additions & 20 deletions cmd/spiderpool-controller/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"github.com/spidernet-io/spiderpool/pkg/statefulsetmanager"
"github.com/spidernet-io/spiderpool/pkg/subnetmanager"
"github.com/spidernet-io/spiderpool/pkg/workloadendpointmanager"
"k8s.io/apimachinery/pkg/util/uuid"
leaderelection "k8s.io/client-go/tools/leaderelection"
)

// DaemonMain runs controllerContext handlers.
Expand Down Expand Up @@ -190,9 +192,6 @@ func DaemonMain() {
}
}()

logger.Info("Begin to initialize IP GC Manager")
initGCManager(controllerContext.InnerCtx)

logger.Info("Set spiderpool-controller Startup probe ready")
controllerContext.webhookClient = openapi.NewWebhookHealthCheckClient()
controllerContext.IsStartupProbe.Store(true)
Expand All @@ -202,7 +201,7 @@ func DaemonMain() {
// disturbed by an abnormal webhook.
checkWebhookReady()

setupInformers(controllerContext.ClientSet)
initSpiderControllerLeaderElect(controllerContext.InnerCtx)

monitorMetrics(controllerContext.InnerCtx)

Expand Down Expand Up @@ -235,9 +234,6 @@ func WatchSignal(sigCh chan os.Signal) {
}

func initControllerServiceManagers(ctx context.Context) {
logger.Debug("Begin to initialize spiderpool-controller leader election")
initSpiderControllerLeaderElect(ctx)

logger.Debug("Begin to initialize Node manager")
nodeManager, err := nodemanager.NewNodeManager(
controllerContext.CRDManager.GetClient(),
Expand Down Expand Up @@ -427,11 +423,16 @@ func initGCManager(ctx context.Context) {
}

func initSpiderControllerLeaderElect(ctx context.Context) {
logger.Debug("Begin to initialize spiderpool-controller leader election")
leaseDuration := time.Duration(controllerContext.Cfg.LeaseDuration) * time.Second
renewDeadline := time.Duration(controllerContext.Cfg.LeaseRenewDeadline) * time.Second
leaseRetryPeriod := time.Duration(controllerContext.Cfg.LeaseRetryPeriod) * time.Second
leaderRetryElectGap := time.Duration(controllerContext.Cfg.LeaseRetryGap) * time.Second

// add an uniquifier so that two processes on the same host don't accidentally both become active
identityID := controllerContext.Cfg.ControllerPodName + "_" + string(uuid.NewUUID())

// Initialize a new lease elector with provided configuration
leaderElector, err := election.NewLeaseElector(
controllerContext.Cfg.ControllerPodNamespace,
constant.SpiderControllerElectorLockName,
Expand All @@ -441,15 +442,38 @@ func initSpiderControllerLeaderElect(ctx context.Context) {
&leaseRetryPeriod,
&leaderRetryElectGap,
)
if nil != err {
logger.Fatal(err.Error())
if err != nil {
logger.Sugar().Fatalf("Failed to create lease elector: %w", err.Error())
}

err = leaderElector.Run(ctx, controllerContext.ClientSet)
if nil != err {
logger.Fatal(err.Error())
// Define leader election callbacks
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
logger.Info("Acquired leadership, starting to set up the Leader informer")
setupInformers(controllerContext.ClientSet)

logger.Info("Begin to initialize IP GC Manager")
controllerContext.Leader = leaderElector
initGCManager(controllerContext.InnerCtx)
},
OnStoppedLeading: func() {
logger.Warn("Lost leadership, stopping controller")
os.Exit(0)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == identityID {
// I just got the lock
return
}
logger.Sugar().Infof("New leader elected: %s", identity)
controllerContext.Leader = leaderElector
},
}

if err := leaderElector.Run(ctx, controllerContext.ClientSet, callbacks); err != nil {
logger.Sugar().Fatalf("Leader election failed: %w", err.Error())
}
controllerContext.Leader = leaderElector
}

// initK8sClientSet will new kubernetes Clientset
Expand Down Expand Up @@ -492,7 +516,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
DefaultCniConfDir: controllerContext.Cfg.DefaultCniConfDir,
CiliumConfigMap: controllerContext.Cfg.CiliumConfigName,
DefaultCoordinatorName: controllerContext.Cfg.DefaultCoordinatorName,
}).SetupInformer(controllerContext.InnerCtx, crdClient, k8sClient, controllerContext.Leader); err != nil {
}).SetupInformer(controllerContext.InnerCtx, crdClient, k8sClient); err != nil {
logger.Fatal(err.Error())
}
}
Expand All @@ -512,7 +536,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
controllerContext.CRDManager.GetClient(),
controllerContext.DynamicClient,
)
err = ipPoolController.SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader)
err = ipPoolController.SetupInformer(controllerContext.InnerCtx, crdClient)
if nil != err {
logger.Fatal(err.Error())
}
Expand All @@ -527,7 +551,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
SubnetControllerWorkers: controllerContext.Cfg.SubnetInformerWorkers,
MaxWorkqueueLength: controllerContext.Cfg.SubnetInformerMaxWorkqueueLength,
DynamicClient: controllerContext.DynamicClient,
}).SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader); err != nil {
}).SetupInformer(controllerContext.InnerCtx, crdClient); err != nil {
logger.Fatal(err.Error())
}

Expand All @@ -550,7 +574,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
logger.Fatal(err.Error())
}

err = subnetAppController.SetupInformer(controllerContext.InnerCtx, controllerContext.ClientSet, controllerContext.Leader)
err = subnetAppController.SetupInformer(controllerContext.InnerCtx, controllerContext.ClientSet)
if nil != err {
logger.Fatal(err.Error())
}
Expand All @@ -570,7 +594,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
ResyncPeriod: time.Duration(controllerContext.Cfg.MultusConfigInformerResyncPeriod) * time.Second,
},
controllerContext.CRDManager.GetClient())
err = multusConfigController.SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader)
err = multusConfigController.SetupInformer(controllerContext.InnerCtx, crdClient)
if nil != err {
logger.Fatal(err.Error())
}
Expand All @@ -581,8 +605,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0 /* resync period */)
if err = dracontroller.StartController(controllerContext.InnerCtx,
time.Duration(controllerContext.Cfg.LeaseRetryGap)*time.Second,
crdClient, k8sClient, informerFactory,
controllerContext.Leader); err != nil {
crdClient, k8sClient, informerFactory); err != nil {
logger.Fatal(err.Error())
}
} else {
Expand Down
32 changes: 2 additions & 30 deletions pkg/applicationcontroller/app_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/spidernet-io/spiderpool/pkg/applicationcontroller/applicationinformers"
"github.com/spidernet-io/spiderpool/pkg/constant"
"github.com/spidernet-io/spiderpool/pkg/election"
spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1"
"github.com/spidernet-io/spiderpool/pkg/logutils"
"github.com/spidernet-io/spiderpool/pkg/subnetmanager"
Expand Down Expand Up @@ -98,11 +97,7 @@ func NewSubnetAppController(client client.Client, apiReader client.Reader, subne
return c, nil
}

func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubernetes.Interface, leader election.SpiderLeaseElector) error {
if leader == nil {
return fmt.Errorf("failed to start SpiderSubnet App informer, controller leader must be specified")
}

func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubernetes.Interface) error {
logger.Info("try to register SpiderSubnet App informer")
go func() {
for {
Expand All @@ -111,30 +106,7 @@ func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubern
return
default:
}

if !leader.IsElected() {
time.Sleep(sac.LeaderRetryElectGap)
continue
}

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
logger.Warn("Leader lost, stop Subnet App informer")
innerCancel()
return
}
time.Sleep(sac.LeaderRetryElectGap)
}
}()

innerCtx, _ := context.WithCancel(ctx)

Check failure on line 109 in pkg/applicationcontroller/app_controller.go

View workflow job for this annotation

GitHub Actions / lint-golang

lostcancel: the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak (govet)

Check failure on line 109 in pkg/applicationcontroller/app_controller.go

View workflow job for this annotation

GitHub Actions / lint-golang

the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak
logger.Info("create SpiderSubnet App informer")
factory := kubeinformers.NewSharedInformerFactory(client, 0)
err := sac.addEventHandlers(factory)
Expand Down
29 changes: 1 addition & 28 deletions pkg/coordinatormanager/coordinator_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/spidernet-io/spiderpool/pkg/constant"
"github.com/spidernet-io/spiderpool/pkg/election"
"github.com/spidernet-io/spiderpool/pkg/event"
spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1"
clientset "github.com/spidernet-io/spiderpool/pkg/k8s/client/clientset/versioned"
Expand Down Expand Up @@ -113,17 +112,13 @@ func (cc *CoordinatorController) SetupInformer(
ctx context.Context,
spiderClientset clientset.Interface,
k8sClientset *kubernetes.Clientset,
leader election.SpiderLeaseElector,
) error {
if spiderClientset == nil {
return fmt.Errorf("spiderpoolv2beta1 clientset %w", constant.ErrMissingRequiredParam)
}
if k8sClientset == nil {
return fmt.Errorf("kubernetes clientset %w", constant.ErrMissingRequiredParam)
}
if leader == nil {
return fmt.Errorf("controller leader %w", constant.ErrMissingRequiredParam)
}

InformerLogger = logutils.Logger.Named("Coordinator-Informer")

Expand All @@ -134,29 +129,8 @@ func (cc *CoordinatorController) SetupInformer(
return
default:
}

if !leader.IsElected() {
time.Sleep(cc.LeaderRetryElectGap)
continue
}

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
InformerLogger.Warn("Leader lost, stop Coordinator informer")
innerCancel()
return
}
time.Sleep(cc.LeaderRetryElectGap)
}
}()
defer innerCancel()

cc.Workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), constant.KindSpiderCoordinator)

Expand Down Expand Up @@ -187,7 +161,6 @@ func (cc *CoordinatorController) SetupInformer(
InformerLogger.Info("Coordinator informer down")
}
}()

return nil
}

Expand Down
27 changes: 2 additions & 25 deletions pkg/dra/dra-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/spidernet-io/spiderpool/pkg/constant"
"github.com/spidernet-io/spiderpool/pkg/election"
clientset "github.com/spidernet-io/spiderpool/pkg/k8s/client/clientset/versioned"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -19,8 +18,7 @@ func StartController(ctx context.Context,
leaderRetryElectGap time.Duration,
spiderClientset clientset.Interface,
kubeClient kubernetes.Interface,
informerFactory informers.SharedInformerFactory,
leader election.SpiderLeaseElector) error {
informerFactory informers.SharedInformerFactory) error {

driver := NewDriver(spiderClientset)
controller := controller.New(ctx, constant.DRADriverName, driver, kubeClient, informerFactory)
Expand All @@ -32,29 +30,8 @@ func StartController(ctx context.Context,
return
default:
}

if !leader.IsElected() {
time.Sleep(leaderRetryElectGap)
continue
}

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
innerCancel()
return
}
time.Sleep(leaderRetryElectGap)
}
}()

defer innerCancel()
informerFactory.Start(innerCtx.Done())
controller.Run(1)
}
Expand Down
Loading

0 comments on commit 529da30

Please sign in to comment.