Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/retry: only return the latest error in backoffer (#8227) #8465

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.59.0
Expand All @@ -35,6 +34,7 @@ require (
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,11 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}

Expand Down
76 changes: 53 additions & 23 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,27 @@ package retry

import (
"context"
"reflect"
"runtime"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/multierr"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20
// Option is used to customize the backoffer.
type Option func(*Backoffer)

// withMinLogInterval sets the minimum log interval for retrying.
// Because the retry interval may be not the factor of log interval, so this is the minimum interval.
func withMinLogInterval(interval time.Duration) Option {
return func(bo *Backoffer) {
bo.logInterval = interval
}
}

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
Expand All @@ -34,8 +47,12 @@ type Backoffer struct {
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
// If it's not set, it will always retry unconditionally no matter what the error is.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
// nextLogTime is used to record the next log time.
nextLogTime time.Duration

attempt int
next time.Duration
Expand All @@ -49,20 +66,23 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
allErrors error
after *time.Timer
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err := fn()
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
if err == nil || !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("[pd.backoffer] exec fn failed and retrying",
zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
if after == nil {
after = time.NewTimer(currentInterval)
} else {
Expand All @@ -71,7 +91,7 @@ func (bo *Backoffer) Exec(
select {
case <-ctx.Done():
after.Stop()
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
Expand All @@ -86,14 +106,14 @@ func (bo *Backoffer) Exec(
}
}
}
return allErrors
return err
}

// InitialBackoffer make the initial state for retrying.
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
// - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
func InitialBackoffer(base, max, total time.Duration) *Backoffer {
func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer {
// Make sure the base is less than or equal to the max.
if base > max {
base = max
Expand All @@ -102,21 +122,25 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
if total > 0 && total < base {
total = base
}
return &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
bo := &Backoffer{
base: base,
max: max,
total: total,
next: base,
currentTotal: 0,
attempt: 0,
}
for _, opt := range opts {
opt(bo)
}
return bo
}

// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
if !overwrite && bo.retryableChecker != nil {
return
}
bo.retryableChecker = checker
}

Expand Down Expand Up @@ -152,6 +176,7 @@ func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
bo.attempt = 0
bo.nextLogTime = 0
}

// Only used for test.
Expand All @@ -161,3 +186,8 @@ var testBackOffExecuteFlag = false
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}

func getFunctionName(f any) string {
strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")
return strings.Split(strs[len(strs)-1], "-")[0]
}
147 changes: 141 additions & 6 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package retry

import (
"bytes"
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestBackoffer(t *testing.T) {
Expand Down Expand Up @@ -84,26 +88,157 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorContains(err, "test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))

// Test the retryable checker.
// Test the error returned.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(err error) bool {
return execCount < 2
err = bo.Exec(ctx, func() error {
execCount++
return fmt.Errorf("test %d", execCount)
})
re.Error(err)
re.Equal("test 4", err.Error())
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
execCount = 0
err = bo.Exec(ctx, func() error {
if execCount == 1 {
return nil
}
execCount++
return nil
return expectedErr
})
re.Equal(1, execCount)
re.NoError(err)
re.True(isBackofferReset(bo))

// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
retryableChecker := func(error) bool {
return execCount < 2
}
bo.SetRetryableChecker(retryableChecker, false)
execFunc := func() error {
execCount++
return expectedErr
}
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
// Test the retryable checker with overwrite.
execCount = 0
retryableChecker = func(error) bool {
return execCount < 4
}
bo.SetRetryableChecker(retryableChecker, false)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
execCount = 0
bo.SetRetryableChecker(retryableChecker, true)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
return bo.next == bo.base && bo.currentTotal == 0
}

func TestBackofferWithLog(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true}
lg := newZapTestLogger(conf)
log.ReplaceGlobals(lg.Logger, nil)

bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100))
err := bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms := lg.Messages()
len1 := len(ms)
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

var errTest = errors.New("test")

func testFn() error {
return errTest
}

// testingWriter is a WriteSyncer that writes the the messages.
type testingWriter struct {
messages []string
}

func newTestingWriter() *testingWriter {
return &testingWriter{}
}

func (w *testingWriter) Write(p []byte) (n int, err error) {
n = len(p)
p = bytes.TrimRight(p, "\n")
m := string(p)
w.messages = append(w.messages, m)
return n, nil
}
func (*testingWriter) Sync() error {
return nil
}

type verifyLogger struct {
*zap.Logger
w *testingWriter
}

func (logger *verifyLogger) Message() string {
if logger.w.messages == nil {
return ""
}
return logger.w.messages[len(logger.w.messages)-1]
}

func (logger *verifyLogger) Messages() []string {
if logger.w.messages == nil {
return nil
}
return logger.w.messages
}

func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger {
// TestingWriter is used to write to memory.
// Used in the verify logger.
writer := newTestingWriter()
lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...)
return verifyLogger{
Logger: lg,
w: writer,
}
}
Loading