Skip to content

Commit

Permalink
Merge branch 'master' into fix_client_cov
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Apr 9, 2024
2 parents 146b99f + 80a2fd9 commit 3b242a4
Show file tree
Hide file tree
Showing 15 changed files with 510 additions and 271 deletions.
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
var (
ErrClientListResourceGroup = errors.Normalize("get all resource group failed, %v", errors.RFCCodeText("PD:client:ErrClientListResourceGroup"))
ErrClientResourceGroupConfigUnavailable = errors.Normalize("resource group config is unavailable, %v", errors.RFCCodeText("PD:client:ErrClientResourceGroupConfigUnavailable"))
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation, estimated wait time %s, ltb state is %.2f:%.2f", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
)

// ErrClientGetResourceGroup is the error type for getting resource group.
Expand Down
26 changes: 15 additions & 11 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDuration time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
limit Limit
remainingTokens float64
}

// OK returns whether the limiter can provide the requested number of tokens
Expand Down Expand Up @@ -359,10 +360,11 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDuration: waitDuration,
remainingTokens: tokens,
}
if ok {
r.tokens = n
Expand All @@ -380,6 +382,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Float64("notify-threshold", lim.notifyThreshold),
zap.Bool("is-low-process", lim.isLowProcess),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
lim.last = last
Expand Down Expand Up @@ -461,7 +465,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens)
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestCancel(t *testing.T) {
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(4*time.Second, d)
re.Error(err)
re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00")
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
cancel1()
Expand Down
45 changes: 42 additions & 3 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,31 @@ package retry

import (
"context"
"reflect"
"runtime"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

// withMinLogInterval sets the minimum log interval for retrying.
// Because the retry interval may be not the factor of log interval, so this is the minimum interval.
func withMinLogInterval(interval time.Duration) Option {
return func(bo *Backoffer) {
bo.logInterval = interval
}
}

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
Expand All @@ -36,6 +52,10 @@ type Backoffer struct {
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
// nextLogTime is used to record the next log time.
nextLogTime time.Duration

attempt int
next time.Duration
Expand All @@ -50,10 +70,12 @@ func (bo *Backoffer) Exec(
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err := fn()
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
Expand All @@ -63,6 +85,13 @@ func (bo *Backoffer) Exec(
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if err != nil {
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
}
if after == nil {
after = time.NewTimer(currentInterval)
} else {
Expand Down Expand Up @@ -93,7 +122,7 @@ func (bo *Backoffer) Exec(
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
// - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
func InitialBackoffer(base, max, total time.Duration) *Backoffer {
func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer {
// Make sure the base is less than or equal to the max.
if base > max {
base = max
Expand All @@ -102,7 +131,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
if total > 0 && total < base {
total = base
}
return &Backoffer{
bo := &Backoffer{
base: base,
max: max,
total: total,
Expand All @@ -113,6 +142,10 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
currentTotal: 0,
attempt: 0,
}
for _, opt := range opts {
opt(bo)
}
return bo
}

// SetRetryableChecker sets the retryable checker.
Expand Down Expand Up @@ -152,6 +185,7 @@ func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
bo.attempt = 0
bo.nextLogTime = 0
}

// Only used for test.
Expand All @@ -161,3 +195,8 @@ var testBackOffExecuteFlag = false
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}

func getFunctionName(f any) string {
strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")
return strings.Split(strs[len(strs)-1], "-")[0]
}
95 changes: 95 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package retry

import (
"bytes"
"context"
"errors"
"testing"
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestBackoffer(t *testing.T) {
Expand Down Expand Up @@ -107,3 +110,95 @@ func TestBackoffer(t *testing.T) {
func isBackofferReset(bo *Backoffer) bool {
return bo.next == bo.base && bo.currentTotal == 0
}

func TestBackofferWithLog(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true}
lg := newZapTestLogger(conf)
log.ReplaceGlobals(lg.Logger, nil)

bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100))
err := bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms := lg.Messages()
len1 := len(ms)
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

bo.resetBackoff()
err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

var errTest = errors.New("test")

func testFn() error {
return errTest
}

// testingWriter is a WriteSyncer that writes the the messages.
type testingWriter struct {
messages []string
}

func newTestingWriter() *testingWriter {
return &testingWriter{}
}

func (w *testingWriter) Write(p []byte) (n int, err error) {
n = len(p)
p = bytes.TrimRight(p, "\n")
m := string(p)
w.messages = append(w.messages, m)
return n, nil
}
func (w *testingWriter) Sync() error {
return nil
}

type verifyLogger struct {
*zap.Logger
w *testingWriter
}

func (logger *verifyLogger) Message() string {
if logger.w.messages == nil {
return ""
}
return logger.w.messages[len(logger.w.messages)-1]
}

func (logger *verifyLogger) Messages() []string {
if logger.w.messages == nil {
return nil
}
return logger.w.messages
}

func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger {
// TestingWriter is used to write to memory.
// Used in the verify logger.
writer := newTestingWriter()
lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...)
return verifyLogger{
Logger: lg,
w: writer,
}
}
Loading

0 comments on commit 3b242a4

Please sign in to comment.