diff --git a/client/base_client.go b/client/base_client.go index 26c4505c608..a5da38dd15c 100644 --- a/client/base_client.go +++ b/client/base_client.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/tlsutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -130,18 +131,23 @@ func (c *baseClient) memberLoop() { ctx, cancel := context.WithCancel(c.ctx) defer cancel() + ticker := time.NewTicker(memberUpdateInterval) + defer ticker.Stop() + + bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout) for { select { case <-c.checkLeaderCh: - case <-time.After(memberUpdateInterval): + case <-ticker.C: case <-ctx.Done(): + log.Info("[pd] exit member loop due to context canceled") return } failpoint.Inject("skipUpdateMember", func() { failpoint.Continue() }) - if err := c.updateMember(); err != nil { - log.Error("[pd] failed updateMember", errs.ZapError(err)) + if err := bo.Exec(ctx, c.updateMember); err != nil { + log.Error("[pd] failed update member with retry", errs.ZapError(err)) } } } diff --git a/client/client.go b/client/client.go index b7e15fe6eb2..cb4d3a1ba93 100644 --- a/client/client.go +++ b/client/client.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/retry" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -325,12 +326,13 @@ type lastTSO struct { } const ( - dialTimeout = 3 * time.Second - updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. - tsLoopDCCheckInterval = time.Minute - defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst - retryInterval = 500 * time.Millisecond - maxRetryTimes = 6 + dialTimeout = 3 * time.Second + updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. + tsLoopDCCheckInterval = time.Minute + defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst + retryInterval = 500 * time.Millisecond + maxRetryTimes = 6 + updateMemberBackOffBaseTime = 100 * time.Millisecond ) // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time. @@ -765,6 +767,7 @@ func (c *client) handleDispatcher( // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) + bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout) tsoBatchLoop: for { select { @@ -861,7 +864,7 @@ tsoBatchLoop: stream = nil // Because ScheduleCheckLeader is asynchronous, if the leader changes, we better call `updateMember` ASAP. if IsLeaderChange(err) { - if err := c.updateMember(); err != nil { + if err := bo.Exec(dispatcherCtx, c.updateMember); err != nil { select { case <-dispatcherCtx.Done(): return @@ -885,7 +888,7 @@ func (c *client) allowTSOFollowerProxy(dc string) bool { } // chooseStream uses the reservoir sampling algorithm to randomly choose a connection. -// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. +// connectionCtxs will only have one stream to choose when the TSO Follower Proxy is off. func (c *client) chooseStream(connectionCtxs *sync.Map) (connectionCtx *connectionContext) { idx := 0 connectionCtxs.Range(func(addr, cc interface{}) bool { diff --git a/client/keyspace_client.go b/client/keyspace_client.go index a469a45b00e..f8a0dd04bb3 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -16,7 +16,6 @@ package pd import ( "context" - "go.uber.org/zap" "time" "github.com/opentracing/opentracing-go" @@ -24,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/tikv/pd/client/grpcutil" + "go.uber.org/zap" "google.golang.org/grpc" ) diff --git a/client/retry/backoff.go b/client/retry/backoff.go new file mode 100644 index 00000000000..e2ca9ab3972 --- /dev/null +++ b/client/retry/backoff.go @@ -0,0 +1,86 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "time" + + "github.com/pingcap/failpoint" +) + +// BackOffer is a backoff policy for retrying operations. +type BackOffer struct { + max time.Duration + next time.Duration + base time.Duration +} + +// Exec is a helper function to exec backoff. +func (bo *BackOffer) Exec( + ctx context.Context, + fn func() error, +) error { + if err := fn(); err != nil { + select { + case <-ctx.Done(): + case <-time.After(bo.nextInterval()): + failpoint.Inject("backOffExecute", func() { + testBackOffExecuteFlag = true + }) + } + return err + } + // reset backoff when fn() succeed. + bo.resetBackoff() + return nil +} + +// InitialBackOffer make the initial state for retrying. +func InitialBackOffer(base, max time.Duration) BackOffer { + return BackOffer{ + max: max, + base: base, + next: base, + } +} + +// nextInterval for now use the `exponentialInterval`. +func (bo *BackOffer) nextInterval() time.Duration { + return bo.exponentialInterval() +} + +// exponentialInterval returns the exponential backoff duration. +func (bo *BackOffer) exponentialInterval() time.Duration { + backoffInterval := bo.next + bo.next *= 2 + if bo.next > bo.max { + bo.next = bo.max + } + return backoffInterval +} + +// resetBackoff resets the backoff to initial state. +func (bo *BackOffer) resetBackoff() { + bo.next = bo.base +} + +// Only used for test. +var testBackOffExecuteFlag = false + +// TestBackOffExecute Only used for test. +func TestBackOffExecute() bool { + return testBackOffExecuteFlag +} diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go new file mode 100644 index 00000000000..c6feb15bce7 --- /dev/null +++ b/client/retry/backoff_test.go @@ -0,0 +1,47 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/stretchr/testify/require" +) + +func TestExponentialBackoff(t *testing.T) { + re := require.New(t) + + baseBackoff := 100 * time.Millisecond + maxBackoff := 1 * time.Second + + backoff := InitialBackOffer(baseBackoff, maxBackoff) + re.Equal(backoff.nextInterval(), baseBackoff) + re.Equal(backoff.nextInterval(), 2*baseBackoff) + + for i := 0; i < 10; i++ { + re.LessOrEqual(backoff.nextInterval(), maxBackoff) + } + re.Equal(backoff.nextInterval(), maxBackoff) + + // Reset backoff + backoff.resetBackoff() + err := backoff.Exec(context.Background(), func() error { + return errors.New("test") + }) + re.Error(err) +} diff --git a/server/server.go b/server/server.go index ee0359b9f74..fcfba7efcaa 100644 --- a/server/server.go +++ b/server/server.go @@ -1444,7 +1444,7 @@ func (s *Server) leaderLoop() { if s.member.GetLeader() == nil { lastUpdated := s.member.GetLastLeaderUpdatedTime() // use random timeout to avoid leader campaigning storm. - randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration + randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration // add failpoint to test the campaign leader logic. failpoint.Inject("timeoutWaitPDLeader", func() { log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader") diff --git a/tests/client/client_test.go b/tests/client/client_test.go index cde7ee3d7fc..5cc068a8c20 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/testutil" @@ -1415,3 +1416,49 @@ func (suite *clientTestSuite) TestScatterRegion() { resp.GetStatus() == pdpb.OperatorStatus_RUNNING }, testutil.WithTickInterval(time.Second)) } + +func (suite *clientTestSuite) TestMemberUpdateBackOff() { + re := suite.Require() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 3) + re.NoError(err) + defer cluster.Destroy() + + endpoints := runServer(re, cluster) + cli := setupCli(re, ctx, endpoints) + defer cli.Close() + + leader := cluster.GetLeader() + waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls) + memberID := cluster.GetServer(leader).GetLeader().GetMemberId() + + re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID))) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID))) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`)) + // make sure back off executed. + re.NoError(failpoint.Enable("github.com/tikv/pd/client/retry/backOffExecute", `return(true)`)) + leader2 := waitLeaderChange(re, cluster, leader, cli.(client)) + re.True(retry.TestBackOffExecute()) + + re.NotEqual(leader, leader2) + + re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader")) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/retry/backOffExecute")) +} + +func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli client) string { + var leader string + testutil.Eventually(re, func() bool { + cli.ScheduleCheckLeader() + leader = cluster.GetLeader() + if leader == old || leader == "" { + return false + } + return true + }) + return leader +} diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 384261f484a..1fc69c89e79 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -397,22 +397,20 @@ func TestScheduler(t *testing.T) { pdctl.MustPutStore(re, leaderServer.GetServer(), store) } re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) // After upgrading, we should not use query. - expected1["read-priorities"] = []interface{}{"query", "byte"} - re.NotEqual(expected1, conf1) - expected1["read-priorities"] = []interface{}{"key", "byte"} - re.Equal(expected1, conf1) + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"}) // cannot set qps as write-peer-priorities echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"}) // test remove and add - mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) - re.Equal(expected1, conf1) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") // test balance leader config conf = make(map[string]interface{})