Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support backoff mechanism for memberLoop #6978

Merged
merged 12 commits into from
Aug 29, 2023
12 changes: 9 additions & 3 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -130,18 +131,23 @@
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
for {
select {
case <-c.checkLeaderCh:
case <-time.After(memberUpdateInterval):
case <-ticker.C:

Check warning on line 141 in client/base_client.go

View check run for this annotation

Codecov / codecov/patch

client/base_client.go#L141

Added line #L141 was not covered by tests
case <-ctx.Done():
log.Info("[pd] exit member loop due to context canceled")
return
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
log.Error("[pd] failed updateMember", errs.ZapError(err))
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed update member with retry", errs.ZapError(err))
}
}
}
Expand Down
19 changes: 11 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -325,12 +326,13 @@ type lastTSO struct {
}

const (
dialTimeout = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
dialTimeout = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
updateMemberBackOffBaseTime = 100 * time.Millisecond
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
Expand Down Expand Up @@ -765,6 +767,7 @@ func (c *client) handleDispatcher(

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
tsoBatchLoop:
for {
select {
Expand Down Expand Up @@ -861,7 +864,7 @@ tsoBatchLoop:
stream = nil
// Because ScheduleCheckLeader is asynchronous, if the leader changes, we'd better call `updateMember` ASAP.
if IsLeaderChange(err) {
if err := c.updateMember(); err != nil {
if err := bo.Exec(dispatcherCtx, c.updateMember); err != nil {
select {
case <-dispatcherCtx.Done():
return
Expand All @@ -885,7 +888,7 @@ func (c *client) allowTSOFollowerProxy(dc string) bool {
}

// chooseStream uses the reservoir sampling algorithm to randomly choose a connection.
// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off.
// connectionCtxs will only have one stream to choose when the TSO Follower Proxy is off.
func (c *client) chooseStream(connectionCtxs *sync.Map) (connectionCtx *connectionContext) {
idx := 0
connectionCtxs.Range(func(addr, cc interface{}) bool {
Expand Down
2 changes: 1 addition & 1 deletion client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package pd

import (
"context"
"go.uber.org/zap"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down
86 changes: 86 additions & 0 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"time"

"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
// It keeps track of the interval to wait before the next retry: the interval
// doubles on every call to nextInterval until it is capped at max, and goes
// back to base when resetBackoff is called.
type BackOffer struct {
	// max is the upper bound that next is clamped to after each doubling.
	max time.Duration
	// next is the interval that the next call to nextInterval will return.
	next time.Duration
	// base is the initial interval; resetBackoff restores next to this value.
	base time.Duration
}

// Exec runs fn once and applies the backoff policy to its outcome.
//
// If fn succeeds, the backoff interval is reset to its base value and nil is
// returned. If fn fails, Exec blocks for the current backoff interval (which
// is then doubled, capped at the maximum) or until ctx is done, whichever
// comes first, and then returns the error produced by fn. Exec never calls fn
// more than once; callers retry by invoking Exec again.
func (bo *BackOffer) Exec(
	ctx context.Context,
	fn func() error,
) error {
	err := fn()
	if err == nil {
		// reset backoff when fn() succeeds.
		bo.resetBackoff()
		return nil
	}
	// Use an explicit timer instead of time.After so that it is released as
	// soon as Exec returns, even when ctx is canceled before the timer fires.
	timer := time.NewTimer(bo.nextInterval())
	defer timer.Stop()
	select {
	case <-ctx.Done():
	case <-timer.C:
		failpoint.Inject("backOffExecute", func() {
			testBackOffExecuteFlag = true
		})
	}
	return err
}

// InitialBackOffer makes the initial state for retrying: the first backoff
// interval is base, and later intervals are capped at max.
// If base is greater than max, base is lowered to max so that no interval
// returned by the BackOffer ever exceeds max.
func InitialBackOffer(base, max time.Duration) BackOffer {
	// Make sure the base is less than or equal to the max.
	if base > max {
		base = max
	}
	return BackOffer{
		max:  max,
		base: base,
		next: base,
	}
}

// nextInterval returns the duration to wait before the next retry and
// advances the backoff state. For now it uses the `exponentialInterval`.
func (bo *BackOffer) nextInterval() time.Duration {
	return bo.exponentialInterval()
}

// exponentialInterval hands back the interval currently stored in the
// BackOffer and schedules twice that value for the following call, never
// storing more than bo.max.
func (bo *BackOffer) exponentialInterval() time.Duration {
	current := bo.next
	if doubled := current * 2; doubled > bo.max {
		bo.next = bo.max
	} else {
		bo.next = doubled
	}
	return current
}

// resetBackoff resets the backoff to initial state, i.e. the next interval
// returned by nextInterval is bo.base again.
func (bo *BackOffer) resetBackoff() {
	bo.next = bo.base
}

// testBackOffExecuteFlag is set to true by the "backOffExecute" failpoint in
// Exec once a backoff wait has elapsed. Only used for test.
var testBackOffExecuteFlag bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to make sure the backoff was actually executed.


// TestBackOffExecute reports whether the "backOffExecute" failpoint in Exec
// has fired, i.e. whether testBackOffExecuteFlag has been set. Only used for test.
// NOTE(review): the flag is a plain bool read here without synchronization;
// confirm callers reading it from another goroutine are race-free.
func TestBackOffExecute() bool {
	return testBackOffExecuteFlag
}
47 changes: 47 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

// TestExponentialBackoff checks that the interval starts at the base, doubles
// on each call, is capped at the max, and that Exec returns the callback's
// error while advancing the backoff from the reset state.
func TestExponentialBackoff(t *testing.T) {
	re := require.New(t)

	baseBackoff := 100 * time.Millisecond
	maxBackoff := 1 * time.Second

	backoff := InitialBackOffer(baseBackoff, maxBackoff)
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	re.Equal(baseBackoff, backoff.nextInterval())
	re.Equal(2*baseBackoff, backoff.nextInterval())

	for i := 0; i < 10; i++ {
		re.LessOrEqual(backoff.nextInterval(), maxBackoff)
	}
	re.Equal(maxBackoff, backoff.nextInterval())

	// Reset backoff
	backoff.resetBackoff()
	err := backoff.Exec(context.Background(), func() error {
		return errors.New("test")
	})
	re.Error(err)
	// The failed Exec consumed the base interval, which proves the reset took
	// effect; the following interval must therefore be twice the base.
	re.Equal(2*baseBackoff, backoff.nextInterval())
}
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ func (s *Server) leaderLoop() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
Expand Down
47 changes: 47 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/assertutil"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/testutil"
Expand Down Expand Up @@ -1415,3 +1416,49 @@ func (suite *clientTestSuite) TestScatterRegion() {
resp.GetStatus() == pdpb.OperatorStatus_RUNNING
}, testutil.WithTickInterval(time.Second))
}

// TestMemberUpdateBackOff starts a three-node test cluster, uses failpoints to
// force a leader change, and asserts that the client's backoff path was
// executed while it followed the change.
func (suite *clientTestSuite) TestMemberUpdateBackOff() {
	re := suite.Require()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cluster, err := tests.NewTestCluster(ctx, 3)
	re.NoError(err)
	defer cluster.Destroy()

	endpoints := runServer(re, cluster)
	cli := setupCli(re, ctx, endpoints)
	defer cli.Close()

	leader := cluster.GetLeader()
	waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)
	memberID := cluster.GetServer(leader).GetLeader().GetMemberId()

	// Every failpoint is disabled through a defer registered right after it is
	// enabled. A failing require assertion aborts the test immediately, so
	// Disable calls placed at the end of the function would be skipped and the
	// failpoints would stay enabled for the tests that run afterwards.
	memberIDExpr := fmt.Sprintf("return(\"%d\")", memberID)
	re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", memberIDExpr))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
	}()
	re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", memberIDExpr))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
	}()
	re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
	}()
	// make sure back off executed.
	re.NoError(failpoint.Enable("github.com/tikv/pd/client/retry/backOffExecute", `return(true)`))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/client/retry/backOffExecute"))
	}()

	leader2 := waitLeaderChange(re, cluster, leader, cli.(client))
	re.True(retry.TestBackOffExecute())

	re.NotEqual(leader, leader2)
}

// waitLeaderChange repeatedly asks the client to schedule a leader check and
// reads the cluster leader, until the cluster reports a non-empty leader that
// differs from old. It returns the leader observed by the last check.
func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli client) string {
	var current string
	testutil.Eventually(re, func() bool {
		cli.ScheduleCheckLeader()
		current = cluster.GetLeader()
		return current != old && current != ""
	})
	return current
}
16 changes: 7 additions & 9 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,22 +397,20 @@ func TestScheduler(t *testing.T) {
pdctl.MustPutStore(re, leaderServer.GetServer(), store)
}
re.Equal("5.2.0", leaderServer.GetClusterVersion().String())
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
// After upgrading, we should not use query.
expected1["read-priorities"] = []interface{}{"query", "byte"}
re.NotEqual(expected1, conf1)
expected1["read-priorities"] = []interface{}{"key", "byte"}
re.Equal(expected1, conf1)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"})
// cannot set qps as write-peer-priorities
echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities")
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
re.Equal(expected1, conf1)
re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"})

// test remove and add
mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil)
re.Equal(expected1, conf1)
echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
re.Contains(echo, "Success")
echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil)
re.Contains(echo, "Success")
Comment on lines -400 to +413
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cherry-pick #5847 to make TestScheduler stable.


// test balance leader config
conf = make(map[string]interface{})
Expand Down
Loading