Commit 550b349
add retry
Signed-off-by: husharp <[email protected]>
HuSharp committed Aug 25, 2023
1 parent 3199dfe commit 550b349
Showing 5 changed files with 88 additions and 3 deletions.
34 changes: 33 additions & 1 deletion client/base_client.go
@@ -130,22 +130,54 @@ func (c *baseClient) memberLoop() {
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 
+	ticker := time.NewTicker(memberUpdateInterval)
+	defer ticker.Stop()
+
+	// The max retry backoff time is 1s (updateMemberTimeout).
+	// Cumulative backoff across retries: (2+4+8+10*n)*100ms = (1.4+n)s.
+	backOffBaseTime := 100 * time.Millisecond
+	backOffTime := backOffBaseTime
+
 	for {
 		select {
 		case <-c.checkLeaderCh:
-		case <-time.After(memberUpdateInterval):
+		case <-ticker.C:
 		case <-ctx.Done():
 			log.Info("[pd.reconnectLoop] exit reconnectLoop")
 			return
 		}
 		failpoint.Inject("skipUpdateMember", func() {
 			failpoint.Continue()
 		})
 		if err := c.updateMember(); err != nil {
 			log.Error("[pd] failed updateMember", errs.ZapError(err))
+			backOffTime *= 2
+			if backOffTime > updateMemberTimeout {
+				backOffTime = updateMemberTimeout
+			}
+			select {
+			case <-time.After(backOffTime):
+				failpoint.Inject("backOffExecute", func() {
+					testBackOffExecuteFlag = true
+				})
+			case <-ctx.Done():
+				log.Info("[pd.reconnectLoop] exit backOff")
+				return
+			}
+		} else {
+			backOffTime = backOffBaseTime
 		}
 	}
 }
+
+// Only used for test.
+var testBackOffExecuteFlag = false
+
+// TestBackOffExecute is only used for test.
+func (c *baseClient) TestBackOffExecute() bool {
+	return testBackOffExecuteFlag
+}
 
 // ScheduleCheckLeader is used to check leader.
 func (c *baseClient) ScheduleCheckLeader() {
 	select {
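The retry logic above is a textbook capped exponential backoff: each failed updateMember call doubles the wait, the wait is clamped at updateMemberTimeout (1s), and a success resets it to the 100ms base. A minimal standalone sketch of the same pattern (runWithBackoff, doWork, and the constants are illustrative names, not part of the PD client):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	baseBackoff = 100 * time.Millisecond
	maxBackoff  = time.Second // plays the role of updateMemberTimeout
)

// runWithBackoff calls doWork on every tick and, after a failure, sleeps
// with capped exponential backoff before honoring the next tick.
func runWithBackoff(ctx context.Context, interval time.Duration, doWork func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	backoff := baseBackoff
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
		if err := doWork(); err != nil {
			backoff *= 2 // double after each failure...
			if backoff > maxBackoff {
				backoff = maxBackoff // ...but never past the cap
			}
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return
			}
		} else {
			backoff = baseBackoff // success: the next failure starts small again
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	attempts := 0
	runWithBackoff(ctx, 50*time.Millisecond, func() error {
		attempts++
		if attempts%3 != 0 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("total attempts:", attempts)
}
```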
2 changes: 1 addition & 1 deletion client/keyspace_client.go
@@ -16,14 +16,14 @@ package pd

 import (
 	"context"
-	"go.uber.org/zap"
 	"time"
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/keyspacepb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/grpcutil"
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )

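The keyspace_client.go hunk is purely cosmetic: go.uber.org/zap had landed in the standard-library import group, and the commit moves it down into the third-party group where goimports places it. For reference, the grouping convention looks like this (a compilable fragment for illustration, not code from this repository):

```go
package example

import (
	// Standard library imports come first, in their own group.
	"context"
	"time"

	// Third-party imports follow in a separate group, sorted by path.
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// Reference the imports so the example file compiles on its own.
var _ = []any{context.Background, time.Now, zap.NewNop, grpc.NewServer}
```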
1 change: 1 addition & 0 deletions server/cluster/cluster.go
@@ -288,6 +288,7 @@ func (c *RaftCluster) Start(s Server) error {
 	go c.runMetricsCollectionJob()
 	go c.runNodeStateCheckJob()
 	go c.runStatsBackgroundJobs()
+	go c.syncRegions()
 	go c.runReplicationMode()
 	go c.runMinResolvedTSJob()
 	go c.runSyncConfig()
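cluster.go gains one line: go c.syncRegions() launches the region syncer's background worker together with the other jobs when the cluster starts. A sketch of the start-and-join lifecycle such jobs typically follow (the Cluster type, WaitGroup, and context wiring below are illustrative, not lifted from RaftCluster):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Cluster starts a fixed set of background jobs and joins them on Stop.
type Cluster struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// Start launches every background job in its own goroutine, the same shape
// as RaftCluster.Start launching go c.syncRegions() and friends.
func (c *Cluster) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	for _, name := range []string{"syncRegions", "runSyncConfig"} {
		name := name
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			c.loop(ctx, name)
		}()
	}
}

// loop stands in for a real background job: tick until cancelled.
func (c *Cluster) loop(ctx context.Context, name string) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println(name, "tick")
		case <-ctx.Done():
			return
		}
	}
}

// Stop cancels every job and waits for all goroutines to exit.
func (c *Cluster) Stop() {
	c.cancel()
	c.wg.Wait()
}

func main() {
	c := &Cluster{}
	c.Start()
	time.Sleep(250 * time.Millisecond)
	c.Stop()
}
```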
7 changes: 6 additions & 1 deletion server/server.go
@@ -1427,9 +1427,14 @@ func (s *Server) leaderLoop() {
 	}
 	// Check the cluster dc-location after the PD leader is elected
 	go s.tsoAllocatorManager.ClusterDCLocationChecker()
+	syncer := s.cluster.GetRegionSyncer()
+	if s.persistOptions.IsUseRegionStorage() {
+		syncer.StartSyncWithLeader(leader.GetClientUrls()[0])
+	}
 	log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
 	// WatchLeader will keep looping and never return unless the PD leader has changed.
 	s.member.WatchLeader(s.serverLoopCtx, leader, rev)
+	syncer.StopSyncWithLeader()
 	log.Info("pd leader has changed, try to re-campaign a pd leader")
 }

@@ -1439,7 +1444,7 @@
 	if s.member.GetLeader() == nil {
 		lastUpdated := s.member.GetLastLeaderUpdatedTime()
 		// use random timeout to avoid leader campaigning storm.
-		randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
+		randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
 		// add failpoint to test the campaign leader logic.
 		failpoint.Inject("timeoutWaitPDLeader", func() {
 			log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
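Two fixes land in server.go. The first hunk ties the region syncer to the leader-watch lifecycle: when region storage is in use, a non-leader starts syncing from the leader's first client URL before blocking in WatchLeader, and stops the moment WatchLeader returns. The second hunk removes a redundant conversion: rand.Intn already takes an int, so wrapping lostPDLeaderMaxTimeoutSecs in int() was noise. The randomized timeout staggers re-campaigns to avoid a thundering herd; a worked sketch with illustrative constants (the real values in server.go may differ):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Illustrative stand-ins for the constants defined in server.go.
const (
	lostPDLeaderMaxTimeoutSecs   = 10
	lostPDLeaderReElectionFactor = 3
)

func main() {
	electionInterval := 3 * time.Second
	// rand.Intn takes an int, so no int() conversion is needed.
	randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second +
		lostPDLeaderMaxTimeoutSecs*time.Second +
		lostPDLeaderReElectionFactor*electionInterval
	// Uniform over [10s, 19s], plus a fixed 9s re-election term:
	// every server waits somewhere in [19s, 28s] before campaigning.
	fmt.Println("campaign after:", randomTimeout)
}
```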
47 changes: 47 additions & 0 deletions tests/client/client_test.go
@@ -61,6 +61,7 @@ type client interface {
 	ScheduleCheckLeader()
 	GetURLs() []string
 	GetAllocatorLeaderURLs() map[string]string
+	TestBackOffExecute() bool
 }
 
 func TestClientClusterIDCheck(t *testing.T) {
@@ -1415,3 +1416,49 @@ func (suite *clientTestSuite) TestScatterRegion() {
 		resp.GetStatus() == pdpb.OperatorStatus_RUNNING
 	}, testutil.WithTickInterval(time.Second))
 }
+
+func (suite *clientTestSuite) TestRetryMemberUpdate() {
+	re := suite.Require()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestCluster(ctx, 3)
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	endpoints := runServer(re, cluster)
+	cli := setupCli(re, ctx, endpoints)
+	defer cli.Close()
+
+	leader := cluster.GetLeader()
+	waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)
+	memberID := cluster.GetServer(leader).GetLeader().GetMemberId()
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/client/backOffExecute", `return(true)`))
+	leader2 := waitLeaderChange(re, cluster, leader, cli.(client))
+	re.True(cli.(client).TestBackOffExecute())
+
+	re.NotEqual(leader, leader2)
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/client/backOffExecute"))
+}
+
+func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli client) string {
+	var leader string
+	testutil.Eventually(re, func() bool {
+		cli.ScheduleCheckLeader()
+		leader = cluster.GetLeader()
+		if leader == old || leader == "" {
+			return false
+		}
+		return true
+	})
+	return leader
+}
