Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 25, 2024
1 parent 4c7f8ac commit 36b5a82
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 151 deletions.
21 changes: 11 additions & 10 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,18 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh
return leadership
}

// getLease gets the lease of leadership, only if leadership is valid,
// GetLease gets the lease of leadership, only if leadership is valid,
// i.e. the owner is a true leader, the lease is not nil.
func (ls *Leadership) getLease() *lease {
func (ls *Leadership) GetLease() *Lease {
l := ls.lease.Load()
if l == nil {
return nil
}
return l.(*lease)
return l.(*Lease)
}

func (ls *Leadership) setLease(lease *lease) {
// SetLease sets the lease of leadership.
func (ls *Leadership) SetLease(lease *Lease) {
ls.lease.Store(lease)
}

Expand Down Expand Up @@ -165,12 +166,12 @@ func (ls *Leadership) AddCampaignTimes() {
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
newLease := &Lease{
Purpose: ls.purpose,
client: ls.client,
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)
ls.SetLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
name, ok := val.(string)
Expand Down Expand Up @@ -218,12 +219,12 @@ func (ls *Leadership) Keep(ctx context.Context) {
ls.keepAliveCancelFuncLock.Lock()
ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx)
ls.keepAliveCancelFuncLock.Unlock()
go ls.getLease().KeepAlive(ls.keepAliveCtx)
go ls.GetLease().KeepAlive(ls.keepAliveCtx)
}

// Check returns whether the leadership is still available.
func (ls *Leadership) Check() bool {
return ls != nil && ls.getLease() != nil && !ls.getLease().IsExpired()
return ls != nil && ls.GetLease() != nil && !ls.GetLease().IsExpired()
}

// LeaderTxn returns txn() with a leader comparison to guarantee that
Expand Down Expand Up @@ -410,14 +411,14 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

// Reset does some cleanup jobs such as canceling the keep-alive, closing the lease, etc.
func (ls *Leadership) Reset() {
if ls == nil || ls.getLease() == nil {
if ls == nil || ls.GetLease() == nil {
return
}
ls.keepAliveCancelFuncLock.Lock()
if ls.keepAliveCancelFunc != nil {
ls.keepAliveCancelFunc()
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
ls.GetLease().Close()
ls.SetPrimaryWatch(false)
}
4 changes: 2 additions & 2 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TestLeadership(t *testing.T) {
leadership2.Keep(ctx)

// Check the lease.
lease1 := leadership1.getLease()
lease1 := leadership1.GetLease()
re.NotNil(lease1)
lease2 := leadership2.getLease()
lease2 := leadership2.GetLease()
re.NotNil(lease2)

re.True(lease1.IsExpired())
Expand Down
22 changes: 15 additions & 7 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
slowRequestTime = etcdutil.DefaultSlowRequestTime
)

// lease is used as the low-level mechanism for campaigning and renewing elected leadership.
// Lease is used as the low-level mechanism for campaigning and renewing elected leadership.
// The way to gain and maintain leadership is to update and keep the lease alive continuously.
type lease struct {
type Lease struct {
// Purpose is used to show what this election is for
Purpose string
// etcd client and lease
Expand All @@ -48,8 +48,16 @@ type lease struct {
expireTime atomic.Value
}

// NewLease creates a Lease for the given purpose on top of the given etcd client.
// It only builds the lease handle via `clientv3.NewLease`; it does not grant the
// lease. Call Grant afterwards to initialize the lease and expireTime.
func NewLease(client *clientv3.Client, purpose string) *Lease {
return &Lease{
Purpose: purpose,
client: client,
lease: clientv3.NewLease(client),
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *lease) Grant(leaseTimeout int64) error {
func (l *Lease) Grant(leaseTimeout int64) error {
if l == nil {
return errs.ErrEtcdGrantLease.GenWithStackByCause("lease is nil")
}
Expand All @@ -71,7 +79,7 @@ func (l *lease) Grant(leaseTimeout int64) error {
}

// Close releases the lease.
func (l *lease) Close() error {
func (l *Lease) Close() error {
if l == nil {
return nil
}
Expand All @@ -92,15 +100,15 @@ func (l *lease) Close() error {

// IsExpired checks if the lease is expired. If it returns true,
// current leader should step down and try to re-elect again.
func (l *lease) IsExpired() bool {
func (l *Lease) IsExpired() bool {
if l == nil || l.expireTime.Load() == nil {
return true
}
return time.Now().After(l.expireTime.Load().(time.Time))
}

// KeepAlive automatically renews the lease and updates expireTime.
func (l *lease) KeepAlive(ctx context.Context) {
func (l *Lease) KeepAlive(ctx context.Context) {
defer logutil.LogPanic()

if l == nil {
Expand Down Expand Up @@ -146,7 +154,7 @@ func (l *lease) KeepAlive(ctx context.Context) {
}

// keepAliveWorker periodically calls `lease.KeepAliveOnce` and posts back the latest received expire time into the channel.
func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
func (l *Lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
ch := make(chan time.Time)

go func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/election/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func TestLease(t *testing.T) {
defer clean()

// Create the lease.
lease1 := &lease{
lease1 := &Lease{
Purpose: "test_lease_1",
client: client,
lease: clientv3.NewLease(client),
}
lease2 := &lease{
lease2 := &Lease{
Purpose: "test_lease_2",
client: client,
lease: clientv3.NewLease(client),
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestLeaseKeepAlive(t *testing.T) {
defer clean()

// Create the lease.
lease := &lease{
lease := &Lease{
Purpose: "test_lease",
client: client,
lease: clientv3.NewLease(client),
Expand Down
21 changes: 7 additions & 14 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,22 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
return errors.Errorf("failed to get cluster ID: %v", err)
}

var primaryKey string
var primaryPath string
switch serviceName {
case utils.SchedulingServiceName:
primaryKey = endpoint.SchedulingPrimaryPath(clusterID)
primaryPath = endpoint.SchedulingPrimaryPath(clusterID)
case utils.TSOServiceName:
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}

// remove possible residual value.
utils.ClearPrimaryExpectationFlag(client, primaryKey)

// grant the primary lease to the new primary.
grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease)
if err != nil {
return errors.Errorf("failed to grant lease for %s, err: %v", serviceName, err)
return errors.Errorf("failed to grant lease for expected primary, err: %v", err)
}
// update primary key to notify old primary server.
putResp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(primaryKey, primaryIDs[nextPrimaryID], clientv3.WithLease(grantResp.ID))).
Commit()
if err != nil || !putResp.Succeeded {
return errors.Errorf("failed to write primary flag for %s, err: %v", serviceName, err)
_, err = utils.MarkExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID)
if err != nil {
return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err)
}
return nil
}
83 changes: 46 additions & 37 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
Expand All @@ -56,12 +57,12 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -306,13 +307,19 @@ func (s *Server) campaignLeader() {
cb()
}
}()
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
expectedLease, revision, err := s.keepExpectedPrimaryAlive(ctx)
if err != nil {
log.Error("prepare primary watch error", errs.ZapError(err))
return
}
go s.expectedPrimaryWatch(ctx, expectedLease, revision+1, exitPrimary)
s.participant.EnableLeader()

member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

exitPrimary := make(chan struct{})
go s.primaryWatch(ctx, exitPrimary)

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
defer leaderTicker.Stop()

Expand All @@ -334,42 +341,44 @@ func (s *Server) campaignLeader() {
}
}

// primaryWatch watches `/ms/primary/transfer` API whether changed the primary.
// 1. modify the expected primary flag to the new primary
// 2. modify memory status
// 3. exit the primary watch loop
// 4. delete the leader key
func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
resp, err := etcdutil.EtcdKVGet(s.participant.GetLeadership().GetClient(), s.participant.GetLeaderPath())
if err != nil || resp == nil || len(resp.Kvs) == 0 {
log.Error("scheduling primary getting the primary meets error", errs.ZapError(err))
return
// keepExpectedPrimaryAlive keeps the expected primary alive.
// We use a lease to keep the `expected primary` healthy.
// ONLY reset by the following conditions:
// - changed by the `/ms/primary/transfer` API.
// - server closed.
func (s *Server) keepExpectedPrimaryAlive(ctx context.Context) (*election.Leadership, int64, error) {
const propose = "scheduling-primary-watch"
lease := election.NewLease(s.GetClient(), propose)
if err := lease.Grant(s.cfg.LeaderLease); err != nil {
log.Error("grant lease for expected primary error", errs.ZapError(err))
return nil, 0, err
}
log.Info("scheduling primary start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader()))
// Watch will keep looping and never return unless the primary has changed.
s.participant.GetLeadership().SetPrimaryWatch(true)
s.participant.GetLeadership().Watch(ctx, resp.Kvs[0].ModRevision+1)
s.participant.GetLeadership().SetPrimaryWatch(false)

// only `/ms/primary/transfer` API update primary will set `leaderPath` to the expected primary.
curPrimary, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath())
revision, err := utils.MarkExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath(), s.participant.MemberValue(),
lease.ID.Load().(clientv3.LeaseID))
if err != nil {
log.Error("scheduling primary getting the leader meets error", errs.ZapError(err))
log.Error("mark expected primary error", errs.ZapError(err))
return nil, 0, err
}
// Keep alive the current primary leadership to indicate that the server is still alive.
// Watch the expected primary path to check whether the expected primary has changed.
expectedPrimary := election.NewLeadership(s.GetClient(), utils.ExpectedPrimaryPath(s.participant.GetLeaderPath()), propose)
expectedPrimary.SetLease(lease)
expectedPrimary.Keep(ctx)
return expectedPrimary, revision, nil
}

// expectedPrimaryWatch watches whether the `/ms/primary/transfer` API has changed the expected primary.
func (s *Server) expectedPrimaryWatch(ctx context.Context, expectedPrimary *election.Leadership, revision int64, exitPrimary chan struct{}) {
log.Info("scheduling primary start to watch the expected primary", zap.String("scheduling-primary", s.participant.MemberValue()))
expectedPrimary.SetPrimaryWatch(true)
expectedPrimary.Watch(ctx, revision)
expectedPrimary.Reset()
defer log.Info("scheduling primary exit the expected primary watch loop")
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
if curPrimary != nil && resp.Kvs[0].Value != nil && !strings.Contains(string(resp.Kvs[0].Value), string(curPrimary)) {
// 1. modify the expected primary flag to the new primary.
utils.MarkExpectedPrimaryFlag(s.participant.Client(), s.participant.GetLeaderPath())
// 2. modify memory status.
s.participant.UnsetLeader()
defer log.Info("scheduling primary exit the primary watch loop")
select {
case <-ctx.Done():
return
// 3. exit the primary watch loop, 4.`exitPrimary` will help delete the leader key.
case exitPrimary <- struct{}{}:
return
}
}
}

Expand Down
Loading

0 comments on commit 36b5a82

Please sign in to comment.