Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 24, 2023
1 parent 3199dfe commit f4541bf
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 6 deletions.
119 changes: 115 additions & 4 deletions client/base_client.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

const (
Expand Down Expand Up @@ -64,6 +66,10 @@ type baseClient struct {

// Client option.
option *option

successReConnect chan struct{}

bo *retry.Backoffer
}

// SecurityOption records options about tls
Expand All @@ -88,6 +94,8 @@ func newBaseClient(ctx context.Context, urls []string, security SecurityOption)
cancel: clientCancel,
security: security,
option: newOption(),
bo: retry.NewBackoffer(clientCtx, maxRetryTimes),
successReConnect: make(chan struct{}, 1),
}
bc.urls.Store(urls)
return bc
Expand All @@ -105,7 +113,7 @@ func (c *baseClient) init() error {
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(1)
go c.memberLoop()
go c.reconnectMemberLoop()
return nil
}

Expand All @@ -124,32 +132,129 @@ func (c *baseClient) initRetry(f func() error) error {
return errors.WithStack(err)
}

func (c *baseClient) memberLoop() {
func (c *baseClient) reconnectMemberLoop() {
defer c.wg.Done()

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

failpoint.Inject("acceleratedMemberUpdateInterval", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond * 100)
})

for {
select {
case <-c.checkLeaderCh:
case <-time.After(memberUpdateInterval):
case <-ticker.C:
case <-ctx.Done():
log.Info("[pd.reconnectLoop] exit reconnectLoop")
return
}

failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})

if err := c.updateMember(); err != nil {
log.Error("[pd] failed updateMember", errs.ZapError(err))
log.Error("[pd.reconnectLoop] failed updateMember", errs.ZapError(err))
} else {
c.SuccessReconnect()
}
}
}

func (c *baseClient) waitForReady() error {
if e1 := c.waitForLeaderReady(); e1 != nil {
log.Error("[pd.waitForReady] wait for leader ready failed", errs.ZapError(e1))
} else if e2 := c.loadMembers(); e2 != nil {
log.Error("[pd.waitForReady] load members failed", errs.ZapError(e2))
} else {
return nil
}

deadline := time.Now().Add(requestTimeout)
failpoint.Inject("acceleratedRequestTimeout", func() {
deadline = time.Now().Add(500 * time.Millisecond)
})
for {
select {
case <-c.successReConnect:
return nil
case <-time.After(time.Until(deadline)):
log.Error("[pd.waitForReady] timeout")
return errors.New("wait for ready timeout")
}
}
}

// waitForLeaderReady waits for the leader to be ready.
func (c *baseClient) waitForLeaderReady() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

for {
old, ok := c.clientConns.Load(c.GetLeaderAddr())
if !ok {
return errors.New("no leader")
}
cc := old.(*grpc.ClientConn)

s := cc.GetState()
if s == connectivity.Ready {
return nil
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
return ctx.Err()
}
}
}

func (c *baseClient) loadMembers() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

members, err := c.getMembers(ctx, c.GetLeaderAddr(), updateMemberTimeout)
if err != nil {
log.Warn("[pd.loadMembers] failed to load members ", zap.String("url", c.GetLeaderAddr()), errs.ZapError(err))
return errors.WithStack(err)
} else if members.GetHeader() == nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader address don't exist")
log.Warn("[pd.loadMembers] leader address don't exist. ", zap.String("url", c.GetLeaderAddr()), errs.ZapError(err))
return errors.WithStack(err)
}

return nil
}

func (c *baseClient) SuccessReconnect() {
select {
case c.successReConnect <- struct{}{}:
default:
}
}

// ScheduleCheckLeader is used to check leader.
func (c *baseClient) ScheduleCheckLeader() {
select {
case c.checkLeaderCh <- struct{}{}:
if err := c.waitForReady(); err != nil {
// If backoff times count is greater than 10, reset it.
if c.bo.GetBackoffTimeCnt(retry.BoMemberUpdate.String()) >= 10 {
c.bo.Reset()
}
e := c.bo.Backoff(retry.BoMemberUpdate, err)

if e != nil {
log.Error("[pd] wait for ready backoff failed", errs.ZapError(e))
return
}
log.Error("[pd] wait for ready failed", errs.ZapError(err))
}
default:
}
}
Expand Down Expand Up @@ -454,3 +559,9 @@ func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
c.clientConns.Store(addr, cc)
return cc, nil
}

// GetBackoffTimes returns a map contains backoff time count by type.
// For test purpose
func (c *baseClient) GetBackoffTimes() map[string]int {
return c.bo.GetBackoffTimes()
}
1 change: 1 addition & 0 deletions client/client.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ const (
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
requestTimeout = 2 * time.Second
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
Expand Down
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.7.0
go.uber.org/goleak v1.1.11
Expand Down
2 changes: 1 addition & 1 deletion client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package pd

import (
"context"
"go.uber.org/zap"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down
166 changes: 166 additions & 0 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"fmt"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// Backoffer is a utility for retrying queries.
type Backoffer struct {
ctx context.Context

fn map[string]backoffFn
maxSleep int
totalSleep int

errors []error
configs []*Config
backoffSleepMS map[string]int
backoffTimes map[string]int
}

// NewBackoffer (Deprecated) creates a Backoffer with maximum sleep time(in ms).
func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
return &Backoffer{
ctx: ctx,
maxSleep: maxSleep,
}
}

// Backoff sleeps a while base on the Config and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(cfg *Config, err error) error {
if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", cfg), opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(b.ctx, span1)
}
return b.BackoffWithCfgAndMaxSleep(cfg, -1, err)
}

// BackoffWithCfgAndMaxSleep sleeps a while base on the Config and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error {
select {
case <-b.ctx.Done():
return errors.WithStack(err)
default:
}
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
b.configs = append(b.configs, cfg)

// Lazy initialize.
if b.fn == nil {
b.fn = make(map[string]backoffFn)
}
f, ok := b.fn[cfg.name]
if !ok {
f = cfg.createBackoffFn()
b.fn[cfg.name] = f
}
realSleep := f(b.ctx, maxSleepMs)

b.totalSleep += realSleep
if b.backoffSleepMS == nil {
b.backoffSleepMS = make(map[string]int)
}
b.backoffSleepMS[cfg.name] += realSleep
if b.backoffTimes == nil {
b.backoffTimes = make(map[string]int)
}
b.backoffTimes[cfg.name]++

log.Debug("retry later",
zap.Error(err),
zap.Int("totalSleep", b.totalSleep),
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", cfg))
return nil
}

func (b *Backoffer) String() string {
if b.totalSleep == 0 {
return ""
}
return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.configs)
}

// GetTotalSleep returns total sleep time.
func (b *Backoffer) GetTotalSleep() int {
return b.totalSleep
}

// GetCtx returns the bound context.
func (b *Backoffer) GetCtx() context.Context {
return b.ctx
}

// SetCtx sets the bound context to ctx.
func (b *Backoffer) SetCtx(ctx context.Context) {
b.ctx = ctx
}

// GetBackoffTimes returns a map contains backoff time count by type.
func (b *Backoffer) GetBackoffTimes() map[string]int {
return b.backoffTimes
}

// GetBackoffTimeCnt returns backoff time count by specific type.
func (b *Backoffer) GetBackoffTimeCnt(s string) int {
return b.backoffTimes[s]
}

// GetTotalBackoffTimes returns the total backoff times of the backoffer.
func (b *Backoffer) GetTotalBackoffTimes() int {
total := 0
for _, t := range b.backoffTimes {
total += t
}
return total
}

// GetBackoffSleepMS returns a map contains backoff sleep time by type.
func (b *Backoffer) GetBackoffSleepMS() map[string]int {
return b.backoffSleepMS
}

// ErrorsNum returns the number of errors.
func (b *Backoffer) ErrorsNum() int {
return len(b.errors)
}

// Reset resets the sleep state of the backoffer, so that following backoff
// can sleep shorter. The reason why we don't create a new backoffer is that
// backoffer is similar to context, and it records some metrics that we
// want to record for an entire process which is composed of serveral stages.
func (b *Backoffer) Reset() {
b.fn = nil
b.totalSleep = 0
}

// ResetMaxSleep resets the sleep state and max sleep limit of the backoffer.
// It's used when switches to the next stage of the process.
func (b *Backoffer) ResetMaxSleep(maxSleep int) {
b.Reset()
b.maxSleep = maxSleep
}
Loading

0 comments on commit f4541bf

Please sign in to comment.