From 550b349dc747b50e5915496258f6d4cf457f394a Mon Sep 17 00:00:00 2001
From: husharp
Date: Fri, 25 Aug 2023 09:40:17 +0800
Subject: [PATCH] add retry

Signed-off-by: husharp
---
 client/base_client.go       | 34 ++++++++++++++++++++++++++-
 client/keyspace_client.go   |  2 +-
 server/cluster/cluster.go   |  1 +
 server/server.go            |  7 +++++-
 tests/client/client_test.go | 47 +++++++++++++++++++++++++++++++++++++
 5 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/client/base_client.go b/client/base_client.go
index 26c4505c6083..98044c1721e8 100644
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -130,11 +130,20 @@ func (c *baseClient) memberLoop() {
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 
+	ticker := time.NewTicker(memberUpdateInterval)
+	defer ticker.Stop()
+
+	// max retry backoff time is 1 * Second
+	// recalculate backoff time: (2+4+8+10*n)*100 = 1.4+1*n(s)
+	backOffBaseTime := 100 * time.Millisecond
+	backOffTime := backOffBaseTime
+
 	for {
 		select {
 		case <-c.checkLeaderCh:
-		case <-time.After(memberUpdateInterval):
+		case <-ticker.C:
 		case <-ctx.Done():
+			log.Info("[pd.reconnectLoop] exit reconnectLoop")
 			return
 		}
 		failpoint.Inject("skipUpdateMember", func() {
@@ -142,10 +151,33 @@ func (c *baseClient) memberLoop() {
 		})
 		if err := c.updateMember(); err != nil {
 			log.Error("[pd] failed updateMember", errs.ZapError(err))
+			backOffTime *= 2
+			if backOffTime > updateMemberTimeout {
+				backOffTime = updateMemberTimeout
+			}
+			select {
+			case <-time.After(backOffTime):
+				failpoint.Inject("backOffExecute", func() {
+					testBackOffExecuteFlag = true
+				})
+			case <-ctx.Done():
+				log.Info("[pd.reconnectLoop] exit backOff")
+				return
+			}
+		} else {
+			backOffTime = backOffBaseTime
 		}
 	}
 }
 
+// Only used for test.
+var testBackOffExecuteFlag = false
+
+// TestBackOffExecute Only used for test.
+func (c *baseClient) TestBackOffExecute() bool {
+	return testBackOffExecuteFlag
+}
+
 // ScheduleCheckLeader is used to check leader.
 func (c *baseClient) ScheduleCheckLeader() {
 	select {
diff --git a/client/keyspace_client.go b/client/keyspace_client.go
index a469a45b00e4..f8a0dd04bb31 100644
--- a/client/keyspace_client.go
+++ b/client/keyspace_client.go
@@ -16,7 +16,6 @@ package pd
 
 import (
 	"context"
-	"go.uber.org/zap"
 	"time"
 
 	"github.com/opentracing/opentracing-go"
@@ -24,6 +23,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/keyspacepb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/grpcutil"
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )
 
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 6d557bc5a7c8..d94b2055f495 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -288,6 +288,7 @@ func (c *RaftCluster) Start(s Server) error {
 	go c.runMetricsCollectionJob()
 	go c.runNodeStateCheckJob()
 	go c.runStatsBackgroundJobs()
+	go c.syncRegions()
 	go c.runReplicationMode()
 	go c.runMinResolvedTSJob()
 	go c.runSyncConfig()
diff --git a/server/server.go b/server/server.go
index 2ab40799b65f..fcfba7efcaa6 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1427,9 +1427,14 @@ func (s *Server) leaderLoop() {
 			}
 			// Check the cluster dc-location after the PD leader is elected
 			go s.tsoAllocatorManager.ClusterDCLocationChecker()
+			syncer := s.cluster.GetRegionSyncer()
+			if s.persistOptions.IsUseRegionStorage() {
+				syncer.StartSyncWithLeader(leader.GetClientUrls()[0])
+			}
 			log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
 			// WatchLeader will keep looping and never return unless the PD leader has changed.
 			s.member.WatchLeader(s.serverLoopCtx, leader, rev)
+			syncer.StopSyncWithLeader()
 			log.Info("pd leader has changed, try to re-campaign a pd leader")
 		}
 
@@ -1439,7 +1444,7 @@ func (s *Server) leaderLoop() {
 		if s.member.GetLeader() == nil {
 			lastUpdated := s.member.GetLastLeaderUpdatedTime()
 			// use random timeout to avoid leader campaigning storm.
-			randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
+			randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
 			// add failpoint to test the campaign leader logic.
 			failpoint.Inject("timeoutWaitPDLeader", func() {
 				log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index cde7ee3d7fcd..81c64af358ee 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -61,6 +61,7 @@ type client interface {
 	ScheduleCheckLeader()
 	GetURLs() []string
 	GetAllocatorLeaderURLs() map[string]string
+	TestBackOffExecute() bool
 }
 
 func TestClientClusterIDCheck(t *testing.T) {
@@ -1415,3 +1416,49 @@ func (suite *clientTestSuite) TestScatterRegion() {
 			resp.GetStatus() == pdpb.OperatorStatus_RUNNING
 	}, testutil.WithTickInterval(time.Second))
 }
+
+func (suite *clientTestSuite) TestRetryMemberUpdate() {
+	re := suite.Require()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestCluster(ctx, 3)
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	endpoints := runServer(re, cluster)
+	cli := setupCli(re, ctx, endpoints)
+	defer cli.Close()
+
+	leader := cluster.GetLeader()
+	waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)
+	memberID := cluster.GetServer(leader).GetLeader().GetMemberId()
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/client/backOffExecute", `return(true)`))
+	leader2 := waitLeaderChange(re, cluster, leader, cli.(client))
+	re.True(cli.(client).TestBackOffExecute())
+
+	re.NotEqual(leader, leader2)
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/client/backOffExecute"))
+}
+
+func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli client) string {
+	var leader string
+	testutil.Eventually(re, func() bool {
+		cli.ScheduleCheckLeader()
+		leader = cluster.GetLeader()
+		if leader == old || leader == "" {
+			return false
+		}
+		return true
+	})
+	return leader
+}
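
For readers outside the PD codebase, the sketch below restates the capped exponential backoff that the memberLoop hunk above introduces. It is a minimal, standalone illustration only: the constant values, the retryLoop/update names, and the demo in main are placeholders, not the client's actual identifiers or timings.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Placeholder values for illustration only; the real memberUpdateInterval and
// updateMemberTimeout are defined in the pd client package.
const (
	memberUpdateInterval = 100 * time.Millisecond
	updateMemberTimeout  = 1 * time.Second
)

// retryLoop mirrors the shape of the memberLoop change: poll on a ticker,
// double the wait after every failed update (capped at updateMemberTimeout),
// and reset to the base wait once an update succeeds.
func retryLoop(ctx context.Context, update func() error) {
	ticker := time.NewTicker(memberUpdateInterval)
	defer ticker.Stop()

	backOffBase := 100 * time.Millisecond
	backOff := backOffBase
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
		if err := update(); err != nil {
			backOff *= 2
			if backOff > updateMemberTimeout {
				backOff = updateMemberTimeout
			}
			select {
			case <-time.After(backOff): // extra wait before the next attempt
			case <-ctx.Done():
				return
			}
			continue
		}
		backOff = backOffBase // success: drop back to the base backoff
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	attempts := 0
	retryLoop(ctx, func() error {
		attempts++
		if attempts < 4 { // fail the first three calls to exercise the backoff
			return errors.New("member update failed")
		}
		return nil
	})
	fmt.Println("update attempts before the loop stopped:", attempts)
}

Resetting the backoff to its base value after a successful update keeps a healthy client polling at its normal cadence, while a persistently failing one converges to the 1s cap; that is the arithmetic behind the "(2+4+8+10*n)*100 = 1.4+1*n(s)" comment in the patch.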