Skip to content

Commit

Permalink
Optimize spiderpool leader lease election
Browse files Browse the repository at this point in the history
Signed-off-by: ty-dc <[email protected]>
  • Loading branch information
ty-dc committed Sep 18, 2024
1 parent d45c09b commit 46fc6d8
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 163 deletions.
24 changes: 12 additions & 12 deletions pkg/applicationcontroller/app_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,11 @@ func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubern
select {
case <-ctx.Done():
return
default:
}

if !leader.IsElected() {
time.Sleep(sac.LeaderRetryElectGap)
continue
case isLeader := <-leader.IsElected():
if !isLeader {
time.Sleep(sac.LeaderRetryElectGap)
continue
}
}

innerCtx, innerCancel := context.WithCancel(ctx)
Expand All @@ -123,12 +122,13 @@ func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubern
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
logger.Warn("Leader lost, stop Subnet App informer")
innerCancel()
case isLeader := <-leader.IsElected():
if !isLeader {
logger.Warn("Leader lost, stop SpiderSubnet informer")
innerCancel()
return
}
case <-ctx.Done():
return
}
time.Sleep(sac.LeaderRetryElectGap)
Expand Down
24 changes: 12 additions & 12 deletions pkg/coordinatormanager/coordinator_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,11 @@ func (cc *CoordinatorController) SetupInformer(
select {
case <-ctx.Done():
return
default:
}

if !leader.IsElected() {
time.Sleep(cc.LeaderRetryElectGap)
continue
case isLeader := <-leader.IsElected():
if !isLeader {
time.Sleep(cc.LeaderRetryElectGap)
continue
}
}

innerCtx, innerCancel := context.WithCancel(ctx)
Expand All @@ -146,12 +145,13 @@ func (cc *CoordinatorController) SetupInformer(
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
InformerLogger.Warn("Leader lost, stop Coordinator informer")
innerCancel()
case isLeader := <-leader.IsElected():
if !isLeader {
InformerLogger.Warn("Leader lost, stop Coordinator informer")
innerCancel()
return
}
case <-ctx.Done():
return
}
time.Sleep(cc.LeaderRetryElectGap)
Expand Down
23 changes: 12 additions & 11 deletions pkg/dra/dra-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ func StartController(ctx context.Context,
select {
case <-ctx.Done():
return
default:
}

if !leader.IsElected() {
time.Sleep(leaderRetryElectGap)
continue
case isLeader := <-leader.IsElected():
if !isLeader {
time.Sleep(leaderRetryElectGap)
continue
}
}

innerCtx, innerCancel := context.WithCancel(ctx)
Expand All @@ -44,11 +43,12 @@ func StartController(ctx context.Context,
select {
case <-innerCtx.Done():
return
default:
}

if !leader.IsElected() {
innerCancel()
case isLeader := <-leader.IsElected():
if !isLeader {
innerCancel()
return
}
case <-ctx.Done():
return
}
time.Sleep(leaderRetryElectGap)
Expand All @@ -58,6 +58,7 @@ func StartController(ctx context.Context,
informerFactory.Start(innerCtx.Done())
controller.Run(1)
}

}()
return nil
}
12 changes: 6 additions & 6 deletions pkg/election/lease_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var logger *zap.Logger
// SpiderLeaseElector is the leader-election interface consumed by the
// controllers and informers shown in this commit.
type SpiderLeaseElector interface {
// Run takes a context and a Kubernetes client set and reports failure
// through the returned error.
Run(ctx context.Context, clientSet kubernetes.Interface) error
// IsElected is used to check whether the current elector is the leader.
// NOTE(review): both the bool form and the chan bool form appear below
// because this is a diff view; the commit replaces the former with the
// latter. With the channel form, callers receive a value from the channel
// and treat true as "is leader".
IsElected() bool
IsElected() chan bool
// GetLeader returns a string — presumably the identity of the current
// lease holder; confirm against the implementation.
GetLeader() string
}

Expand All @@ -40,7 +40,7 @@ type SpiderLeader struct {
leaseRetryPeriod time.Duration
leaderRetryElectGap time.Duration

isLeader bool
isLeader chan bool
leaderElector *leaderelection.LeaderElector
}

Expand Down Expand Up @@ -81,7 +81,7 @@ func NewLeaseElector(leaseLockNS, leaseLockName, leaseLockIdentity string,
}

sl := &SpiderLeader{
isLeader: false,
isLeader: make(chan bool, 1),
leaseLockName: leaseLockName,
leaseLockNamespace: leaseLockNS,
leaseLockIdentity: leaseLockIdentity,
Expand Down Expand Up @@ -128,14 +128,14 @@ func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
sl.Lock()
sl.isLeader = true
sl.isLeader <- true
sl.Unlock()
logger.Sugar().Infof("leader elected: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity)
},
OnStoppedLeading: func() {
// we can do cleanup here
sl.Lock()
sl.isLeader = false
sl.isLeader <- false
sl.Unlock()
logger.Sugar().Warnf("leader lost: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity)
},
Expand All @@ -150,7 +150,7 @@ func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error {
return nil
}

func (sl *SpiderLeader) IsElected() bool {
func (sl *SpiderLeader) IsElected() chan bool {
sl.RLock()
defer sl.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions pkg/election/mock/lease_election_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 16 additions & 9 deletions pkg/gcmanager/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,24 @@ func (s *SpiderGC) Health() bool {
ctx, cancelFunc := context.WithTimeout(context.TODO(), waitForCacheSyncTimeout)
defer cancelFunc()

if s.leader.IsElected() {
if s.informerFactory == nil {
logger.Warn("the IP-GC manager pod informer is not ready")
return false
select {
case isLeader := <-s.leader.IsElected():
if !isLeader {
return true
}
case <-ctx.Done():
return false
}

waitForCacheSync := s.informerFactory.WaitForCacheSync(ctx.Done())
for _, isCacheSync := range waitForCacheSync {
if !isCacheSync {
return false
}
if s.informerFactory == nil {
logger.Warn("the IP-GC manager pod informer is not ready")
return false
}

waitForCacheSync := s.informerFactory.WaitForCacheSync(ctx.Done())
for _, isCacheSync := range waitForCacheSync {
if !isCacheSync {
return false
}
}

Expand Down
124 changes: 67 additions & 57 deletions pkg/gcmanager/pod_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,69 +20,73 @@ func (s *SpiderGC) startPodInformer(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
}

if !s.leader.IsElected() {
time.Sleep(s.gcConfig.LeaderRetryElectGap)
continue
}

innerCtx, innerCancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-innerCtx.Done():
return
default:
}
case isLeader := <-s.leader.IsElected():
// Proceed only if this pod is the leader
if !isLeader {
logger.Warn("Not the leader, waiting for leadership.")
time.Sleep(s.gcConfig.LeaderRetryElectGap)
continue
}

if !s.leader.IsElected() {
logger.Warn("Leader lost, stop IP GC pod informer")
innerCancel()
return
innerCtx, innerCancel := context.WithCancel(ctx)
defer innerCancel()

go func() {
for {
select {
case <-innerCtx.Done():
return
case isLeader := <-s.leader.IsElected():
if !isLeader {
logger.Warn("Leader lost, stopping IP GC pod informer.")
innerCancel()
return
}
}
time.Sleep(s.gcConfig.LeaderRetryElectGap)
}
time.Sleep(s.gcConfig.LeaderRetryElectGap)
}()

logger.Info("Create Pod informer")
informerFactory := informers.NewSharedInformerFactory(s.k8ClientSet, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onPodAdd,
UpdateFunc: s.onPodUpdate,
DeleteFunc: s.onPodDel,
})
if err != nil {
logger.Error(err.Error())
innerCancel()
continue
}
}()

logger.Info("create Pod informer")
informerFactory := informers.NewSharedInformerFactory(s.k8ClientSet, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onPodAdd,
UpdateFunc: s.onPodUpdate,
DeleteFunc: s.onPodDel,
})
if nil != err {
logger.Error(err.Error())
innerCancel()
continue
}
s.informerFactory = informerFactory
informerFactory.Start(innerCtx.Done())

// Let the leader trigger a full IP GC scan.
// When the spiderpool-controller restarts, it triggers a full IP GC scan first.
// If the pod informer has not started and the user deletes some pods, this would lead to IP leakage.
logger.Debug("try to trigger scan all with leader elected")
cacheSync := cache.WaitForCacheSync(innerCtx.Done())
if !cacheSync {
innerCancel()
continue
}
s.gcSignal <- struct{}{}
s.informerFactory = informerFactory
informerFactory.Start(innerCtx.Done())

logger.Debug("Triggering scan all with leader elected")
cacheSync := cache.WaitForCacheSync(innerCtx.Done())
if !cacheSync {
innerCancel()
continue
}
// Notify the system to trigger a GC scan
s.gcSignal <- struct{}{}

<-innerCtx.Done()
logger.Error("k8s pod informer broken")
// Wait for informer context to be done
<-innerCtx.Done()
logger.Error("K8s pod informer broken, restarting process")
}
}
}

// onPodAdd represents Pod informer Add Event
func (s *SpiderGC) onPodAdd(obj interface{}) {
// backup controller could be elected as master
if !s.leader.IsElected() {
return
select {
case isLeader := <-s.leader.IsElected():
if !isLeader {
return
}
}

pod := obj.(*corev1.Pod)
Expand All @@ -104,8 +108,11 @@ func (s *SpiderGC) onPodAdd(obj interface{}) {
// onPodUpdate represents Pod informer Update Event
func (s *SpiderGC) onPodUpdate(oldObj interface{}, newObj interface{}) {
// backup controller could be elected as master
if !s.leader.IsElected() {
return
select {
case isLeader := <-s.leader.IsElected():
if !isLeader {
return
}
}

oldPod := oldObj.(*corev1.Pod)
Expand All @@ -128,8 +135,11 @@ func (s *SpiderGC) onPodUpdate(oldObj interface{}, newObj interface{}) {
// onPodDel represents Pod informer Delete Event
func (s *SpiderGC) onPodDel(obj interface{}) {
// backup controller could be elected as master
if !s.leader.IsElected() {
return
select {
case isLeader := <-s.leader.IsElected():
if !isLeader {
return
}
}

pod := obj.(*corev1.Pod)
Expand Down
Loading

0 comments on commit 46fc6d8

Please sign in to comment.