From a0cc33b02164f92188f40259003435138920b822 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 31 May 2024 16:29:52 +0800 Subject: [PATCH] This is an automated cherry-pick of #8227 Signed-off-by: JmPotato --- client/go.mod | 2 +- client/http/client.go | 5 +- client/retry/backoff.go | 76 ++++++++++++------ client/retry/backoff_test.go | 147 +++++++++++++++++++++++++++++++++-- 4 files changed, 198 insertions(+), 32 deletions(-) diff --git a/client/go.mod b/client/go.mod index fd76c80da92..70b4c0261e6 100644 --- a/client/go.mod +++ b/client/go.mod @@ -16,7 +16,6 @@ require ( github.com/stretchr/testify v1.8.2 go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 google.golang.org/grpc v1.59.0 @@ -35,6 +34,7 @@ require ( github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/stretchr/objx v0.5.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/client/http/client.go b/client/http/client.go index a282904bb91..123ca616422 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -172,10 +172,11 @@ func (ci *clientInner) requestWithRetry( } // Copy a new backoffer for each request. bo := *reqInfo.bo - // Backoffer also needs to check the status code to determine whether to retry. + // Set the retryable checker for the backoffer if it's not set. bo.SetRetryableChecker(func(err error) bool { + // Backoffer also needs to check the status code to determine whether to retry. return err != nil && !noNeedRetry(statusCode) - }) + }, false) return bo.Exec(ctx, execFunc) } diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 6c293098971..9161ad0fea1 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -16,14 +16,27 @@ package retry import ( "context" + "reflect" + "runtime" + "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "go.uber.org/multierr" + "github.com/pingcap/log" + "go.uber.org/zap" ) -const maxRecordErrorCount = 20 +// Option is used to customize the backoffer. +type Option func(*Backoffer) + +// withMinLogInterval sets the minimum log interval for retrying. +// Because the retry interval may be not the factor of log interval, so this is the minimum interval. +func withMinLogInterval(interval time.Duration) Option { + return func(bo *Backoffer) { + bo.logInterval = interval + } +} // Backoffer is a backoff policy for retrying operations. type Backoffer struct { @@ -34,8 +47,12 @@ type Backoffer struct { // total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success. total time.Duration // retryableChecker is used to check if the error is retryable. - // By default, all errors are retryable. + // If it's not set, it will always retry unconditionally no matter what the error is. retryableChecker func(err error) bool + // logInterval defines the log interval for retrying. + logInterval time.Duration + // nextLogTime is used to record the next log time. 
+ nextLogTime time.Duration attempt int next time.Duration @@ -49,20 +66,23 @@ func (bo *Backoffer) Exec( ) error { defer bo.resetBackoff() var ( - allErrors error - after *time.Timer + err error + after *time.Timer ) + fnName := getFunctionName(fn) for { - err := fn() + err = fn() bo.attempt++ - if bo.attempt < maxRecordErrorCount { - // multierr.Append will ignore nil error. - allErrors = multierr.Append(allErrors, err) - } - if !bo.isRetryable(err) { + if err == nil || !bo.isRetryable(err) { break } currentInterval := bo.nextInterval() + bo.nextLogTime += currentInterval + if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { + bo.nextLogTime %= bo.logInterval + log.Warn("[pd.backoffer] exec fn failed and retrying", + zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) + } if after == nil { after = time.NewTimer(currentInterval) } else { @@ -71,7 +91,7 @@ func (bo *Backoffer) Exec( select { case <-ctx.Done(): after.Stop() - return multierr.Append(allErrors, errors.Trace(ctx.Err())) + return errors.Trace(ctx.Err()) case <-after.C: failpoint.Inject("backOffExecute", func() { testBackOffExecuteFlag = true @@ -86,14 +106,14 @@ func (bo *Backoffer) Exec( } } } - return allErrors + return err } // InitialBackoffer make the initial state for retrying. // - `base` defines the initial time interval to wait before each retry. // - `max` defines the max time interval to wait before each retry. // - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success. -func InitialBackoffer(base, max, total time.Duration) *Backoffer { +func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer { // Make sure the base is less than or equal to the max. if base > max { base = max @@ -102,21 +122,25 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { if total > 0 && total < base { total = base } - return &Backoffer{ - base: base, - max: max, - total: total, - retryableChecker: func(err error) bool { - return err != nil - }, + bo := &Backoffer{ + base: base, + max: max, + total: total, next: base, currentTotal: 0, attempt: 0, } + for _, opt := range opts { + opt(bo) + } + return bo } -// SetRetryableChecker sets the retryable checker. -func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) { +// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker. +func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) { + if !overwrite && bo.retryableChecker != nil { + return + } bo.retryableChecker = checker } @@ -152,6 +176,7 @@ func (bo *Backoffer) resetBackoff() { bo.next = bo.base bo.currentTotal = 0 bo.attempt = 0 + bo.nextLogTime = 0 } // Only used for test. 
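Note (not part of the patch): a minimal usage sketch of the reworked backoffer API introduced above, added here only for illustration. It assumes the snippet sits inside the retry package itself (so the unexported withMinLogInterval option is reachable) and that context and time are imported; exampleUsage, the durations, and the checker are made-up placeholders, not code from this change.

func exampleUsage(ctx context.Context) error {
	// Start at 100ms, back off exponentially up to 1s per attempt, and stop
	// retrying once roughly 10s have been spent in total; emit the retry
	// warning log at most about once per second.
	bo := InitialBackoffer(100*time.Millisecond, time.Second, 10*time.Second,
		withMinLogInterval(time.Second))
	// Install a checker only if none has been set yet (overwrite = false),
	// which mirrors how requestWithRetry in client/http/client.go uses it
	// above so that a caller-provided checker is not clobbered.
	bo.SetRetryableChecker(func(err error) bool {
		return err != nil
	}, false)
	// Exec now returns the error of the last attempt instead of a multierr
	// aggregation of every attempt.
	return bo.Exec(ctx, func() error {
		// ... the operation to retry goes here ...
		return nil
	})
}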
@@ -161,3 +186,8 @@ var testBackOffExecuteFlag = false func TestBackOffExecute() bool { return testBackOffExecuteFlag } + +func getFunctionName(f any) string { + strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".") + return strings.Split(strs[len(strs)-1], "-")[0] +} diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 32a42d083bd..22d487b1885 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -15,12 +15,16 @@ package retry import ( + "bytes" "context" "errors" + "fmt" "testing" "time" + "github.com/pingcap/log" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestBackoffer(t *testing.T) { @@ -84,26 +88,157 @@ func TestBackoffer(t *testing.T) { return expectedErr }) re.InDelta(total, time.Since(start), float64(250*time.Millisecond)) - re.ErrorContains(err, "test; test; test; test") + re.ErrorContains(err, "test") re.ErrorIs(err, expectedErr) re.Equal(4, execCount) re.True(isBackofferReset(bo)) - // Test the retryable checker. + // Test the error returned. execCount = 0 - bo = InitialBackoffer(base, max, total) - bo.SetRetryableChecker(func(err error) bool { - return execCount < 2 + err = bo.Exec(ctx, func() error { + execCount++ + return fmt.Errorf("test %d", execCount) }) + re.Error(err) + re.Equal("test 4", err.Error()) + re.Equal(4, execCount) + re.True(isBackofferReset(bo)) + execCount = 0 err = bo.Exec(ctx, func() error { + if execCount == 1 { + return nil + } execCount++ - return nil + return expectedErr }) + re.Equal(1, execCount) re.NoError(err) + re.True(isBackofferReset(bo)) + + // Test the retryable checker. + execCount = 0 + bo = InitialBackoffer(base, max, total) + retryableChecker := func(error) bool { + return execCount < 2 + } + bo.SetRetryableChecker(retryableChecker, false) + execFunc := func() error { + execCount++ + return expectedErr + } + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) + re.Equal(2, execCount) + re.True(isBackofferReset(bo)) + // Test the retryable checker with overwrite. + execCount = 0 + retryableChecker = func(error) bool { + return execCount < 4 + } + bo.SetRetryableChecker(retryableChecker, false) + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) re.Equal(2, execCount) re.True(isBackofferReset(bo)) + execCount = 0 + bo.SetRetryableChecker(retryableChecker, true) + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) + re.Equal(4, execCount) + re.True(isBackofferReset(bo)) } func isBackofferReset(bo *Backoffer) bool { return bo.next == bo.base && bo.currentTotal == 0 } + +func TestBackofferWithLog(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true} + lg := newZapTestLogger(conf) + log.ReplaceGlobals(lg.Logger, nil) + + bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100)) + err := bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms := lg.Messages() + len1 := len(ms) + // 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times. + re.Len(ms, 10) + // 10 + 20 + 40 + 80 + 100 * 9, 13 times retry. + rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + // 10 + 20 + 40 + 80(log), 4 times retry. 
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
+	re.Contains(ms[0], rfc)
+
+	err = bo.Exec(ctx, testFn)
+	re.ErrorIs(err, errTest)
+
+	ms = lg.Messages()
+	re.Len(ms, 20)
+	rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
+	re.Contains(ms[len(ms)-1], rfc)
+	rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
+	re.Contains(ms[len1], rfc)
+}
+
+var errTest = errors.New("test")
+
+func testFn() error {
+	return errTest
+}
+
+// testingWriter is a WriteSyncer that records the messages written to it.
+type testingWriter struct {
+	messages []string
+}
+
+func newTestingWriter() *testingWriter {
+	return &testingWriter{}
+}
+
+func (w *testingWriter) Write(p []byte) (n int, err error) {
+	n = len(p)
+	p = bytes.TrimRight(p, "\n")
+	m := string(p)
+	w.messages = append(w.messages, m)
+	return n, nil
+}
+func (*testingWriter) Sync() error {
+	return nil
+}
+
+type verifyLogger struct {
+	*zap.Logger
+	w *testingWriter
+}
+
+func (logger *verifyLogger) Message() string {
+	if logger.w.messages == nil {
+		return ""
+	}
+	return logger.w.messages[len(logger.w.messages)-1]
+}
+
+func (logger *verifyLogger) Messages() []string {
+	if logger.w.messages == nil {
+		return nil
+	}
+	return logger.w.messages
+}
+
+func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger {
+	// testingWriter is used to write to memory.
+	// Used in the verify logger.
+	writer := newTestingWriter()
+	lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...)
+	return verifyLogger{
+		Logger: lg,
+		w:      writer,
+	}
+}
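Note (not part of the patch): a small illustrative sketch of what getFunctionName is expected to yield for the testFn helper above, which is where the [fn-name=testFn] field asserted in TestBackofferWithLog comes from. It assumes the example sits next to the test code in the retry package and reuses the fmt import already present there.

func Example_getFunctionName() {
	// The runtime reports a fully qualified name such as
	// "<module path>/retry.testFn"; getFunctionName keeps the last
	// dot-separated segment and trims any "-fm" style suffix that method
	// values carry, so a plain top-level function comes back unchanged.
	fmt.Println(getFunctionName(testFn))
	// Output: testFn
}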