Optimize spiderpool leader lease election
Signed-off-by: ty-dc <[email protected]>
ty-dc committed Sep 11, 2024
1 parent 9e2c46b commit f88c31c
Showing 8 changed files with 207 additions and 39 deletions.
6 changes: 3 additions & 3 deletions cmd/spiderpool-controller/cmd/config.go
@@ -78,9 +78,9 @@ var envInfo = []envConf{
{"SPIDERPOOL_GC_PODENTRY_MAX_RETRIES", "5", true, nil, nil, &gcIPConfig.WorkQueueMaxRetries},
{"SPIDERPOOL_POD_NAMESPACE", "", true, &controllerContext.Cfg.ControllerPodNamespace, nil, nil},
{"SPIDERPOOL_POD_NAME", "", true, &controllerContext.Cfg.ControllerPodName, nil, nil},
{"SPIDERPOOL_LEADER_DURATION", "15", true, nil, nil, &controllerContext.Cfg.LeaseDuration},
{"SPIDERPOOL_LEADER_RENEW_DEADLINE", "10", true, nil, nil, &controllerContext.Cfg.LeaseRenewDeadline},
{"SPIDERPOOL_LEADER_RETRY_PERIOD", "2", true, nil, nil, &controllerContext.Cfg.LeaseRetryPeriod},
{"SPIDERPOOL_LEADER_DURATION", "30", true, nil, nil, &controllerContext.Cfg.LeaseDuration},
{"SPIDERPOOL_LEADER_RENEW_DEADLINE", "20", true, nil, nil, &controllerContext.Cfg.LeaseRenewDeadline},
{"SPIDERPOOL_LEADER_RETRY_PERIOD", "5", true, nil, nil, &controllerContext.Cfg.LeaseRetryPeriod},
{"SPIDERPOOL_LEADER_RETRY_GAP", "1", true, nil, nil, &controllerContext.Cfg.LeaseRetryGap},

{"SPIDERPOOL_IPPOOL_MAX_ALLOCATED_IPS", "5000", false, nil, nil, &controllerContext.Cfg.IPPoolMaxAllocatedIPs},
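The lease timings move from 15s/10s/2s to 30s/20s/5s. As a hedged illustration of how such integer-second settings typically feed client-go (the leaseConfig type and toDurations helper below are hypothetical, not spiderpool's actual code):

// Hypothetical sketch: convert second-based env settings into the durations
// client-go's leader elector expects. Names here are assumptions for illustration.
package main

import (
	"fmt"
	"time"
)

type leaseConfig struct {
	LeaseDuration      int // SPIDERPOOL_LEADER_DURATION, seconds
	LeaseRenewDeadline int // SPIDERPOOL_LEADER_RENEW_DEADLINE, seconds
	LeaseRetryPeriod   int // SPIDERPOOL_LEADER_RETRY_PERIOD, seconds
}

func toDurations(c leaseConfig) (lease, renew, retry time.Duration) {
	return time.Duration(c.LeaseDuration) * time.Second,
		time.Duration(c.LeaseRenewDeadline) * time.Second,
		time.Duration(c.LeaseRetryPeriod) * time.Second
}

func main() {
	lease, renew, retry := toDurations(leaseConfig{LeaseDuration: 30, LeaseRenewDeadline: 20, LeaseRetryPeriod: 5})
	// client-go requires LeaseDuration > RenewDeadline and RenewDeadline comfortably larger than RetryPeriod.
	fmt.Println(lease > renew, renew > retry) // true true
}

The larger defaults keep the required ordering while giving the leader more headroom to renew against a slow or briefly unavailable apiserver before losing the lease.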
17 changes: 16 additions & 1 deletion cmd/spiderpool-controller/cmd/daemon.go
@@ -43,6 +43,7 @@ import (
"github.com/spidernet-io/spiderpool/pkg/statefulsetmanager"
"github.com/spidernet-io/spiderpool/pkg/subnetmanager"
"github.com/spidernet-io/spiderpool/pkg/workloadendpointmanager"
leaderelection "k8s.io/client-go/tools/leaderelection"
)

// DaemonMain runs controllerContext handlers.
@@ -445,7 +446,20 @@ func initSpiderControllerLeaderElect(ctx context.Context) {
logger.Fatal(err.Error())
}

err = leaderElector.Run(ctx, controllerContext.ClientSet)
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
logger.Info("Begin to set up Leader informer")
},
OnStoppedLeading: func() {
logger.Warn("Lost leadership, stopping SpiderIPPool informer")
// Exit on lost leadership; the restarted controller process will rejoin the election
os.Exit(0)
},
OnNewLeader: func(identity string) {
logger.Sugar().Infof("New leader elected: %s", identity)
},
}
err = leaderElector.Run(ctx, controllerContext.ClientSet, callbacks)
if nil != err {
logger.Fatal(err.Error())
}
@@ -511,6 +525,7 @@ func setupInformers(k8sClient *kubernetes.Clientset) {
},
controllerContext.CRDManager.GetClient(),
controllerContext.DynamicClient,
controllerContext.ClientSet,
)
err = ipPoolController.SetupInformer(controllerContext.InnerCtx, crdClient, controllerContext.Leader)
if nil != err {
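Note the design choice above: losing leadership calls os.Exit(0), so the pod restarts and re-campaigns. A purely illustrative alternative (not what this commit does) is to cancel a root context so deferred cleanup and ReleaseOnCancel still run; rootCtx and rootCancel below are hypothetical names layered on the identifiers from the hunk above:

// Hypothetical alternative sketch, not part of this commit.
rootCtx, rootCancel := context.WithCancel(context.Background())
callbacks := leaderelection.LeaderCallbacks{
	OnStartedLeading: func(ctx context.Context) { logger.Info("Begin to set up Leader informer") },
	OnStoppedLeading: func() {
		logger.Warn("Lost leadership, cancelling root context for a graceful shutdown")
		rootCancel() // lets deferred cleanup and ReleaseOnCancel run instead of exiting immediately
	},
	OnNewLeader: func(identity string) { logger.Sugar().Infof("New leader elected: %s", identity) },
}
if err := leaderElector.Run(rootCtx, controllerContext.ClientSet, callbacks); err != nil {
	logger.Fatal(err.Error())
}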
82 changes: 58 additions & 24 deletions pkg/election/lease_election.go
@@ -18,15 +18,17 @@ import (
"github.com/spidernet-io/spiderpool/pkg/constant"
"github.com/spidernet-io/spiderpool/pkg/lock"
"github.com/spidernet-io/spiderpool/pkg/logutils"
"k8s.io/apimachinery/pkg/util/uuid"
)

var logger *zap.Logger

type SpiderLeaseElector interface {
Run(ctx context.Context, clientSet kubernetes.Interface) error
Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error
// IsElected reports whether the current Elector is the leader
IsElected() bool
GetLeader() string
// LeaderElectAndRun(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error
}

type SpiderLeader struct {
@@ -94,10 +96,10 @@ func NewLeaseElector(leaseLockNS, leaseLockName, leaseLockIdentity string,
return sl, nil
}

func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface) error {
func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
logger = logutils.Logger.Named("Lease-Lock-Election")

err := sl.register(clientSet)
err := sl.register(clientSet, callbacks)
if nil != err {
return err
}
@@ -108,40 +110,29 @@ func (sl *SpiderLeader) Run(ctx context.Context, clientSet kubernetes.Interface)
}

// register will new client-go LeaderElector object with options configurations
func (sl *SpiderLeader) register(clientSet kubernetes.Interface) error {
func (sl *SpiderLeader) register(clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
leaseLockIdentity := sl.leaseLockIdentity + "_" + string(uuid.NewUUID())

leaseLock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: sl.leaseLockName,
Namespace: sl.leaseLockNamespace,
},
Client: clientSet.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: sl.leaseLockIdentity,
Identity: leaseLockIdentity,
},
}

le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: leaseLock,
LeaseDuration: sl.leaseDuration,
RenewDeadline: sl.leaseRenewDeadline,
RetryPeriod: sl.leaseRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
sl.Lock()
sl.isLeader = true
sl.Unlock()
logger.Sugar().Infof("leader elected: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity)
},
OnStoppedLeading: func() {
// we can do cleanup here
sl.Lock()
sl.isLeader = false
sl.Unlock()
logger.Sugar().Warnf("leader lost: %s/%s/%s", sl.leaseLockNamespace, sl.leaseLockName, sl.leaseLockIdentity)
},
},
Lock: leaseLock,
LeaseDuration: sl.leaseDuration,
RenewDeadline: sl.leaseRenewDeadline,
RetryPeriod: sl.leaseRetryPeriod,
Callbacks: callbacks,
ReleaseOnCancel: true,
})

if nil != err {
return fmt.Errorf("unable to new leader elector: %w", err)
}
@@ -150,6 +141,49 @@ func (sl *SpiderLeader) register(clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
return nil
}

// func (sl *SpiderLeader) LeaderElectAndRun(ctx context.Context, clientSet kubernetes.Interface, callbacks leaderelection.LeaderCallbacks) error {
// leaseLock := &resourcelock.LeaseLock{
// LeaseMeta: metav1.ObjectMeta{
// Name: sl.leaseLockName,
// Namespace: sl.leaseLockNamespace,
// },
// Client: clientSet.CoordinationV1(),
// LockConfig: resourcelock.ResourceLockConfig{
// Identity: sl.leaseLockIdentity,
// },
// }

// le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
// Lock: leaseLock,
// LeaseDuration: sl.leaseDuration,
// RenewDeadline: sl.leaseRenewDeadline,
// RetryPeriod: sl.leaseRetryPeriod,
// Callbacks: callbacks,
// ReleaseOnCancel: true,
// })

// if nil != err {
// return fmt.Errorf("unable to new leader elector: %w", err)
// }
// sl.leaderElector = le
// // Setup any healthz checks we will want to use.
// var electionChecker *leaderelection.HealthzAdaptor
// if sl.isLeader {
// electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Minute)
// }

// leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
// Lock: leaseLock,
// LeaseDuration: sl.leaseDuration,
// RenewDeadline: sl.leaseRenewDeadline,
// RetryPeriod: sl.leaseRetryPeriod,
// Callbacks: callbacks,
// WatchDog: electionChecker,
// })

// return fmt.Errorf("leader elector %s/%s unreachable: %w", sl.leaseLockNamespace, sl.leaseLockName, err)
// }

func (sl *SpiderLeader) IsElected() bool {
sl.RLock()
defer sl.RUnlock()
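For readers less familiar with the client-go primitives wired up above, here is a minimal, self-contained sketch of the same callbacks-based election pattern using the public API directly; the lease name, namespace, and POD_NAME environment variable are illustrative assumptions:

// Minimal leader-election sketch with client-go; values are illustrative only.
package main

import (
	"context"
	"log"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	clientSet := kubernetes.NewForConfigOrDie(cfg)

	// Unique identity per process, mirroring the "<identity>_<uuid>" scheme above.
	identity := os.Getenv("POD_NAME") + "_" + string(uuid.NewUUID())

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "example-lease", Namespace: "kube-system"},
		Client:     clientSet.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   30 * time.Second,
		RenewDeadline:   20 * time.Second,
		RetryPeriod:     5 * time.Second,
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { log.Println("started leading") },
			OnStoppedLeading: func() { log.Println("stopped leading") },
			OnNewLeader:      func(id string) { log.Printf("new leader: %s", id) },
		},
	})
}

Because register appends a UUID to the configured identity, two processes that share the same pod name can never present the same identity to the lease.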
3 changes: 2 additions & 1 deletion pkg/election/lease_election_test.go
Expand Up @@ -10,6 +10,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/leaderelection"
)

var _ = Describe("Leader Election", Label("unittest", "election_test"), func() {
@@ -103,7 +104,7 @@ var _ = Describe("Leader Election", Label("unittest", "election_test"), func() {

It("check leader election function", func() {
ctx, cancel := context.WithCancel(context.TODO())
err = spiderLeaseElector.Run(ctx, fake.NewSimpleClientset())
err = spiderLeaseElector.Run(ctx, fake.NewSimpleClientset(), leaderelection.LeaderCallbacks{})
Expect(err).NotTo(HaveOccurred())

// wait for us to become leader
12 changes: 5 additions & 7 deletions pkg/election/mock/lease_election_mock.go

Some generated files are not rendered by default.

52 changes: 51 additions & 1 deletion pkg/ippoolmanager/ippool_informer.go
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/spidernet-io/spiderpool/pkg/logutils"
"github.com/spidernet-io/spiderpool/pkg/metric"
"github.com/spidernet-io/spiderpool/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
)

var informerLogger *zap.Logger
@@ -42,6 +44,7 @@ type IPPoolController struct {
IPPoolControllerConfig
client client.Client
dynamicClient dynamic.Interface
clientSet kubernetes.Interface
poolLister listers.SpiderIPPoolLister
poolSynced cache.InformerSynced
poolWorkqueue workqueue.RateLimitingInterface
@@ -58,13 +61,14 @@ type IPPoolControllerConfig struct {
ResyncPeriod time.Duration
}

func NewIPPoolController(poolControllerConfig IPPoolControllerConfig, client client.Client, dynamicClient dynamic.Interface) *IPPoolController {
func NewIPPoolController(poolControllerConfig IPPoolControllerConfig, client client.Client, dynamicClient dynamic.Interface, clientSet kubernetes.Interface) *IPPoolController {
informerLogger = logutils.Logger.Named("SpiderIPPool-Informer")

c := &IPPoolController{
IPPoolControllerConfig: poolControllerConfig,
client: client,
dynamicClient: dynamicClient,
clientSet: clientSet,
}

return c
@@ -75,6 +79,52 @@ func (ic *IPPoolController) SetupInformer(ctx context.Context, client crdclients
return fmt.Errorf("failed to start SpiderIPPool informer, controller leader must be specified")
}

informerLogger.Info("registering SpiderIPPool informer")

callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
informerLogger.Info("Became leader, starting SpiderIPPool informer")

innerCtx, innerCancel := context.WithCancel(ctx)
defer innerCancel()

factory := externalversions.NewSharedInformerFactory(client, ic.ResyncPeriod)
err := ic.addEventHandlers(factory.Spiderpool().V2beta1().SpiderIPPools())
if err != nil {
informerLogger.Error(err.Error())
return
}

factory.Start(innerCtx.Done())
if err := ic.Run(innerCtx.Done()); err != nil {
informerLogger.Sugar().Errorf("failed to run ippool controller, error: %v", err)
}
informerLogger.Info("SpiderIPPool informer has stopped")
},
OnStoppedLeading: func() {
informerLogger.Warn("Lost leadership, stopping SpiderIPPool informer")
// Stop the informer when losing leadership
ic.poolWorkqueue.ShutDown()
},
OnNewLeader: func(identity string) {
if identity == controllerLeader.GetLeader() {
informerLogger.Sugar().Infof("New leader elected: %s", identity)
}
},
}

go func() {
controllerLeader.Run(ctx, ic.clientSet, callbacks)

Check failure on line 117 in pkg/ippoolmanager/ippool_informer.go (GitHub Actions / lint-golang): Error return value of `controllerLeader.Run` is not checked (errcheck)
}()

return nil
}

func (ic *IPPoolController) SetupInformerDebug(ctx context.Context, client crdclientset.Interface, controllerLeader election.SpiderLeaseElector) error {
if controllerLeader == nil {
return fmt.Errorf("failed to start SpiderIPPool informer, controller leader must be specified")
}

informerLogger.Info("try to register SpiderIPPool informer")
go func() {
for {
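The lint-golang failure above flags the unchecked return value of controllerLeader.Run inside the goroutine. One possible follow-up, sketched as an assumption rather than part of this commit and reusing the identifiers from the hunk above:

// Assumed follow-up sketch, not in this commit: surface the error instead of discarding it.
go func() {
	if err := controllerLeader.Run(ctx, ic.clientSet, callbacks); err != nil {
		informerLogger.Sugar().Errorf("SpiderIPPool informer leader election exited: %v", err)
	}
}()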
4 changes: 3 additions & 1 deletion pkg/ippoolmanager/ippool_informer_test.go
Expand Up @@ -16,6 +16,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
clientSetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -212,6 +213,7 @@ type poolController struct {
func newController() *poolController {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme)
fakeclientSetClient := clientSetfake.NewSimpleClientset()

poolControllerConfig = IPPoolControllerConfig{
IPPoolControllerWorkers: 3,
@@ -223,7 +225,7 @@ func newController() *poolController {
ResyncPeriod: 10 * time.Second,
}

pController := NewIPPoolController(poolControllerConfig, fakeClient, fakeDynamicClient)
pController := NewIPPoolController(poolControllerConfig, fakeClient, fakeDynamicClient, fakeclientSetClient)

return &poolController{
IPPoolController: pController,
(1 more changed file not shown)
