From 529da304996aa29fae48556bdfdffce89eaac24c Mon Sep 17 00:00:00 2001 From: ty-dc Date: Wed, 11 Sep 2024 16:12:03 +0800 Subject: [PATCH] Optimize spiderpool leader lease election Signed-off-by: ty-dc --- cmd/spiderpool-controller/cmd/daemon.go | 63 +++++++++----- pkg/applicationcontroller/app_controller.go | 32 +------ .../coordinator_informer.go | 29 +------ pkg/dra/dra-controller/controller.go | 27 +----- pkg/election/lease_election.go | 84 +++++++------------ pkg/election/lease_election_test.go | 3 +- pkg/election/mock/lease_election_mock.go | 9 +- pkg/gcmanager/gc_manager.go | 5 +- pkg/gcmanager/pod_informer.go | 26 +----- pkg/ippoolmanager/ippool_informer.go | 36 ++------ pkg/multuscniconfig/multusconfig_informer.go | 29 +------ pkg/subnetmanager/subnet_informer.go | 30 +------ pkg/subnetmanager/subnet_informer_test.go | 2 +- 13 files changed, 102 insertions(+), 273 deletions(-) diff --git a/cmd/spiderpool-controller/cmd/daemon.go b/cmd/spiderpool-controller/cmd/daemon.go index 88023c0565..5ee8f0aa2e 100644 --- a/cmd/spiderpool-controller/cmd/daemon.go +++ b/cmd/spiderpool-controller/cmd/daemon.go @@ -43,6 +43,8 @@ import ( "github.com/spidernet-io/spiderpool/pkg/statefulsetmanager" "github.com/spidernet-io/spiderpool/pkg/subnetmanager" "github.com/spidernet-io/spiderpool/pkg/workloadendpointmanager" + "k8s.io/apimachinery/pkg/util/uuid" + leaderelection "k8s.io/client-go/tools/leaderelection" ) // DaemonMain runs controllerContext handlers. @@ -190,9 +192,6 @@ func DaemonMain() { } }() - logger.Info("Begin to initialize IP GC Manager") - initGCManager(controllerContext.InnerCtx) - logger.Info("Set spiderpool-controller Startup probe ready") controllerContext.webhookClient = openapi.NewWebhookHealthCheckClient() controllerContext.IsStartupProbe.Store(true) @@ -202,7 +201,7 @@ func DaemonMain() { // disturbed by an abnormal webhook. checkWebhookReady() - setupInformers(controllerContext.ClientSet) + initSpiderControllerLeaderElect(controllerContext.InnerCtx) monitorMetrics(controllerContext.InnerCtx) @@ -235,9 +234,6 @@ func WatchSignal(sigCh chan os.Signal) { } func initControllerServiceManagers(ctx context.Context) { - logger.Debug("Begin to initialize spiderpool-controller leader election") - initSpiderControllerLeaderElect(ctx) - logger.Debug("Begin to initialize Node manager") nodeManager, err := nodemanager.NewNodeManager( controllerContext.CRDManager.GetClient(), @@ -427,11 +423,16 @@ func initGCManager(ctx context.Context) { } func initSpiderControllerLeaderElect(ctx context.Context) { + logger.Debug("Begin to initialize spiderpool-controller leader election") leaseDuration := time.Duration(controllerContext.Cfg.LeaseDuration) * time.Second renewDeadline := time.Duration(controllerContext.Cfg.LeaseRenewDeadline) * time.Second leaseRetryPeriod := time.Duration(controllerContext.Cfg.LeaseRetryPeriod) * time.Second leaderRetryElectGap := time.Duration(controllerContext.Cfg.LeaseRetryGap) * time.Second + // add an uniquifier so that two processes on the same host don't accidentally both become active + identityID := controllerContext.Cfg.ControllerPodName + "_" + string(uuid.NewUUID()) + + // Initialize a new lease elector with provided configuration leaderElector, err := election.NewLeaseElector( controllerContext.Cfg.ControllerPodNamespace, constant.SpiderControllerElectorLockName, @@ -441,15 +442,38 @@ func initSpiderControllerLeaderElect(ctx context.Context) { &leaseRetryPeriod, &leaderRetryElectGap, ) - if nil != err { - logger.Fatal(err.Error()) + if err != nil { + logger.Sugar().Fatalf("Failed to create lease elector: %w", err.Error()) } - err = leaderElector.Run(ctx, controllerContext.ClientSet) - if nil != err { - logger.Fatal(err.Error()) + // Define leader election callbacks + callbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + logger.Info("Acquired leadership, starting to set up the Leader informer") + setupInformers(controllerContext.ClientSet) + + logger.Info("Begin to initialize IP GC Manager") + controllerContext.Leader = leaderElector + initGCManager(controllerContext.InnerCtx) + }, + OnStoppedLeading: func() { + logger.Warn("Lost leadership, stopping controller") + os.Exit(0) + }, + OnNewLeader: func(identity string) { + // we're notified when new leader elected + if identity == identityID { + // I just got the lock + return + } + logger.Sugar().Infof("New leader elected: %s", identity) + controllerContext.Leader = leaderElector + }, + } + + if err := leaderElector.Run(ctx, controllerContext.ClientSet, callbacks); err != nil { + logger.Sugar().Fatalf("Leader election failed: %w", err.Error()) } - controllerContext.Leader = leaderElector } // initK8sClientSet will new kubernetes Clientset @@ -492,7 +516,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) { DefaultCniConfDir: controllerContext.Cfg.DefaultCniConfDir, CiliumConfigMap: controllerContext.Cfg.CiliumConfigName, DefaultCoordinatorName: controllerContext.Cfg.DefaultCoordinatorName, - }).SetupInformer(controllerContext.InnerCtx, crdClient, k8sClient, controllerContext.Leader); err != nil { + }).SetupInformer(controllerContext.InnerCtx, crdClient, k8sClient); err != nil { logger.Fatal(err.Error()) } } @@ -512,7 +536,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) { controllerContext.CRDManager.GetClient(), controllerContext.DynamicClient, ) - err = ipPoolController.SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader) + err = ipPoolController.SetupInformer(controllerContext.InnerCtx, crdClient) if nil != err { logger.Fatal(err.Error()) } @@ -527,7 +551,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) { SubnetControllerWorkers: controllerContext.Cfg.SubnetInformerWorkers, MaxWorkqueueLength: controllerContext.Cfg.SubnetInformerMaxWorkqueueLength, DynamicClient: controllerContext.DynamicClient, - }).SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader); err != nil { + }).SetupInformer(controllerContext.InnerCtx, crdClient); err != nil { logger.Fatal(err.Error()) } @@ -550,7 +574,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) { logger.Fatal(err.Error()) } - err = subnetAppController.SetupInformer(controllerContext.InnerCtx, controllerContext.ClientSet, controllerContext.Leader) + err = subnetAppController.SetupInformer(controllerContext.InnerCtx, controllerContext.ClientSet) if nil != err { logger.Fatal(err.Error()) } @@ -570,7 +594,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) { ResyncPeriod: time.Duration(controllerContext.Cfg.MultusConfigInformerResyncPeriod) * time.Second, }, controllerContext.CRDManager.GetClient()) - err = multusConfigController.SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader) + err = multusConfigController.SetupInformer(controllerContext.InnerCtx, crdClient) if nil != err { logger.Fatal(err.Error()) } @@ -581,8 +605,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) { informerFactory := informers.NewSharedInformerFactory(k8sClient, 0 /* resync period */) if err = dracontroller.StartController(controllerContext.InnerCtx, time.Duration(controllerContext.Cfg.LeaseRetryGap)*time.Second, - crdClient, k8sClient, informerFactory, - controllerContext.Leader); err != nil { + crdClient, k8sClient, informerFactory); err != nil { logger.Fatal(err.Error()) } } else { diff --git a/pkg/applicationcontroller/app_controller.go b/pkg/applicationcontroller/app_controller.go index 40ec2c8e92..ca6bf1b583 100644 --- a/pkg/applicationcontroller/app_controller.go +++ b/pkg/applicationcontroller/app_controller.go @@ -31,7 +31,6 @@ import ( "github.com/spidernet-io/spiderpool/pkg/applicationcontroller/applicationinformers" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/election" spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1" "github.com/spidernet-io/spiderpool/pkg/logutils" "github.com/spidernet-io/spiderpool/pkg/subnetmanager" @@ -98,11 +97,7 @@ func NewSubnetAppController(client client.Client, apiReader client.Reader, subne return c, nil } -func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubernetes.Interface, leader election.SpiderLeaseElector) error { - if leader == nil { - return fmt.Errorf("failed to start SpiderSubnet App informer, controller leader must be specified") - } - +func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubernetes.Interface) error { logger.Info("try to register SpiderSubnet App informer") go func() { for { @@ -111,30 +106,7 @@ func (sac *SubnetAppController) SetupInformer(ctx context.Context, client kubern return default: } - - if !leader.IsElected() { - time.Sleep(sac.LeaderRetryElectGap) - continue - } - - innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !leader.IsElected() { - logger.Warn("Leader lost, stop Subnet App informer") - innerCancel() - return - } - time.Sleep(sac.LeaderRetryElectGap) - } - }() - + innerCtx, _ := context.WithCancel(ctx) logger.Info("create SpiderSubnet App informer") factory := kubeinformers.NewSharedInformerFactory(client, 0) err := sac.addEventHandlers(factory) diff --git a/pkg/coordinatormanager/coordinator_informer.go b/pkg/coordinatormanager/coordinator_informer.go index 0f6da33d98..10e8a563c6 100644 --- a/pkg/coordinatormanager/coordinator_informer.go +++ b/pkg/coordinatormanager/coordinator_informer.go @@ -40,7 +40,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/election" "github.com/spidernet-io/spiderpool/pkg/event" spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1" clientset "github.com/spidernet-io/spiderpool/pkg/k8s/client/clientset/versioned" @@ -113,7 +112,6 @@ func (cc *CoordinatorController) SetupInformer( ctx context.Context, spiderClientset clientset.Interface, k8sClientset *kubernetes.Clientset, - leader election.SpiderLeaseElector, ) error { if spiderClientset == nil { return fmt.Errorf("spiderpoolv2beta1 clientset %w", constant.ErrMissingRequiredParam) @@ -121,9 +119,6 @@ func (cc *CoordinatorController) SetupInformer( if k8sClientset == nil { return fmt.Errorf("kubernetes clientset %w", constant.ErrMissingRequiredParam) } - if leader == nil { - return fmt.Errorf("controller leader %w", constant.ErrMissingRequiredParam) - } InformerLogger = logutils.Logger.Named("Coordinator-Informer") @@ -134,29 +129,8 @@ func (cc *CoordinatorController) SetupInformer( return default: } - - if !leader.IsElected() { - time.Sleep(cc.LeaderRetryElectGap) - continue - } - innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !leader.IsElected() { - InformerLogger.Warn("Leader lost, stop Coordinator informer") - innerCancel() - return - } - time.Sleep(cc.LeaderRetryElectGap) - } - }() + defer innerCancel() cc.Workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), constant.KindSpiderCoordinator) @@ -187,7 +161,6 @@ func (cc *CoordinatorController) SetupInformer( InformerLogger.Info("Coordinator informer down") } }() - return nil } diff --git a/pkg/dra/dra-controller/controller.go b/pkg/dra/dra-controller/controller.go index cce59007e6..aa38640af9 100644 --- a/pkg/dra/dra-controller/controller.go +++ b/pkg/dra/dra-controller/controller.go @@ -8,7 +8,6 @@ import ( "time" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/election" clientset "github.com/spidernet-io/spiderpool/pkg/k8s/client/clientset/versioned" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -19,8 +18,7 @@ func StartController(ctx context.Context, leaderRetryElectGap time.Duration, spiderClientset clientset.Interface, kubeClient kubernetes.Interface, - informerFactory informers.SharedInformerFactory, - leader election.SpiderLeaseElector) error { + informerFactory informers.SharedInformerFactory) error { driver := NewDriver(spiderClientset) controller := controller.New(ctx, constant.DRADriverName, driver, kubeClient, informerFactory) @@ -32,29 +30,8 @@ func StartController(ctx context.Context, return default: } - - if !leader.IsElected() { - time.Sleep(leaderRetryElectGap) - continue - } - innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !leader.IsElected() { - innerCancel() - return - } - time.Sleep(leaderRetryElectGap) - } - }() - + defer innerCancel() informerFactory.Start(innerCtx.Done()) controller.Run(1) } diff --git a/pkg/election/lease_election.go b/pkg/election/lease_election.go index 6b9b229b48..41af5afc64 100644 --- a/pkg/election/lease_election.go +++ b/pkg/election/lease_election.go @@ -1,6 +1,3 @@ -// Copyright 2022 Authors of spidernet-io -// SPDX-License-Identifier: Apache-2.0 - package election import ( @@ -9,29 +6,28 @@ import ( "regexp" "time" + "github.com/spidernet-io/spiderpool/pkg/constant" + "github.com/spidernet-io/spiderpool/pkg/lock" + "github.com/spidernet-io/spiderpool/pkg/logutils" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" - - "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/lock" - "github.com/spidernet-io/spiderpool/pkg/logutils" ) var logger *zap.Logger +// SpiderLeaseElector interface defines the leader election methods type SpiderLeaseElector interface { - Run(ctx context.Context, clientSet kubernetes.Interface) error - // IsElected returns a boolean value to check current Elector whether is a leader + Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error IsElected() bool GetLeader() string } +// SpiderLeader implements SpiderLeaseElector type SpiderLeader struct { lock.RWMutex - leaseLockName string leaseLockNamespace string leaseLockIdentity string @@ -39,12 +35,10 @@ type SpiderLeader struct { leaseRenewDeadline time.Duration leaseRetryPeriod time.Duration leaderRetryElectGap time.Duration - - isLeader bool - leaderElector *leaderelection.LeaderElector + isLeader bool + leaderElector *leaderelection.LeaderElector } -// NewLeaseElector will return a SpiderLeaseElector object func NewLeaseElector(leaseLockNS, leaseLockName, leaseLockIdentity string, leaseDuration, leaseRenewDeadline, leaseRetryPeriod, leaderRetryElectGap *time.Duration) (SpiderLeaseElector, error) { if len(leaseLockNS) == 0 { @@ -94,21 +88,11 @@ func NewLeaseElector(leaseLockNS, leaseLockName, leaseLockIdentity string, return sl, nil } -func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface) error { +// Run executes the leader election process with the given callbacks +func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error { logger = logutils.Logger.Named("Lease-Lock-Election") - err := sl.register(clientSet) - if nil != err { - return err - } - - go sl.tryToElect(ctx) - - return nil -} - -// register will new client-go LeaderElector object with options configurations -func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error { + // Create a LeaseLock object that defines the resource to be locked leaseLock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: sl.leaseLockName, @@ -120,61 +104,57 @@ func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error { }, } + // Initialize the leader elector with the configuration le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: leaseLock, LeaseDuration: sl.leaseDuration, RenewDeadline: sl.leaseRenewDeadline, RetryPeriod: sl.leaseRetryPeriod, Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { + // Callback when this instance becomes the leader + OnStartedLeading: func(ctx context.Context) { sl.Lock() sl.isLeader = true sl.Unlock() - logger.Sugar().Infof("leader elected: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity) + logger.Sugar().Infof("lease %s/%s leader election succeeded, leader identity: [%s]", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity) + callbacks.OnStartedLeading(ctx) }, + // Callback when this instance stops being the leader OnStoppedLeading: func() { - // we can do cleanup here sl.Lock() sl.isLeader = false sl.Unlock() - logger.Sugar().Warnf("leader lost: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity) + logger.Sugar().Warnf("lease %s/%s leader election failed, leader lost: [%s]", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity) + callbacks.OnStoppedLeading() }, + // Callback when a new leader is elected + OnNewLeader: callbacks.OnNewLeader, }, ReleaseOnCancel: true, }) - if nil != err { - return fmt.Errorf("unable to new leader elector: %w", err) + + // Return error if leader elector creation fails + if err != nil { + return fmt.Errorf("unable to create new leader elector: %w", err) } + // Assign the leader elector to the SpiderLeader object sl.leaderElector = le + + // Start the leader election process + sl.leaderElector.Run(ctx) + return nil } +// IsElected checks if the current instance is the leader func (sl *SpiderLeader) IsElected() bool { sl.RLock() defer sl.RUnlock() - return sl.isLeader } -// tryToElect will elect continually -func (sl *SpiderLeader) tryToElect(ctx context.Context) { - for { - logger.Sugar().Infof("'%s/%s/%s' is trying to elect", - sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity) - - // Once a node acquire the lease lock and become the leader, it will renew the lease lock continually until it failed to interact with API server. - // In this case the node will lose leader title and try to elect again. - // If there's a leader and another node will try to acquire the lease lock persistently until the leader renew failed. - sl.leaderElector.Run(ctx) - - logger.Sugar().Warnf("'%s/%s/%s' election request disconnected, and it will continue to elect after '%v'", - sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity, sl.leaderRetryElectGap) - - time.Sleep(sl.leaderRetryElectGap) - } -} - +// GetLeader returns the current leader's identity func (sl *SpiderLeader) GetLeader() string { return sl.leaderElector.GetLeader() } diff --git a/pkg/election/lease_election_test.go b/pkg/election/lease_election_test.go index f112061e74..a741339222 100644 --- a/pkg/election/lease_election_test.go +++ b/pkg/election/lease_election_test.go @@ -10,6 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/leaderelection" ) var _ = Describe("Leader Election", Label("unittest", "election_test"), func() { @@ -103,7 +104,7 @@ var _ = Describe("Leader Election", Label("unittest", "election_test"), func() { It("check leader election function", func() { ctx, cancel := context.WithCancel(context.TODO()) - err = spiderLeaseElector.Run(ctx, fake.NewSimpleClientset()) + err = spiderLeaseElector.Run(ctx, fake.NewSimpleClientset(), leaderelection.LeaderCallbacks{}) Expect(err).NotTo(HaveOccurred()) // wait for us to become leader diff --git a/pkg/election/mock/lease_election_mock.go b/pkg/election/mock/lease_election_mock.go index 065675029b..987a0f9c4c 100644 --- a/pkg/election/mock/lease_election_mock.go +++ b/pkg/election/mock/lease_election_mock.go @@ -13,6 +13,7 @@ import ( gomock "github.com/golang/mock/gomock" kubernetes "k8s.io/client-go/kubernetes" + leaderelection "k8s.io/client-go/tools/leaderelection" ) // MockSpiderLeaseElector is a mock of SpiderLeaseElector interface. @@ -67,15 +68,15 @@ func (mr *MockSpiderLeaseElectorMockRecorder) IsElected() *gomock.Call { } // Run mocks base method. -func (m *MockSpiderLeaseElector) Run(ctx context.Context, clientSet kubernetes.Interface) error { +func (m *MockSpiderLeaseElector) Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", ctx, clientSet) + ret := m.ctrl.Call(m, "Run", ctx, clientSet, callbacks) ret0, _ := ret[0].(error) return ret0 } // Run indicates an expected call of Run. -func (mr *MockSpiderLeaseElectorMockRecorder) Run(ctx, clientSet interface{}) *gomock.Call { +func (mr *MockSpiderLeaseElectorMockRecorder) Run(ctx, clientSet, callbacks interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSpiderLeaseElector)(nil).Run), ctx, clientSet) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSpiderLeaseElector)(nil).Run), ctx, clientSet, callbacks) } diff --git a/pkg/gcmanager/gc_manager.go b/pkg/gcmanager/gc_manager.go index 05d773d810..34e64270dd 100644 --- a/pkg/gcmanager/gc_manager.go +++ b/pkg/gcmanager/gc_manager.go @@ -126,9 +126,8 @@ func NewGCManager(clientSet *kubernetes.Clientset, config *GarbageCollectionConf stsMgr: stsManager, kubevirtMgr: kubevirtMgr, nodeMgr: nodeMgr, - - leader: spiderControllerLeader, - gcLimiter: limiter.NewLimiter(limiter.LimiterConfig{}), + leader: spiderControllerLeader, + gcLimiter: limiter.NewLimiter(limiter.LimiterConfig{}), } return spiderGC, nil diff --git a/pkg/gcmanager/pod_informer.go b/pkg/gcmanager/pod_informer.go index fe5e44f5f4..c984b973d1 100644 --- a/pkg/gcmanager/pod_informer.go +++ b/pkg/gcmanager/pod_informer.go @@ -5,7 +5,6 @@ package gcmanager import ( "context" - "time" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" @@ -22,31 +21,10 @@ func (s *SpiderGC) startPodInformer(ctx context.Context) { return default: } - - if !s.leader.IsElected() { - time.Sleep(s.gcConfig.LeaderRetryElectGap) - continue - } - + logger.Info("create Pod informer") innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !s.leader.IsElected() { - logger.Warn("Leader lost, stop IP GC pod informer") - innerCancel() - return - } - time.Sleep(s.gcConfig.LeaderRetryElectGap) - } - }() + defer innerCancel() - logger.Info("create Pod informer") informerFactory := informers.NewSharedInformerFactory(s.k8ClientSet, 0) podInformer := informerFactory.Core().V1().Pods().Informer() _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/ippoolmanager/ippool_informer.go b/pkg/ippoolmanager/ippool_informer.go index 6cba9e95d0..aaf9faca10 100644 --- a/pkg/ippoolmanager/ippool_informer.go +++ b/pkg/ippoolmanager/ippool_informer.go @@ -24,7 +24,6 @@ import ( "github.com/spidernet-io/spiderpool/pkg/applicationcontroller/applicationinformers" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/election" spiderpoolip "github.com/spidernet-io/spiderpool/pkg/ip" spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1" crdclientset "github.com/spidernet-io/spiderpool/pkg/k8s/client/clientset/versioned" @@ -34,6 +33,7 @@ import ( "github.com/spidernet-io/spiderpool/pkg/logutils" "github.com/spidernet-io/spiderpool/pkg/metric" "github.com/spidernet-io/spiderpool/pkg/types" + "k8s.io/client-go/kubernetes" ) var informerLogger *zap.Logger @@ -42,6 +42,7 @@ type IPPoolController struct { IPPoolControllerConfig client client.Client dynamicClient dynamic.Interface + clientSet kubernetes.Interface poolLister listers.SpiderIPPoolLister poolSynced cache.InformerSynced poolWorkqueue workqueue.RateLimitingInterface @@ -70,12 +71,8 @@ func NewIPPoolController(poolControllerConfig IPPoolControllerConfig, client cli return c } -func (ic *IPPoolController) SetupInformer(ctx context.Context, client crdclientset.Interface, controllerLeader election.SpiderLeaseElector) error { - if controllerLeader == nil { - return fmt.Errorf("failed to start SpiderIPPool informer, controller leader must be specified") - } - - informerLogger.Info("try to register SpiderIPPool informer") +func (ic *IPPoolController) SetupInformer(ctx context.Context, client crdclientset.Interface) error { + informerLogger.Info("create SpiderIPPool informer") go func() { for { select { @@ -84,30 +81,9 @@ func (ic *IPPoolController) SetupInformer(ctx context.Context, client crdclients default: } - if !controllerLeader.IsElected() { - time.Sleep(ic.LeaderRetryElectGap) - continue - } - innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !controllerLeader.IsElected() { - informerLogger.Warn("Leader lost, stop IPPool informer") - innerCancel() - return - } - time.Sleep(ic.LeaderRetryElectGap) - } - }() + defer innerCancel() - informerLogger.Info("create SpiderIPPool informer") factory := externalversions.NewSharedInformerFactory(client, ic.ResyncPeriod) err := ic.addEventHandlers(factory.Spiderpool().V2beta1().SpiderIPPools()) if nil != err { @@ -119,10 +95,10 @@ func (ic *IPPoolController) SetupInformer(ctx context.Context, client crdclients if err := ic.Run(innerCtx.Done()); nil != err { informerLogger.Sugar().Errorf("failed to run ippool controller, error: %v", err) } + informerLogger.Error("SpiderIPPool informer broken") } }() - return nil } diff --git a/pkg/multuscniconfig/multusconfig_informer.go b/pkg/multuscniconfig/multusconfig_informer.go index 9d3c52372f..4daa3f53ad 100644 --- a/pkg/multuscniconfig/multusconfig_informer.go +++ b/pkg/multuscniconfig/multusconfig_informer.go @@ -27,7 +27,6 @@ import ( "github.com/spidernet-io/spiderpool/cmd/spiderpool/cmd" spiderpoolcmd "github.com/spidernet-io/spiderpool/cmd/spiderpool/cmd" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/election" spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1" crdclientset "github.com/spidernet-io/spiderpool/pkg/k8s/client/clientset/versioned" "github.com/spidernet-io/spiderpool/pkg/k8s/client/informers/externalversions" @@ -65,10 +64,7 @@ func NewMultusConfigController(multusConfigControllerConfig MultusConfigControll return m } -func (mcc *MultusConfigController) SetupInformer(ctx context.Context, client crdclientset.Interface, leader election.SpiderLeaseElector) error { - if leader == nil { - return fmt.Errorf("controller leader %w", constant.ErrMissingRequiredParam) - } +func (mcc *MultusConfigController) SetupInformer(ctx context.Context, client crdclientset.Interface) error { informerLogger.Info("try to register MultusConfig informer") go func() { @@ -78,29 +74,8 @@ func (mcc *MultusConfigController) SetupInformer(ctx context.Context, client crd return default: } - - if !leader.IsElected() { - time.Sleep(mcc.LeaderRetryElectGap) - continue - } - innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !leader.IsElected() { - informerLogger.Warn("Leader lost, stop MultusConfig informer") - innerCancel() - return - } - time.Sleep(mcc.LeaderRetryElectGap) - } - }() + defer innerCancel() informerLogger.Info("create MultusConfig informer") factory := externalversions.NewSharedInformerFactory(client, mcc.ResyncPeriod) diff --git a/pkg/subnetmanager/subnet_informer.go b/pkg/subnetmanager/subnet_informer.go index 388d38bc93..bb7ff3995b 100644 --- a/pkg/subnetmanager/subnet_informer.go +++ b/pkg/subnetmanager/subnet_informer.go @@ -29,7 +29,6 @@ import ( "github.com/spidernet-io/spiderpool/pkg/applicationcontroller/applicationinformers" "github.com/spidernet-io/spiderpool/pkg/constant" - "github.com/spidernet-io/spiderpool/pkg/election" spiderpoolip "github.com/spidernet-io/spiderpool/pkg/ip" "github.com/spidernet-io/spiderpool/pkg/ippoolmanager" spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1" @@ -81,16 +80,12 @@ type thirdControllerKey struct { AppUID types.UID } -func (sc *SubnetController) SetupInformer(ctx context.Context, client clientset.Interface, leader election.SpiderLeaseElector) error { +func (sc *SubnetController) SetupInformer(ctx context.Context, client clientset.Interface) error { if client == nil { return fmt.Errorf("spiderpoolv2beta1 clientset %w", constant.ErrMissingRequiredParam) } - if leader == nil { - return fmt.Errorf("controller leader %w", constant.ErrMissingRequiredParam) - } InformerLogger = logutils.Logger.Named("Subnet-Informer") - go func() { for { select { @@ -98,29 +93,8 @@ func (sc *SubnetController) SetupInformer(ctx context.Context, client clientset. return default: } - - if !leader.IsElected() { - time.Sleep(sc.LeaderRetryElectGap) - continue - } - innerCtx, innerCancel := context.WithCancel(ctx) - go func() { - for { - select { - case <-innerCtx.Done(): - return - default: - } - - if !leader.IsElected() { - InformerLogger.Warn("Leader lost, stop Subnet informer") - innerCancel() - return - } - time.Sleep(sc.LeaderRetryElectGap) - } - }() + defer innerCancel() InformerLogger.Info("Initialize Dynamic informer") sc.dynamicFactory = dynamicinformer.NewDynamicSharedInformerFactory(sc.DynamicClient, 0) diff --git a/pkg/subnetmanager/subnet_informer_test.go b/pkg/subnetmanager/subnet_informer_test.go index 6dfac88a87..7c92d8b668 100644 --- a/pkg/subnetmanager/subnet_informer_test.go +++ b/pkg/subnetmanager/subnet_informer_test.go @@ -104,7 +104,7 @@ var _ = Describe("SubnetController", Label("subnet_controller_test"), func() { fakeClientset.PrependWatchReactor("spidersubnets", testing.DefaultWatchReactor(fakeSubnetWatch, nil)) fakeClientset.PrependWatchReactor("spiderippools", testing.DefaultWatchReactor(fakeIPPoolWatch, nil)) - err := subnetController.SetupInformer(bCtx, fakeClientset, mockLeaderElector) + err := subnetController.SetupInformer(bCtx, fakeClientset) Expect(err).NotTo(HaveOccurred()) Eventually(func(g Gomega) { g.Expect(subnetController.SubnetIndexer).NotTo(BeNil())