diff --git a/client/base_client.go b/client/base_client.go
old mode 100644
new mode 100755
index 26c4505c6083..46d5fe3c7c34
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -29,9 +29,11 @@ import (
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
 	"github.com/tikv/pd/client/grpcutil"
+	"github.com/tikv/pd/client/retry"
 	"github.com/tikv/pd/client/tlsutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
 )
 
 const (
@@ -64,6 +66,10 @@ type baseClient struct {
 
 	// Client option.
 	option *option
+
+	successReConnect chan struct{}
+
+	bo *retry.Backoffer
 }
 
 // SecurityOption records options about tls
@@ -88,6 +94,8 @@ func newBaseClient(ctx context.Context, urls []string, security SecurityOption)
 		cancel: clientCancel,
 		security: security,
 		option: newOption(),
+		bo: retry.NewBackoffer(clientCtx, maxRetryTimes),
+		successReConnect: make(chan struct{}, 1),
 	}
 	bc.urls.Store(urls)
 	return bc
@@ -105,7 +113,7 @@ func (c *baseClient) init() error {
 	log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))
 
 	c.wg.Add(1)
-	go c.memberLoop()
+	go c.reconnectMemberLoop()
 
 	return nil
 }
@@ -124,32 +132,129 @@ func (c *baseClient) initRetry(f func() error) error {
 	return errors.WithStack(err)
 }
 
-func (c *baseClient) memberLoop() {
+func (c *baseClient) reconnectMemberLoop() {
 	defer c.wg.Done()
 
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 
+	ticker := time.NewTicker(memberUpdateInterval)
+	defer ticker.Stop()
+
+	failpoint.Inject("acceleratedMemberUpdateInterval", func() {
+		ticker.Stop()
+		ticker = time.NewTicker(time.Millisecond * 100)
+	})
+
 	for {
 		select {
 		case <-c.checkLeaderCh:
-		case <-time.After(memberUpdateInterval):
+		case <-ticker.C:
 		case <-ctx.Done():
+			log.Info("[pd.reconnectLoop] exit reconnectLoop")
 			return
 		}
+
 		failpoint.Inject("skipUpdateMember", func() {
 			failpoint.Continue()
 		})
+
 		if err := c.updateMember(); err != nil {
-			log.Error("[pd] failed updateMember", errs.ZapError(err))
+			log.Error("[pd.reconnectLoop] failed updateMember", errs.ZapError(err))
+		} else {
+			c.SuccessReconnect()
 		}
 	}
 }
 
+func (c *baseClient) waitForReady() error {
+	if e1 := c.waitForLeaderReady(); e1 != nil {
+		log.Error("[pd.waitForReady] wait for leader ready failed", errs.ZapError(e1))
+	} else if e2 := c.loadMembers(); e2 != nil {
+		log.Error("[pd.waitForReady] load members failed", errs.ZapError(e2))
+	} else {
+		return nil
+	}
+
+	deadline := time.Now().Add(requestTimeout)
+	failpoint.Inject("acceleratedRequestTimeout", func() {
+		deadline = time.Now().Add(500 * time.Millisecond)
+	})
+	for {
+		select {
+		case <-c.successReConnect:
+			return nil
+		case <-time.After(time.Until(deadline)):
+			log.Error("[pd.waitForReady] timeout")
+			return errors.New("wait for ready timeout")
+		}
+	}
+}
+
+// waitForLeaderReady waits for the leader to be ready.
+func (c *baseClient) waitForLeaderReady() error {
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+
+	for {
+		old, ok := c.clientConns.Load(c.GetLeaderAddr())
+		if !ok {
+			return errors.New("no leader")
+		}
+		cc := old.(*grpc.ClientConn)
+
+		s := cc.GetState()
+		if s == connectivity.Ready {
+			return nil
+		}
+		if !cc.WaitForStateChange(ctx, s) {
+			// ctx got timeout or canceled.
+			return ctx.Err()
+		}
+	}
+}
+
+func (c *baseClient) loadMembers() error {
+	ctx, cancel := context.WithCancel(c.ctx)
+	defer cancel()
+
+	members, err := c.getMembers(ctx, c.GetLeaderAddr(), updateMemberTimeout)
+	if err != nil {
+		log.Warn("[pd.loadMembers] failed to load members", zap.String("url", c.GetLeaderAddr()), errs.ZapError(err))
+		return errors.WithStack(err)
+	} else if members.GetHeader() == nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
+		err = errs.ErrClientGetLeader.FastGenByArgs("leader address doesn't exist")
+		log.Warn("[pd.loadMembers] leader address doesn't exist", zap.String("url", c.GetLeaderAddr()), errs.ZapError(err))
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (c *baseClient) SuccessReconnect() {
+	select {
+	case c.successReConnect <- struct{}{}:
+	default:
+	}
+}
+
 // ScheduleCheckLeader is used to check leader.
 func (c *baseClient) ScheduleCheckLeader() {
 	select {
 	case c.checkLeaderCh <- struct{}{}:
+		if err := c.waitForReady(); err != nil {
+			// If the backoff count is greater than 10, reset it.
+			if c.bo.GetBackoffTimeCnt(retry.BoMemberUpdate.String()) >= 10 {
+				c.bo.Reset()
+			}
+			e := c.bo.Backoff(retry.BoMemberUpdate, err)
+
+			if e != nil {
+				log.Error("[pd] wait for ready backoff failed", errs.ZapError(e))
+				return
+			}
+			log.Error("[pd] wait for ready failed", errs.ZapError(err))
+		}
 	default:
 	}
 }
@@ -454,3 +559,9 @@ func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
 	c.clientConns.Store(addr, cc)
 	return cc, nil
 }
+
+// GetBackoffTimes returns a map containing the backoff count by type.
+// For test purposes only.
+func (c *baseClient) GetBackoffTimes() map[string]int {
+	return c.bo.GetBackoffTimes()
+}
diff --git a/client/client.go b/client/client.go
old mode 100644
new mode 100755
index b7e15fe6eb23..ed8841d94065
--- a/client/client.go
+++ b/client/client.go
@@ -331,6 +331,7 @@ const (
 	defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
 	retryInterval          = 500 * time.Millisecond
 	maxRetryTimes          = 6
+	requestTimeout         = 2 * time.Second
 )
 
 // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
diff --git a/client/go.mod b/client/go.mod
index c8055548f238..97fdb847a8b8
--- a/client/go.mod
+++ b/client/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
 	github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
+	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.11.1
 	github.com/stretchr/testify v1.7.0
 	go.uber.org/goleak v1.1.11
diff --git a/client/keyspace_client.go b/client/keyspace_client.go
index a469a45b00e4..f8a0dd04bb31
--- a/client/keyspace_client.go
+++ b/client/keyspace_client.go
@@ -16,7 +16,6 @@ package pd
 
 import (
 	"context"
-	"go.uber.org/zap"
 	"time"
 
 	"github.com/opentracing/opentracing-go"
@@ -24,6 +23,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/keyspacepb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/grpcutil"
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )
 
diff --git a/client/retry/backoff.go b/client/retry/backoff.go
new file mode 100644
index 000000000000..3a467cbdf465
--- /dev/null
+++ b/client/retry/backoff.go
@@ -0,0 +1,166 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/pingcap/log"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+)
+
+// Backoffer is a utility for retrying queries.
+type Backoffer struct {
+	ctx context.Context
+
+	fn         map[string]backoffFn
+	maxSleep   int
+	totalSleep int
+
+	errors         []error
+	configs        []*Config
+	backoffSleepMS map[string]int
+	backoffTimes   map[string]int
+}
+
+// NewBackoffer (Deprecated) creates a Backoffer with the maximum sleep time (in ms).
+func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
+	return &Backoffer{
+		ctx:      ctx,
+		maxSleep: maxSleep,
+	}
+}
+
+// Backoff sleeps a while based on the Config and records the error message.
+// It returns a retryable error if the total sleep time exceeds maxSleep.
+func (b *Backoffer) Backoff(cfg *Config, err error) error {
+	if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", cfg), opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		opentracing.ContextWithSpan(b.ctx, span1)
+	}
+	return b.BackoffWithCfgAndMaxSleep(cfg, -1, err)
+}
+
+// BackoffWithCfgAndMaxSleep sleeps a while based on the Config and records the error message,
+// and never sleeps more than maxSleepMs for each sleep.
+func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error {
+	select {
+	case <-b.ctx.Done():
+		return errors.WithStack(err)
+	default:
+	}
+	b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
+	b.configs = append(b.configs, cfg)
+
+	// Lazy initialize.
+	if b.fn == nil {
+		b.fn = make(map[string]backoffFn)
+	}
+	f, ok := b.fn[cfg.name]
+	if !ok {
+		f = cfg.createBackoffFn()
+		b.fn[cfg.name] = f
+	}
+	realSleep := f(b.ctx, maxSleepMs)
+
+	b.totalSleep += realSleep
+	if b.backoffSleepMS == nil {
+		b.backoffSleepMS = make(map[string]int)
+	}
+	b.backoffSleepMS[cfg.name] += realSleep
+	if b.backoffTimes == nil {
+		b.backoffTimes = make(map[string]int)
+	}
+	b.backoffTimes[cfg.name]++
+
+	log.Debug("retry later",
+		zap.Error(err),
+		zap.Int("totalSleep", b.totalSleep),
+		zap.Int("maxSleep", b.maxSleep),
+		zap.Stringer("type", cfg))
+	return nil
+}
+
+func (b *Backoffer) String() string {
+	if b.totalSleep == 0 {
+		return ""
+	}
+	return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.configs)
+}
+
+// GetTotalSleep returns the total sleep time.
+func (b *Backoffer) GetTotalSleep() int {
+	return b.totalSleep
+}
+
+// GetCtx returns the bound context.
+func (b *Backoffer) GetCtx() context.Context {
+	return b.ctx
+}
+
+// SetCtx sets the bound context to ctx.
+func (b *Backoffer) SetCtx(ctx context.Context) {
+	b.ctx = ctx
+}
+
+// GetBackoffTimes returns a map containing the backoff count by type.
+func (b *Backoffer) GetBackoffTimes() map[string]int {
+	return b.backoffTimes
+}
+
+// GetBackoffTimeCnt returns the backoff count for a specific type.
+func (b *Backoffer) GetBackoffTimeCnt(s string) int {
+	return b.backoffTimes[s]
+}
+
+// GetTotalBackoffTimes returns the total backoff times of the backoffer.
+func (b *Backoffer) GetTotalBackoffTimes() int {
+	total := 0
+	for _, t := range b.backoffTimes {
+		total += t
+	}
+	return total
+}
+
+// GetBackoffSleepMS returns a map containing the backoff sleep time by type.
+func (b *Backoffer) GetBackoffSleepMS() map[string]int {
+	return b.backoffSleepMS
+}
+
+// ErrorsNum returns the number of errors.
+func (b *Backoffer) ErrorsNum() int {
+	return len(b.errors)
+}
+
+// Reset resets the sleep state of the backoffer, so that the following backoff
+// can sleep shorter. The reason why we don't create a new backoffer is that
+// backoffer is similar to context, and it records some metrics that we
+// want to record for an entire process which is composed of several stages.
+func (b *Backoffer) Reset() {
+	b.fn = nil
+	b.totalSleep = 0
+}
+
+// ResetMaxSleep resets the sleep state and max sleep limit of the backoffer.
+// It's used when switching to the next stage of the process.
+func (b *Backoffer) ResetMaxSleep(maxSleep int) {
+	b.Reset()
+	b.maxSleep = maxSleep
+}
diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go
new file mode 100644
index 000000000000..2b9c943b864f
--- /dev/null
+++ b/client/retry/backoff_test.go
@@ -0,0 +1,29 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestBackoffErrorType(t *testing.T) {
+	b := NewBackoffer(context.TODO(), 800)
+	err := b.Backoff(BoMemberUpdate, errors.New("no leader")) // 100 ms
+	assert.Nil(t, err)
+}
diff --git a/client/retry/config.go b/client/retry/config.go
new file mode 100644
index 000000000000..478187f6f188
--- /dev/null
+++ b/client/retry/config.go
@@ -0,0 +1,141 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"context"
+	"math"
+	"math/rand"
+	"time"
+
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+)
+
+// Config is the configuration of the Backoff function.
+type Config struct {
+	name  string
+	fnCfg *BackoffFnCfg
+	err   error
+}
+
+// Backoff Config variables
+var (
+	// BoMemberUpdate is for member change events.
+	BoMemberUpdate = NewConfig("memberUpdate", NewBackoffFnCfg(100, 2000, EqualJitter), nil)
+)
+
+// backoffFn is the backoff function which computes the sleep time and sleeps.
+type backoffFn func(ctx context.Context, maxSleepMs int) int
+
+func (c *Config) createBackoffFn() backoffFn {
+	return newBackoffFn(c.fnCfg.base, c.fnCfg.cap, c.fnCfg.jitter)
+}
+
+// BackoffFnCfg is the configuration for the backoff func which implements exponential backoff with
+// optional jitters.
+// See http://www.awsarchitectureblog.com/2015/03/backoff.html
+type BackoffFnCfg struct {
+	base   int
+	cap    int
+	jitter int
+}
+
+// NewBackoffFnCfg creates the config for BackoffFn.
+func NewBackoffFnCfg(base, cap, jitter int) *BackoffFnCfg {
+	return &BackoffFnCfg{
+		base,
+		cap,
+		jitter,
+	}
+}
+
+// NewConfig creates a new Config for the Backoff operation.
+func NewConfig(name string, backoffFnCfg *BackoffFnCfg, err error) *Config {
+	return &Config{
+		name:  name,
+		fnCfg: backoffFnCfg,
+		err:   err,
+	}
+}
+
+func (c *Config) String() string {
+	return c.name
+}
+
+// SetErrors sets a more detailed error instead of the default one in the bo config.
+func (c *Config) SetErrors(err error) {
+	c.err = err
+}
+
+const (
+	// NoJitter makes the backoff sequence strictly exponential.
+	NoJitter = 1 + iota
+	// FullJitter applies random factors to the strict exponential.
+	FullJitter
+	// EqualJitter is also randomized, but prevents very short sleeps.
+	EqualJitter
+	// DecorrJitter increases the maximum jitter based on the last random value.
+	DecorrJitter
+)
+
+// newBackoffFn creates a backoff func which implements exponential backoff with
+// optional jitters.
+// See http://www.awsarchitectureblog.com/2015/03/backoff.html
+func newBackoffFn(base, cap, jitter int) backoffFn {
+	if base < 2 {
+		// To prevent panic in 'rand.Intn'.
+		base = 2
+	}
+	attempts := 0
+	lastSleep := base
+	return func(ctx context.Context, maxSleepMs int) int {
+		var sleep int
+		switch jitter {
+		case NoJitter:
+			sleep = expo(base, cap, attempts)
+		case FullJitter:
+			v := expo(base, cap, attempts)
+			sleep = rand.Intn(v)
+		case EqualJitter:
+			v := expo(base, cap, attempts)
+			sleep = v/2 + rand.Intn(v/2)
+		case DecorrJitter:
+			sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
+		}
+		log.Debug("backoff",
+			zap.Int("base", base),
+			zap.Int("sleep", sleep),
+			zap.Int("attempts", attempts))
+
+		realSleep := sleep
+		// When maxSleepMs >= 0, the sleep time is capped at maxSleepMs milliseconds.
+		if maxSleepMs >= 0 && realSleep > maxSleepMs {
+			realSleep = maxSleepMs
+		}
+		select {
+		case <-time.After(time.Duration(realSleep) * time.Millisecond):
+			attempts++
+			lastSleep = sleep
+			return realSleep
+		case <-ctx.Done():
+			return 0
+		}
+	}
+}
+
+func expo(base, cap, n int) int {
+	return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n))))
+}
diff --git a/pkg/testutil/leak.go b/pkg/testutil/leak.go
index d1329aef0e60..1bc70855a3e3
--- a/pkg/testutil/leak.go
+++ b/pkg/testutil/leak.go
@@ -28,4 +28,5 @@ var LeakOptions = []goleak.Option{
 	goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
 	// natefinch/lumberjack#56, It's a goroutine leak bug. Another ignore option PR https://github.com/pingcap/tidb/pull/27405/
 	goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
+	goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
 }
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 6d557bc5a7c8..d94b2055f495
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -288,6 +288,7 @@ func (c *RaftCluster) Start(s Server) error {
 	go c.runMetricsCollectionJob()
 	go c.runNodeStateCheckJob()
 	go c.runStatsBackgroundJobs()
+	go c.syncRegions()
 	go c.runReplicationMode()
 	go c.runMinResolvedTSJob()
 	go c.runSyncConfig()
diff --git a/server/server.go b/server/server.go
index 2ab40799b65f..fcfba7efcaa6
--- a/server/server.go
+++ b/server/server.go
@@ -1427,9 +1427,14 @@ func (s *Server) leaderLoop() {
 		}
 
 		// Check the cluster dc-location after the PD leader is elected
 		go s.tsoAllocatorManager.ClusterDCLocationChecker()
+		syncer := s.cluster.GetRegionSyncer()
+		if s.persistOptions.IsUseRegionStorage() {
+			syncer.StartSyncWithLeader(leader.GetClientUrls()[0])
+		}
 		log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
 		// WatchLeader will keep looping and never return unless the PD leader has changed.
 		s.member.WatchLeader(s.serverLoopCtx, leader, rev)
+		syncer.StopSyncWithLeader()
 		log.Info("pd leader has changed, try to re-campaign a pd leader")
 	}
 
@@ -1439,7 +1444,7 @@
 		if s.member.GetLeader() == nil {
 			lastUpdated := s.member.GetLastLeaderUpdatedTime()
 			// use random timeout to avoid leader campaigning storm.
-			randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
+			randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
 			// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() { log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader") diff --git a/tests/client/client_test.go b/tests/client/client_test.go index cde7ee3d7fcd..12563b1ff0bd 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/tikv/pd/client/retry" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -61,6 +63,7 @@ type client interface { ScheduleCheckLeader() GetURLs() []string GetAllocatorLeaderURLs() map[string]string + GetBackoffTimes() map[string]int } func TestClientClusterIDCheck(t *testing.T) { @@ -98,6 +101,10 @@ func TestClientClusterIDCheck(t *testing.T) { func TestClientLeaderChange(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`)) + defer func() { + failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval") + }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() cluster, err := tests.NewTestCluster(ctx, 3) @@ -350,6 +357,10 @@ func TestTSOFollowerProxy(t *testing.T) { // TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207 func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`)) + defer func() { + failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval") + }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() cluster, err := tests.NewTestCluster(ctx, 3) @@ -412,6 +423,10 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { func TestGlobalAndLocalTSO(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`)) + defer func() { + failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval") + }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() dcLocationConfig := map[string]string{ @@ -1415,3 +1430,50 @@ func (suite *clientTestSuite) TestScatterRegion() { resp.GetStatus() == pdpb.OperatorStatus_RUNNING }, testutil.WithTickInterval(time.Second)) } + +func (suite *clientTestSuite) TestRetryMemberUpdate() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedRequestTimeout", `return(true)`)) + defer func() { + failpoint.Disable("github.com/tikv/pd/client/acceleratedRequestTimeout") + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 3) + re.NoError(err) + defer cluster.Destroy() + + endpoints := runServer(re, cluster) + cli := setupCli(re, ctx, endpoints) + + leader := cluster.GetLeader() + waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls) + memberID := cluster.GetServer(leader).GetLeader().GetMemberId() + + re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID))) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID))) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`)) + + leader2 := waitLeaderChange(re, cluster, leader, cli.(client)) + re.NotEqual(leader, leader2) + + 
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
+
+	retryTimes := cli.(client).GetBackoffTimes()[retry.BoMemberUpdate.String()]
+	re.Greater(retryTimes, 0)
+}
+
+func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli client) string {
+	var leader string
+	testutil.Eventually(re, func() bool {
+		cli.ScheduleCheckLeader()
+		leader = cluster.GetLeader()
+		if leader == old || leader == "" {
+			return false
+		}
+		return true
+	})
+	return leader
+}