This is an automated cherry-pick of #8227
ref #8142

Signed-off-by: ti-chi-bot <[email protected]>
JmPotato authored and ti-chi-bot committed Jul 31, 2024
1 parent 632ee6e commit a2b8e77
Showing 5 changed files with 267 additions and 20 deletions.
5 changes: 4 additions & 1 deletion client/go.mod
@@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.59.0
@@ -34,7 +33,11 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
<<<<<<< HEAD
github.com/stretchr/objx v0.5.0 // indirect
=======
go.uber.org/multierr v1.11.0 // indirect
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
5 changes: 3 additions & 2 deletions client/http/client.go
@@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}
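
For context, below is a minimal sketch of the precedence this hunk relies on: because requestWithRetry now passes overwrite=false, a retryable checker that a caller has already attached to the request's backoffer is kept, and the status-code based default above only applies when no checker is set yet. The import path github.com/tikv/pd/client/retry is assumed from the module layout, and the durations and checkers are illustrative, not pd defaults.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry" // assumed import path
)

var errStatus = errors.New("status 503")

func main() {
	bo := retry.InitialBackoffer(10*time.Millisecond, 50*time.Millisecond, 200*time.Millisecond)
	// A checker installed first by the caller, e.g. "never retry".
	bo.SetRetryableChecker(func(error) bool { return false }, true)
	// The default checker is added with overwrite=false, so it does not
	// replace the checker that is already set.
	bo.SetRetryableChecker(func(err error) bool { return err != nil }, false)

	calls := 0
	err := bo.Exec(context.Background(), func() error {
		calls++
		return errStatus
	})
	fmt.Println(calls, err) // prints "1 status 503": the non-retrying checker stayed in effect
}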

57 changes: 47 additions & 10 deletions client/retry/backoff.go
@@ -20,10 +20,27 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
<<<<<<< HEAD
"go.uber.org/multierr"
)

const maxRecordErrorCount = 20
=======
"github.com/pingcap/log"
"go.uber.org/zap"
)

// Option is used to customize the backoffer.
type Option func(*Backoffer)

// withMinLogInterval sets the minimum log interval for retrying.
// Because the retry interval may not be a factor of the log interval, this is the minimum interval between logs.
func withMinLogInterval(interval time.Duration) Option {
return func(bo *Backoffer) {
bo.logInterval = interval
}
}
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
@@ -34,7 +51,7 @@ type Backoffer struct {
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
// If it's not set, the backoffer retries unconditionally, regardless of the error.
retryableChecker func(err error) bool

attempt int
@@ -49,20 +66,30 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
<<<<<<< HEAD
allErrors error
after *time.Timer
=======
err error
after *time.Timer
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))
)
for {
err := fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
if err == nil || !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
<<<<<<< HEAD
=======
bo.nextLogTime += currentInterval
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("[pd.backoffer] exec fn failed and retrying",
zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))
if after == nil {
after = time.NewTimer(currentInterval)
} else {
@@ -71,7 +98,7 @@
select {
case <-ctx.Done():
after.Stop()
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
@@ -86,7 +113,7 @@
}
}
}
return allErrors
return err
}

// InitialBackoffer makes the initial state for retrying.
@@ -102,21 +129,31 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
if total > 0 && total < base {
total = base
}
<<<<<<< HEAD
return &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
=======
bo := &Backoffer{
base: base,
max: max,
total: total,
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))
next: base,
currentTotal: 0,
attempt: 0,
}
}

// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
if !overwrite && bo.retryableChecker != nil {
return
}
bo.retryableChecker = checker
}

151 changes: 145 additions & 6 deletions client/retry/backoff_test.go
@@ -17,6 +17,7 @@ package retry
import (
"context"
"errors"
"fmt"
"testing"
"time"

@@ -84,26 +85,164 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorContains(err, "test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))

// Test the retryable checker.
// Test the error returned.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(err error) bool {
return execCount < 2
err = bo.Exec(ctx, func() error {
execCount++
return fmt.Errorf("test %d", execCount)
})
re.Error(err)
re.Equal("test 4", err.Error())
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
execCount = 0
err = bo.Exec(ctx, func() error {
if execCount == 1 {
return nil
}
execCount++
return nil
return expectedErr
})
re.Equal(1, execCount)
re.NoError(err)
re.True(isBackofferReset(bo))

// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
<<<<<<< HEAD
bo.SetRetryableChecker(func(err error) bool {
=======
retryableChecker := func(error) bool {
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))
return execCount < 2
}
bo.SetRetryableChecker(retryableChecker, false)
execFunc := func() error {
execCount++
return expectedErr
}
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
// Test the retryable checker with overwrite.
execCount = 0
retryableChecker = func(error) bool {
return execCount < 4
}
bo.SetRetryableChecker(retryableChecker, false)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
execCount = 0
bo.SetRetryableChecker(retryableChecker, true)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
return bo.next == bo.base && bo.currentTotal == 0
}
<<<<<<< HEAD
=======

func TestBackofferWithLog(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true}
lg := newZapTestLogger(conf)
log.ReplaceGlobals(lg.Logger, nil)

bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100))
err := bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms := lg.Messages()
len1 := len(ms)
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

var errTest = errors.New("test")

func testFn() error {
return errTest
}

// testingWriter is a WriteSyncer that records the written messages.
type testingWriter struct {
messages []string
}

func newTestingWriter() *testingWriter {
return &testingWriter{}
}

func (w *testingWriter) Write(p []byte) (n int, err error) {
n = len(p)
p = bytes.TrimRight(p, "\n")
m := string(p)
w.messages = append(w.messages, m)
return n, nil
}
func (*testingWriter) Sync() error {
return nil
}

type verifyLogger struct {
*zap.Logger
w *testingWriter
}

func (logger *verifyLogger) Message() string {
if logger.w.messages == nil {
return ""
}
return logger.w.messages[len(logger.w.messages)-1]
}

func (logger *verifyLogger) Messages() []string {
if logger.w.messages == nil {
return nil
}
return logger.w.messages
}

func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger {
// TestingWriter is used to write to memory.
// Used in the verify logger.
writer := newTestingWriter()
lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...)
return verifyLogger{
Logger: lg,
w: writer,
}
}
>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))
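
For reference, a standalone sketch (not the pd implementation) of the log-throttling arithmetic the test above verifies: with a 10ms base, a 100ms cap, a 1000ms total budget, and a 100ms minimum log interval, the warning fires on retries 4 through 13, that is, ten times.

package main

import (
	"fmt"
	"time"
)

func main() {
	base := 10 * time.Millisecond
	max := 100 * time.Millisecond
	total := 1000 * time.Millisecond
	logInterval := 100 * time.Millisecond

	next := base
	var currentTotal, nextLogTime time.Duration
	for attempt := 1; ; attempt++ {
		interval := next
		// Exponential growth capped at max, mirroring nextInterval in backoff.go.
		next *= 2
		if next > max {
			next = max
		}
		// Accumulate backoff time since the last warning and log once it
		// reaches the minimum log interval, keeping the remainder.
		nextLogTime += interval
		if nextLogTime >= logInterval {
			nextLogTime %= logInterval
			fmt.Printf("would log at retry-time=%d\n", attempt)
		}
		currentTotal += interval
		if currentTotal >= total {
			// 10+20+40+80 + 100*9 = 1050ms >= 1000ms: 13 retries, 10 log lines.
			break
		}
	}
}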