diff --git a/cmd/spiderpool-controller/cmd/config.go b/cmd/spiderpool-controller/cmd/config.go
index 276c5cebe0..1a5df4ee06 100644
--- a/cmd/spiderpool-controller/cmd/config.go
+++ b/cmd/spiderpool-controller/cmd/config.go
@@ -78,9 +78,9 @@ var envInfo = []envConf{
 	{"SPIDERPOOL_GC_PODENTRY_MAX_RETRIES", "5", true, nil, nil, &gcIPConfig.WorkQueueMaxRetries},
 	{"SPIDERPOOL_POD_NAMESPACE", "", true, &controllerContext.Cfg.ControllerPodNamespace, nil, nil},
 	{"SPIDERPOOL_POD_NAME", "", true, &controllerContext.Cfg.ControllerPodName, nil, nil},
-	{"SPIDERPOOL_LEADER_DURATION", "15", true, nil, nil, &controllerContext.Cfg.LeaseDuration},
-	{"SPIDERPOOL_LEADER_RENEW_DEADLINE", "10", true, nil, nil, &controllerContext.Cfg.LeaseRenewDeadline},
-	{"SPIDERPOOL_LEADER_RETRY_PERIOD", "2", true, nil, nil, &controllerContext.Cfg.LeaseRetryPeriod},
+	{"SPIDERPOOL_LEADER_DURATION", "30", true, nil, nil, &controllerContext.Cfg.LeaseDuration},
+	{"SPIDERPOOL_LEADER_RENEW_DEADLINE", "20", true, nil, nil, &controllerContext.Cfg.LeaseRenewDeadline},
+	{"SPIDERPOOL_LEADER_RETRY_PERIOD", "5", true, nil, nil, &controllerContext.Cfg.LeaseRetryPeriod},
 	{"SPIDERPOOL_LEADER_RETRY_GAP", "1", true, nil, nil, &controllerContext.Cfg.LeaseRetryGap},
 
 	{"SPIDERPOOL_IPPOOL_MAX_ALLOCATED_IPS", "5000", false, nil, nil, &controllerContext.Cfg.IPPoolMaxAllocatedIPs},
diff --git a/cmd/spiderpool-controller/cmd/daemon.go b/cmd/spiderpool-controller/cmd/daemon.go
index 88023c0565..5a3ba480a6 100644
--- a/cmd/spiderpool-controller/cmd/daemon.go
+++ b/cmd/spiderpool-controller/cmd/daemon.go
@@ -43,6 +43,7 @@ import (
 	"github.com/spidernet-io/spiderpool/pkg/statefulsetmanager"
 	"github.com/spidernet-io/spiderpool/pkg/subnetmanager"
 	"github.com/spidernet-io/spiderpool/pkg/workloadendpointmanager"
+	leaderelection "k8s.io/client-go/tools/leaderelection"
 )
 
 // DaemonMain runs controllerContext handlers.
@@ -445,7 +446,20 @@ func initSpiderControllerLeaderElect(ctx context.Context) {
 		logger.Fatal(err.Error())
 	}
 
-	err = leaderElector.Run(ctx, controllerContext.ClientSet)
+	callbacks := leaderelection.LeaderCallbacks{
+		OnStartedLeading: func(ctx context.Context) {
+			logger.Info("Begin to set up Leader informer")
+		},
+		OnStoppedLeading: func() {
+			logger.Warn("Lost leadership, exiting spiderpool-controller")
+			// Exit the process when losing leadership so the restarted instance rejoins the election
+			os.Exit(0)
+		},
+		OnNewLeader: func(identity string) {
+			logger.Sugar().Infof("New leader elected: %s", identity)
+		},
+	}
+	err = leaderElector.Run(ctx, controllerContext.ClientSet, callbacks)
 	if nil != err {
 		logger.Fatal(err.Error())
 	}
@@ -511,6 +525,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
 		},
 		controllerContext.CRDManager.GetClient(),
 		controllerContext.DynamicClient,
+		controllerContext.ClientSet,
 	)
 	err = ipPoolController.SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader)
 	if nil != err {
diff --git a/pkg/election/lease_election.go b/pkg/election/lease_election.go
index 6b9b229b48..3ce0edcae8 100644
--- a/pkg/election/lease_election.go
+++ b/pkg/election/lease_election.go
@@ -18,15 +18,17 @@ import (
 	"github.com/spidernet-io/spiderpool/pkg/constant"
 	"github.com/spidernet-io/spiderpool/pkg/lock"
 	"github.com/spidernet-io/spiderpool/pkg/logutils"
+	"k8s.io/apimachinery/pkg/util/uuid"
 )
 
 var logger *zap.Logger
 
 type SpiderLeaseElector interface {
-	Run(ctx context.Context, clientSet kubernetes.Interface) error
+	Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error
 	// IsElected returns a boolean value to check current Elector whether is a leader
 	IsElected() bool
 	GetLeader() string
+	// LeaderElectAndRun(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error
 }
 
 type SpiderLeader struct {
@@ -94,10 +96,10 @@ func NewLeaseElector(leaseLockNS, leaseLockName, leaseLockIdentity string,
 	return sl, nil
 }
 
-func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface) error {
+func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
 	logger = logutils.Logger.Named("Lease-Lock-Election")
 
-	err := sl.register(clientSet)
+	err := sl.register(clientSet, callbacks)
 	if nil != err {
 		return err
 	}
@@ -108,7 +110,9 @@ func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface)
 }
 
 // register will new client-go LeaderElector object with options configurations
-func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error {
+func (sl *SpiderLeader) register(clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
+	leaseLockIdentity := sl.leaseLockIdentity + "_" + string(uuid.NewUUID())
+
 	leaseLock := &resourcelock.LeaseLock{
 		LeaseMeta: metav1.ObjectMeta{
 			Name: sl.leaseLockName,
@@ -116,32 +120,19 @@ func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error {
 		},
 		Client: clientSet.CoordinationV1(),
 		LockConfig: resourcelock.ResourceLockConfig{
-			Identity: sl.leaseLockIdentity,
+			Identity: leaseLockIdentity,
 		},
 	}
 
 	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
-		Lock:          leaseLock,
-		LeaseDuration: sl.leaseDuration,
-		RenewDeadline: sl.leaseRenewDeadline,
-		RetryPeriod:   sl.leaseRetryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(_ context.Context) {
-				sl.Lock()
-				sl.isLeader = true
-				sl.Unlock()
-
-				logger.Sugar().Infof("leader elected: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity)
-			},
-			OnStoppedLeading: func() {
-				// we can do cleanup here
-				sl.Lock()
-				sl.isLeader = false
-				sl.Unlock()
-				logger.Sugar().Warnf("leader lost: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity)
-			},
-		},
+		Lock:            leaseLock,
+		LeaseDuration:   sl.leaseDuration,
+		RenewDeadline:   sl.leaseRenewDeadline,
+		RetryPeriod:     sl.leaseRetryPeriod,
+		Callbacks:       callbacks,
 		ReleaseOnCancel: true,
 	})
+
 	if nil != err {
 		return fmt.Errorf("unable to new leader elector: %w", err)
 	}
@@ -150,6 +141,49 @@ func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error {
 	return nil
 }
 
+// func (sl *SpiderLeader) LeaderElectAndRun(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
+// 	leaseLock := &resourcelock.LeaseLock{
+// 		LeaseMeta: metav1.ObjectMeta{
+// 			Name:      sl.leaseLockName,
+// 			Namespace: sl.leaseLockNamespace,
+// 		},
+// 		Client: clientSet.CoordinationV1(),
+// 		LockConfig: resourcelock.ResourceLockConfig{
+// 			Identity: sl.leaseLockIdentity,
+// 		},
+// 	}
+
+// 	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
+// 		Lock:            leaseLock,
+// 		LeaseDuration:   sl.leaseDuration,
+// 		RenewDeadline:   sl.leaseRenewDeadline,
+// 		RetryPeriod:     sl.leaseRetryPeriod,
+// 		Callbacks:       callbacks,
+// 		ReleaseOnCancel: true,
+// 	})
+
+// 	if nil != err {
+// 		return fmt.Errorf("unable to new leader elector: %w", err)
+// 	}
+// 	sl.leaderElector = le
+// 	// Setup any healthz checks we will want to use.
+// 	var electionChecker *leaderelection.HealthzAdaptor
+// 	if sl.isLeader {
+// 		electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Minute)
+// 	}
+
+// 	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+// 		Lock:          leaseLock,
+// 		LeaseDuration: sl.leaseDuration,
+// 		RenewDeadline: sl.leaseRenewDeadline,
+// 		RetryPeriod:   sl.leaseRetryPeriod,
+// 		Callbacks:     callbacks,
+// 		WatchDog:      electionChecker,
+// 	})
+
+// 	return fmt.Errorf("leader elector %s/%s unreachable: %w", sl.leaseLockNamespace, sl.leaseLockName, err)
+// }
+
 func (sl *SpiderLeader) IsElected() bool {
 	sl.RLock()
 	defer sl.RUnlock()
diff --git a/pkg/election/lease_election_test.go b/pkg/election/lease_election_test.go
index f112061e74..a741339222 100644
--- a/pkg/election/lease_election_test.go
+++ b/pkg/election/lease_election_test.go
@@ -10,6 +10,7 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/leaderelection"
 )
 
 var _ = Describe("Leader Election", Label("unittest", "election_test"), func() {
@@ -103,7 +104,7 @@ var _ = Describe("Leader Election", Label("unittest", "election_test"), func() {
 	It("check leader election function", func() {
 		ctx, cancel := context.WithCancel(context.TODO())
 
-		err = spiderLeaseElector.Run(ctx, fake.NewSimpleClientset())
+		err = spiderLeaseElector.Run(ctx, fake.NewSimpleClientset(), leaderelection.LeaderCallbacks{})
 		Expect(err).NotTo(HaveOccurred())
 
 		// wait for us to become leader
diff --git a/pkg/election/mock/lease_election_mock.go b/pkg/election/mock/lease_election_mock.go
index 065675029b..cea3e375fe 100644
--- a/pkg/election/mock/lease_election_mock.go
+++ b/pkg/election/mock/lease_election_mock.go
@@ -1,6 +1,3 @@
-// Copyright 2022 Authors of spidernet-io
-// SPDX-License-Identifier: Apache-2.0
-
 // Code generated by MockGen. DO NOT EDIT.
 // Source: ./lease_election.go
 
@@ -13,6 +10,7 @@ import (
 
 	gomock "github.com/golang/mock/gomock"
 	kubernetes "k8s.io/client-go/kubernetes"
+	leaderelection "k8s.io/client-go/tools/leaderelection"
 )
 
 // MockSpiderLeaseElector is a mock of SpiderLeaseElector interface.
@@ -67,15 +65,15 @@ func (mr *MockSpiderLeaseElectorMockRecorder) IsElected() *gomock.Call {
 }
 
 // Run mocks base method.
-func (m *MockSpiderLeaseElector) Run(ctx context.Context, clientSet kubernetes.Interface) error {
+func (m *MockSpiderLeaseElector) Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "Run", ctx, clientSet)
+	ret := m.ctrl.Call(m, "Run", ctx, clientSet, callbacks)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // Run indicates an expected call of Run.
-func (mr *MockSpiderLeaseElectorMockRecorder) Run(ctx, clientSet interface{}) *gomock.Call {
+func (mr *MockSpiderLeaseElectorMockRecorder) Run(ctx, clientSet, callbacks interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSpiderLeaseElector)(nil).Run), ctx, clientSet)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSpiderLeaseElector)(nil).Run), ctx, clientSet, callbacks)
 }
diff --git a/pkg/ippoolmanager/ippool_informer.go b/pkg/ippoolmanager/ippool_informer.go
index 6cba9e95d0..fc62dd0056 100644
--- a/pkg/ippoolmanager/ippool_informer.go
+++ b/pkg/ippoolmanager/ippool_informer.go
@@ -34,6 +34,8 @@ import (
 	"github.com/spidernet-io/spiderpool/pkg/logutils"
 	"github.com/spidernet-io/spiderpool/pkg/metric"
 	"github.com/spidernet-io/spiderpool/pkg/types"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/leaderelection"
 )
 
 var informerLogger *zap.Logger
@@ -42,6 +44,7 @@ type IPPoolController struct {
 	IPPoolControllerConfig
 	client        client.Client
 	dynamicClient dynamic.Interface
+	clientSet     kubernetes.Interface
 	poolLister    listers.SpiderIPPoolLister
 	poolSynced    cache.InformerSynced
 	poolWorkqueue workqueue.RateLimitingInterface
@@ -58,13 +61,14 @@ type IPPoolControllerConfig struct {
 	ResyncPeriod time.Duration
 }
 
-func NewIPPoolController(poolControllerConfig IPPoolControllerConfig, client client.Client, dynamicClient dynamic.Interface) *IPPoolController {
+func NewIPPoolController(poolControllerConfig IPPoolControllerConfig, client client.Client, dynamicClient dynamic.Interface, clientSet kubernetes.Interface) *IPPoolController {
 	informerLogger = logutils.Logger.Named("SpiderIPPool-Informer")
 
 	c := &IPPoolController{
 		IPPoolControllerConfig: poolControllerConfig,
 		client:                 client,
 		dynamicClient:          dynamicClient,
+		clientSet:              clientSet,
 	}
 
 	return c
@@ -75,6 +79,52 @@ func (ic *IPPoolController) SetupInformer(ctx context.Context, client crdclients
 		return fmt.Errorf("failed to start SpiderIPPool informer, controller leader must be specified")
 	}
 
+	informerLogger.Info("registering SpiderIPPool informer")
+
+	callbacks := leaderelection.LeaderCallbacks{
+		OnStartedLeading: func(ctx context.Context) {
+			informerLogger.Info("Became leader, starting SpiderIPPool informer")
+
+			innerCtx, innerCancel := context.WithCancel(ctx)
+			defer innerCancel()
+
+			factory := externalversions.NewSharedInformerFactory(client, ic.ResyncPeriod)
+			err := ic.addEventHandlers(factory.Spiderpool().V2beta1().SpiderIPPools())
+			if err != nil {
+				informerLogger.Error(err.Error())
+				return
+			}
+
+			factory.Start(innerCtx.Done())
+			if err := ic.Run(innerCtx.Done()); err != nil {
+				informerLogger.Sugar().Errorf("failed to run ippool controller, error: %v", err)
+			}
+			informerLogger.Info("SpiderIPPool informer has stopped")
+		},
+		OnStoppedLeading: func() {
+			informerLogger.Warn("Lost leadership, stopping SpiderIPPool informer")
+			// Stop the informer when losing leadership
+			ic.poolWorkqueue.ShutDown()
+		},
+		OnNewLeader: func(identity string) {
+			if identity == controllerLeader.GetLeader() {
+				informerLogger.Sugar().Infof("New leader elected: %s", identity)
+			}
+		},
+	}
+
+	go func() {
+		controllerLeader.Run(ctx, ic.clientSet, callbacks)
+	}()
+
+	return nil
+}
+
+func (ic *IPPoolController) SetupInformerDebug(ctx context.Context, client crdclientset.Interface, controllerLeader election.SpiderLeaseElector) error {
+	if controllerLeader == nil {
+		return fmt.Errorf("failed to start SpiderIPPool informer, controller leader must be specified")
+	}
+
 	informerLogger.Info("try to register SpiderIPPool informer")
 	go func() {
 		for {
diff --git a/pkg/ippoolmanager/ippool_informer_test.go b/pkg/ippoolmanager/ippool_informer_test.go
index 605bea4711..6154b0d6d0 100644
--- a/pkg/ippoolmanager/ippool_informer_test.go
+++ b/pkg/ippoolmanager/ippool_informer_test.go
@@ -16,6 +16,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	dynamicfake "k8s.io/client-go/dynamic/fake"
+	clientSetfake "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -212,6 +213,7 @@ type poolController struct {
 func newController() *poolController {
 	fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
 	fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme)
+	fakeclientSetClient := clientSetfake.NewSimpleClientset()
 
 	poolControllerConfig = IPPoolControllerConfig{
 		IPPoolControllerWorkers: 3,
@@ -223,7 +225,7 @@ func newController() *poolController {
 		ResyncPeriod: 10 * time.Second,
 	}
 
-	pController := NewIPPoolController(poolControllerConfig, fakeClient, fakeDynamicClient)
+	pController := NewIPPoolController(poolControllerConfig, fakeClient, fakeDynamicClient, fakeclientSetClient)
 
 	return &poolController{
 		IPPoolController: pController,
diff --git a/pkg/subnetmanager/subnet_informer.go b/pkg/subnetmanager/subnet_informer.go
index 388d38bc93..1d151885ff 100644
--- a/pkg/subnetmanager/subnet_informer.go
+++ b/pkg/subnetmanager/subnet_informer.go
@@ -41,6 +41,8 @@ import (
 	"github.com/spidernet-io/spiderpool/pkg/metric"
 	spiderpooltypes "github.com/spidernet-io/spiderpool/pkg/types"
 	"github.com/spidernet-io/spiderpool/pkg/utils/convert"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/leaderelection"
 )
 
 const (
@@ -81,7 +83,7 @@ type thirdControllerKey struct {
 	AppUID types.UID
 }
 
-func (sc *SubnetController) SetupInformer(ctx context.Context, client clientset.Interface, leader election.SpiderLeaseElector) error {
+func (sc *SubnetController) SetupInformerDebug(ctx context.Context, client clientset.Interface, leader election.SpiderLeaseElector, clientSet kubernetes.Interface) error {
 	if client == nil {
 		return fmt.Errorf("spiderpoolv2beta1 clientset %w", constant.ErrMissingRequiredParam)
 	}
@@ -149,6 +151,72 @@ func (sc *SubnetController) SetupInformer(ctx context.Context, client clientset.
 	return nil
 }
 
+func (sc *SubnetController) SetupInformer(ctx context.Context, client clientset.Interface, leader election.SpiderLeaseElector) error {
+	if client == nil {
+		return fmt.Errorf("spiderpoolv2beta1 clientset %w", constant.ErrMissingRequiredParam)
+	}
+	if leader == nil {
+		return fmt.Errorf("controller leader %w", constant.ErrMissingRequiredParam)
+	}
+
+	InformerLogger = logutils.Logger.Named("Subnet-Informer")
+
+	callbacks := leaderelection.LeaderCallbacks{
+		OnStartedLeading: func(ctx context.Context) {
+			InformerLogger.Info("Became leader, starting Subnet informer")
+
+			innerCtx, innerCancel := context.WithCancel(ctx)
+			defer innerCancel()
+
+			// Initialize Subnet and Dynamic informer
+			sc.dynamicFactory = dynamicinformer.NewDynamicSharedInformerFactory(sc.DynamicClient, 0)
+			sc.dynamicWorkqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Dynamic-Objects")
+
+			InformerLogger.Info("Initializing Subnet informer")
+			informerFactory := externalversions.NewSharedInformerFactory(client, sc.ResyncPeriod)
+
+			// Register event handlers
+			err := sc.addEventHandlers(
+				informerFactory.Spiderpool().V2beta1().SpiderSubnets(),
+				informerFactory.Spiderpool().V2beta1().SpiderIPPools(),
+			)
+			if err != nil {
+				InformerLogger.Error(err.Error())
+				return
+			}
+
+			// Start informer and controller
+			informerFactory.Start(innerCtx.Done())
+			if err := sc.run(logutils.IntoContext(innerCtx, InformerLogger), sc.SubnetControllerWorkers); err != nil {
+				InformerLogger.Sugar().Errorf("failed to run Subnet informer: %v", err)
+				innerCancel()
+			}
+
+			InformerLogger.Info("Subnet informer has stopped")
+		},
+		OnStoppedLeading: func() {
+			InformerLogger.Warn("Lost leadership, stopping Subnet informer")
+			// Stop the informer when losing leadership
+			sc.Workqueue.ShutDown()
+		},
+		OnNewLeader: func(identity string) {
+			InformerLogger.Sugar().Infof("New leader elected: %s", identity)
+		},
+	}
+
+	clientSet, err := kubernetes.NewForConfig(ctrl.GetConfigOrDie())
+	if nil != err {
+		return fmt.Errorf("failed to init K8s clientset: %v", err)
+	}
+
+	// Start leader election
+	go func() {
+		leader.Run(ctx, clientSet, callbacks)
+	}()
+
+	return nil
+}
+
 func (sc *SubnetController) addEventHandlers(subnetInformer informers.SpiderSubnetInformer, ipPoolInformer informers.SpiderIPPoolInformer) error {
 	sc.SubnetsLister = subnetInformer.Lister()
 	sc.IPPoolsLister = ipPoolInformer.Lister()
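
The patch above wires the informers to client-go leader election through LeaderCallbacks instead of polling IsElected(). For review context, the following standalone Go sketch (not part of the patch) shows how that LeaderCallbacks pattern is typically combined with a Lease lock and the new default timings (30s/20s/5s). The lease name "demo-lease", the "default" namespace, the "demo-holder" identity prefix, and the KUBECONFIG-based client construction are all hypothetical choices for illustration only.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	// Build a clientset from the local kubeconfig; inside a Pod this falls back
	// to the in-cluster config when KUBECONFIG is empty.
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}
	clientSet, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Cancel the election on SIGTERM so the lease is released (ReleaseOnCancel).
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer cancel()

	// Unique identity per process, mirroring the "<identity>_<uuid>" scheme the
	// patch introduces in register().
	identity := "demo-holder_" + string(uuid.NewUUID())

	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "demo-lease", // hypothetical lease name
			Namespace: "default",    // hypothetical namespace
		},
		Client: clientSet.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: identity,
		},
	}

	// Callbacks are driven by the elector: work happens only between
	// OnStartedLeading and OnStoppedLeading.
	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			log.Println("became leader, starting informers")
			<-ctx.Done() // a real controller would run its informers here
		},
		OnStoppedLeading: func() {
			log.Println("lost leadership, shutting down")
		},
		OnNewLeader: func(current string) {
			log.Printf("current leader: %s", current)
		},
	}

	// RunOrDie blocks, campaigning for the lease and renewing it while leading.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   30 * time.Second, // matches the new SPIDERPOOL_LEADER_DURATION default
		RenewDeadline:   20 * time.Second, // matches SPIDERPOOL_LEADER_RENEW_DEADLINE
		RetryPeriod:     5 * time.Second,  // matches SPIDERPOOL_LEADER_RETRY_PERIOD
		Callbacks:       callbacks,
		ReleaseOnCancel: true,
	})
}

Running two copies of this sketch against the same cluster shows the handover behavior the patch relies on: the second process logs the current leader via OnNewLeader and only starts its own work after the first one exits and its lease expires.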