diff --git a/client/go.mod b/client/go.mod index fd76c80da92..bff2f414fce 100644 --- a/client/go.mod +++ b/client/go.mod @@ -16,7 +16,6 @@ require ( github.com/stretchr/testify v1.8.2 go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 google.golang.org/grpc v1.59.0 @@ -34,7 +33,11 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect +<<<<<<< HEAD github.com/stretchr/objx v0.5.0 // indirect +======= + go.uber.org/multierr v1.11.0 // indirect +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/client/http/client.go b/client/http/client.go index 30144ebe2c5..7b34193c2a4 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry( } // Copy a new backoffer for each request. bo := *reqInfo.bo - // Backoffer also needs to check the status code to determine whether to retry. + // Set the retryable checker for the backoffer if it's not set. bo.SetRetryableChecker(func(err error) bool { + // Backoffer also needs to check the status code to determine whether to retry. return err != nil && !noNeedRetry(statusCode) - }) + }, false) return bo.Exec(ctx, execFunc) } diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 6c293098971..2cb11205b7b 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -20,10 +20,27 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" +<<<<<<< HEAD "go.uber.org/multierr" ) const maxRecordErrorCount = 20 +======= + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Option is used to customize the backoffer. +type Option func(*Backoffer) + +// withMinLogInterval sets the minimum log interval for retrying. +// Because the retry interval may be not the factor of log interval, so this is the minimum interval. +func withMinLogInterval(interval time.Duration) Option { + return func(bo *Backoffer) { + bo.logInterval = interval + } +} +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) // Backoffer is a backoff policy for retrying operations. type Backoffer struct { @@ -34,7 +51,7 @@ type Backoffer struct { // total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success. total time.Duration // retryableChecker is used to check if the error is retryable. - // By default, all errors are retryable. + // If it's not set, it will always retry unconditionally no matter what the error is. retryableChecker func(err error) bool attempt int @@ -49,20 +66,30 @@ func (bo *Backoffer) Exec( ) error { defer bo.resetBackoff() var ( +<<<<<<< HEAD allErrors error after *time.Timer +======= + err error + after *time.Timer +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) ) for { err := fn() bo.attempt++ - if bo.attempt < maxRecordErrorCount { - // multierr.Append will ignore nil error. - allErrors = multierr.Append(allErrors, err) - } - if !bo.isRetryable(err) { + if err == nil || !bo.isRetryable(err) { break } currentInterval := bo.nextInterval() +<<<<<<< HEAD +======= + bo.nextLogTime += currentInterval + if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { + bo.nextLogTime %= bo.logInterval + log.Warn("[pd.backoffer] exec fn failed and retrying", + zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) + } +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) if after == nil { after = time.NewTimer(currentInterval) } else { @@ -71,7 +98,7 @@ func (bo *Backoffer) Exec( select { case <-ctx.Done(): after.Stop() - return multierr.Append(allErrors, errors.Trace(ctx.Err())) + return errors.Trace(ctx.Err()) case <-after.C: failpoint.Inject("backOffExecute", func() { testBackOffExecuteFlag = true @@ -86,7 +113,7 @@ func (bo *Backoffer) Exec( } } } - return allErrors + return err } // InitialBackoffer make the initial state for retrying. @@ -102,6 +129,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { if total > 0 && total < base { total = base } +<<<<<<< HEAD return &Backoffer{ base: base, max: max, @@ -109,14 +137,23 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { retryableChecker: func(err error) bool { return err != nil }, +======= + bo := &Backoffer{ + base: base, + max: max, + total: total, +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) next: base, currentTotal: 0, attempt: 0, } } -// SetRetryableChecker sets the retryable checker. -func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) { +// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker. +func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) { + if !overwrite && bo.retryableChecker != nil { + return + } bo.retryableChecker = checker } diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 32a42d083bd..5885b3164c7 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -17,6 +17,7 @@ package retry import ( "context" "errors" + "fmt" "testing" "time" @@ -84,26 +85,164 @@ func TestBackoffer(t *testing.T) { return expectedErr }) re.InDelta(total, time.Since(start), float64(250*time.Millisecond)) - re.ErrorContains(err, "test; test; test; test") + re.ErrorContains(err, "test") re.ErrorIs(err, expectedErr) re.Equal(4, execCount) re.True(isBackofferReset(bo)) - // Test the retryable checker. + // Test the error returned. execCount = 0 - bo = InitialBackoffer(base, max, total) - bo.SetRetryableChecker(func(err error) bool { - return execCount < 2 + err = bo.Exec(ctx, func() error { + execCount++ + return fmt.Errorf("test %d", execCount) }) + re.Error(err) + re.Equal("test 4", err.Error()) + re.Equal(4, execCount) + re.True(isBackofferReset(bo)) + execCount = 0 err = bo.Exec(ctx, func() error { + if execCount == 1 { + return nil + } execCount++ - return nil + return expectedErr }) + re.Equal(1, execCount) re.NoError(err) + re.True(isBackofferReset(bo)) + + // Test the retryable checker. + execCount = 0 + bo = InitialBackoffer(base, max, total) +<<<<<<< HEAD + bo.SetRetryableChecker(func(err error) bool { +======= + retryableChecker := func(error) bool { +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) + return execCount < 2 + } + bo.SetRetryableChecker(retryableChecker, false) + execFunc := func() error { + execCount++ + return expectedErr + } + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) re.Equal(2, execCount) re.True(isBackofferReset(bo)) + // Test the retryable checker with overwrite. + execCount = 0 + retryableChecker = func(error) bool { + return execCount < 4 + } + bo.SetRetryableChecker(retryableChecker, false) + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) + re.Equal(2, execCount) + re.True(isBackofferReset(bo)) + execCount = 0 + bo.SetRetryableChecker(retryableChecker, true) + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) + re.Equal(4, execCount) + re.True(isBackofferReset(bo)) } func isBackofferReset(bo *Backoffer) bool { return bo.next == bo.base && bo.currentTotal == 0 } +<<<<<<< HEAD +======= + +func TestBackofferWithLog(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true} + lg := newZapTestLogger(conf) + log.ReplaceGlobals(lg.Logger, nil) + + bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100)) + err := bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms := lg.Messages() + len1 := len(ms) + // 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times. + re.Len(ms, 10) + // 10 + 20 + 40 + 80 + 100 * 9, 13 times retry. + rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + // 10 + 20 + 40 + 80(log), 4 times retry. + rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]` + re.Contains(ms[0], rfc) + + err = bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms = lg.Messages() + re.Len(ms, 20) + rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]` + re.Contains(ms[len1], rfc) +} + +var errTest = errors.New("test") + +func testFn() error { + return errTest +} + +// testingWriter is a WriteSyncer that writes the the messages. +type testingWriter struct { + messages []string +} + +func newTestingWriter() *testingWriter { + return &testingWriter{} +} + +func (w *testingWriter) Write(p []byte) (n int, err error) { + n = len(p) + p = bytes.TrimRight(p, "\n") + m := string(p) + w.messages = append(w.messages, m) + return n, nil +} +func (*testingWriter) Sync() error { + return nil +} + +type verifyLogger struct { + *zap.Logger + w *testingWriter +} + +func (logger *verifyLogger) Message() string { + if logger.w.messages == nil { + return "" + } + return logger.w.messages[len(logger.w.messages)-1] +} + +func (logger *verifyLogger) Messages() []string { + if logger.w.messages == nil { + return nil + } + return logger.w.messages +} + +func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger { + // TestingWriter is used to write to memory. + // Used in the verify logger. + writer := newTestingWriter() + lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...) + return verifyLogger{ + Logger: lg, + w: writer, + } +} +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 9efbc587847..8abc6a0f07b 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -21,6 +21,7 @@ import ( "net/url" "sort" "strings" + "sync" "testing" "time" @@ -571,14 +572,24 @@ func (suite *httpClientTestSuite) checkSchedulers(mode mode, client pd.Client) { schedulers, err := client.GetSchedulers(env.ctx) re.NoError(err) - re.Empty(schedulers) + const schedulerName = "evict-leader-scheduler" + re.NotContains(schedulers, schedulerName) +<<<<<<< HEAD err = client.CreateScheduler(env.ctx, "evict-leader-scheduler", 1) +======= + err = client.CreateScheduler(ctx, schedulerName, 1) +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) re.NoError(err) schedulers, err = client.GetSchedulers(env.ctx) re.NoError(err) +<<<<<<< HEAD re.Len(schedulers, 1) err = client.SetSchedulerDelay(env.ctx, "evict-leader-scheduler", 100) +======= + re.Contains(schedulers, schedulerName) + err = client.SetSchedulerDelay(ctx, schedulerName, 100) +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227)) re.NoError(err) err = client.SetSchedulerDelay(env.ctx, "not-exist", 100) re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message @@ -811,3 +822,59 @@ func (suite *httpClientTestSuite) checkUpdateKeyspaceGCManagementType(mode mode, re.True(ok) re.Equal(expectGCManagementType, val) } +<<<<<<< HEAD +======= + +func (suite *httpClientTestSuite) TestGetHealthStatus() { + re := suite.Require() + healths, err := suite.client.GetHealthStatus(suite.ctx) + re.NoError(err) + re.Len(healths, 2) + sort.Slice(healths, func(i, j int) bool { + return healths[i].Name < healths[j].Name + }) + re.Equal("pd1", healths[0].Name) + re.Equal("pd2", healths[1].Name) + re.True(healths[0].Health && healths[1].Health) +} + +func (suite *httpClientTestSuite) TestRetryOnLeaderChange() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0) + client := suite.client.WithBackoffer(bo) + for { + healths, err := client.GetHealthStatus(ctx) + if err != nil && strings.Contains(err.Error(), "context canceled") { + return + } + re.NoError(err) + re.Len(healths, 2) + select { + case <-ctx.Done(): + return + default: + } + } + }() + + leader := suite.cluster.GetLeaderServer() + re.NotNil(leader) + for i := 0; i < 3; i++ { + leader.ResignLeader() + re.NotEmpty(suite.cluster.WaitLeader()) + leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + } + + // Cancel the context to stop the goroutine. + cancel() + wg.Wait() +} +>>>>>>> 199b01792 (client/retry: only return the latest error in backoffer (#8227))