Skip to content

Commit

Permalink
pkg/member: Fixing residual counts in campaign times (tikv#8226) (tik…
Browse files Browse the repository at this point in the history
…v#8230)

close tikv#8225

Signed-off-by: husharp <[email protected]>

Co-authored-by: husharp <[email protected]>
  • Loading branch information
ti-chi-bot and HuSharp authored May 30, 2024
1 parent 634e05a commit 23f8a7c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
15 changes: 8 additions & 7 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (
)

const (
defaultCampaignTimesSlot = 10
watchLoopUnhealthyTimeout = 60 * time.Second
campaignTimesRecordTimeout = 5 * time.Minute
defaultCampaignTimesSlot = 10
watchLoopUnhealthyTimeout = 60 * time.Second
)

var campaignTimesRecordTimeout = 5 * time.Minute

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
Expand Down Expand Up @@ -114,6 +115,7 @@ func (ls *Leadership) GetLeaderKey() string {
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
// Need to make sure `AddCampaignTimes` is called before this function.
func (ls *Leadership) GetCampaignTimesNum() int {
if ls == nil {
return 0
Expand All @@ -129,16 +131,16 @@ func (ls *Leadership) ResetCampaignTimes() {
ls.campaignTimes = make([]time.Time, 0, defaultCampaignTimesSlot)
}

// addCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) addCampaignTimes() {
// AddCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) AddCampaignTimes() {
if ls == nil {
return
}
for i := len(ls.campaignTimes) - 1; i >= 0; i-- {
if time.Since(ls.campaignTimes[i]) > campaignTimesRecordTimeout {
// remove the time which is more than `campaignTimesRecordTimeout`
// array is sorted by time
ls.campaignTimes = ls.campaignTimes[i:]
ls.campaignTimes = ls.campaignTimes[i+1:]
break
}
}
Expand All @@ -148,7 +150,6 @@ func (ls *Leadership) addCampaignTimes() {

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.addCampaignTimes()
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Expand Down
33 changes: 33 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,36 @@ func TestRequestProgress(t *testing.T) {
checkWatcherRequestProgress(false)
checkWatcherRequestProgress(true)
}

func TestCampaignTimes(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
leadership := NewLeadership(client, "test_leader", "test_leader")

// all the campaign times are within the timeout.
campaignTimesRecordTimeout = 10 * time.Second
defer func() {
campaignTimesRecordTimeout = 5 * time.Minute
}()
for i := 0; i < 3; i++ {
leadership.AddCampaignTimes()
time.Sleep(100 * time.Millisecond)
}
re.Equal(3, leadership.GetCampaignTimesNum())

// only the last 2 records are valid.
campaignTimesRecordTimeout = 200 * time.Millisecond
for i := 0; i < 3; i++ {
leadership.AddCampaignTimes()
time.Sleep(100 * time.Millisecond)
}
re.Equal(2, leadership.GetCampaignTimesNum())

time.Sleep(200 * time.Millisecond)
// need to wait for the next addCampaignTimes to update the campaign time.
re.Equal(2, leadership.GetCampaignTimesNum())
// check campaign leader frequency.
leadership.AddCampaignTimes()
re.Equal(1, leadership.GetCampaignTimesNum())
}
3 changes: 2 additions & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,11 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
// and make it become a PD leader.
// leader should be changed when campaign leader frequently.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
m.leadership.AddCampaignTimes()
failpoint.Inject("skipCampaignLeaderCheck", func() {
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
})
if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
if m.leadership.GetCampaignTimesNum() > campaignLeaderFrequencyTimes {
log.Warn("campaign times is too frequent, resign and campaign again",
zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath()))
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
Expand Down
10 changes: 8 additions & 2 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,20 +327,26 @@ func TestCampaignLeaderFrequently(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 5)
cluster, err := tests.NewTestCluster(ctx, 3)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)
// the 1st time campaign leader.
cluster.WaitLeader()
leader := cluster.GetLeader()
re.NotEmpty(cluster.GetLeader())

for i := 0; i < 3; i++ {
// need to prevent 3 times(including the above 1st time) campaign leader in 5 min.
for i := 0; i < 2; i++ {
cluster.GetServers()[cluster.GetLeader()].ResetPDLeader()
cluster.WaitLeader()
re.Equal(leader, cluster.GetLeader())
}
// check for the 4th time.
cluster.GetLeaderServer().ResetPDLeader()
cluster.WaitLeader()
// PD leader should be different from before because etcd leader changed.
re.NotEmpty(cluster.GetLeader())
re.NotEqual(leader, cluster.GetLeader())
Expand Down

0 comments on commit 23f8a7c

Please sign in to comment.