Only return the latest error in backoffer
Signed-off-by: JmPotato <[email protected]>
JmPotato committed May 30, 2024
1 parent 52389b0 commit ce72132
Showing 5 changed files with 115 additions and 36 deletions.
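For orientation, a minimal sketch of the behavioral change (not part of this diff; it assumes the client module's `github.com/tikv/pd/client/retry` import path and the `InitialBackoffer`/`Exec` signatures shown below): `Exec` now returns only the error from the final attempt instead of a `multierr`-aggregated list of every attempt's error.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	// Wait 100ms between attempts, capped at 1s, for at most 1s in total.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, time.Second)
	attempt := 0
	err := bo.Exec(context.Background(), func() error {
		attempt++
		return fmt.Errorf("attempt %d failed", attempt)
	})
	// Previously err aggregated every attempt's error via multierr;
	// with this commit it is only the error from the final attempt.
	fmt.Println(err)
}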
2 changes: 1 addition & 1 deletion client/go.mod
@@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.62.1
@@ -34,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
5 changes: 3 additions & 2 deletions client/http/client.go
@@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}
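
As a usage note for the change above, here is a hedged sketch of the caller side. The helper name, addresses, and the `pdhttp.NewClient(source, addrs)` signature are illustrative assumptions; `WithBackoffer` and `SetRetryableChecker` come from this repository. Because `requestWithRetry` now passes `overwrite` as false, a checker installed by the caller is kept, and the status-code-based default is only installed when no checker is set.

package main

import (
	"context"
	"errors"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/retry"
)

// newClientWithCustomRetry is a hypothetical helper showing the caller side.
func newClientWithCustomRetry(addrs []string) pdhttp.Client {
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 3*time.Second)
	// Caller-supplied policy: stop retrying once the context has been canceled.
	bo.SetRetryableChecker(func(err error) bool {
		return err != nil && !errors.Is(err, context.Canceled)
	}, true)
	// requestWithRetry passes overwrite as false, so this checker is kept
	// instead of being replaced by the status-code-based default.
	return pdhttp.NewClient("example-source", addrs).WithBackoffer(bo)
}

func main() {
	client := newClientWithCustomRetry([]string{"http://127.0.0.1:2379"})
	defer client.Close()
}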

40 changes: 18 additions & 22 deletions client/retry/backoff.go
@@ -24,12 +24,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

@@ -50,7 +47,7 @@ type Backoffer struct {
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
// If it's not set, it will use `defaultRetryableChecker` to retry on all non-nil errors.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
@@ -69,18 +66,13 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
break
}
@@ -100,7 +92,7 @@
select {
case <-ctx.Done():
after.Stop()
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
@@ -115,7 +107,7 @@
}
}
}
return allErrors
return err
}

// InitialBackoffer makes the initial state for retrying.
@@ -132,12 +124,9 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
total = base
}
bo := &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
base: base,
max: max,
total: total,
next: base,
currentTotal: 0,
attempt: 0,
@@ -148,18 +137,25 @@
return bo
}

// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
// SetRetryableChecker sets the retryable checker. The `overwrite` flag indicates whether to overwrite the existing checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
if !overwrite && bo.retryableChecker != nil {
return
}
bo.retryableChecker = checker
}

func (bo *Backoffer) isRetryable(err error) bool {
if bo.retryableChecker == nil {
return true
return defaultRetryableChecker(err)
}
return bo.retryableChecker(err)
}

func defaultRetryableChecker(err error) bool {
return err != nil
}

// nextInterval for now use the `exponentialInterval`.
func (bo *Backoffer) nextInterval() time.Duration {
return bo.exponentialInterval()
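A compact sketch of the `overwrite` semantics added in this hunk (it condenses what the test below exercises; the durations are arbitrary): with no checker installed, any call sets one; a later call with `overwrite` false is ignored; `overwrite` true replaces the existing checker.

package main

import (
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	bo := retry.InitialBackoffer(time.Millisecond, 10*time.Millisecond, 50*time.Millisecond)

	// No checker is installed yet, so this call sets one even with overwrite = false.
	bo.SetRetryableChecker(func(err error) bool { return false }, false)

	// A checker already exists and overwrite is false, so this call is a no-op.
	bo.SetRetryableChecker(func(err error) bool { return err != nil }, false)

	// overwrite is true, so this call replaces the existing checker.
	bo.SetRetryableChecker(func(err error) bool { return err != nil }, true)
}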
54 changes: 47 additions & 7 deletions client/retry/backoff_test.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"testing"
"time"

@@ -87,24 +88,64 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorContains(err, "test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))

// Test the retryable checker.
// Test the error returned.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(error) bool {
return execCount < 2
err = bo.Exec(ctx, func() error {
execCount++
return fmt.Errorf("test %d", execCount)
})
re.Error(err)
re.Equal("test 4", err.Error())
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
execCount = 0
err = bo.Exec(ctx, func() error {
if execCount == 1 {
return nil
}
execCount++
return nil
return expectedErr
})
re.Equal(1, execCount)
re.NoError(err)
re.True(isBackofferReset(bo))

// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
retryableChecker := func(error) bool {
return execCount < 2
}
bo.SetRetryableChecker(retryableChecker, false)
execFunc := func() error {
execCount++
return expectedErr
}
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
// Test the retryable checker with overwrite.
execCount = 0
retryableChecker = func(error) bool {
return execCount < 4
}
bo.SetRetryableChecker(retryableChecker, false)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
execCount = 0
bo.SetRetryableChecker(retryableChecker, true)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
@@ -135,7 +176,6 @@ func TestBackofferWithLog(t *testing.T) {
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

bo.resetBackoff()
err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

50 changes: 46 additions & 4 deletions tests/integrations/client/http_client_test.go
@@ -21,6 +21,7 @@ import (
"net/url"
"sort"
"strings"
"sync"
"testing"
"time"

@@ -531,14 +532,15 @@ func (suite *httpClientTestSuite) TestSchedulers() {
defer cancel()
schedulers, err := client.GetSchedulers(ctx)
re.NoError(err)
re.Empty(schedulers)
const schedulerName = "evict-leader-scheduler"
re.NotContains(schedulers, schedulerName)

err = client.CreateScheduler(ctx, "evict-leader-scheduler", 1)
err = client.CreateScheduler(ctx, schedulerName, 1)
re.NoError(err)
schedulers, err = client.GetSchedulers(ctx)
re.NoError(err)
re.Len(schedulers, 1)
err = client.SetSchedulerDelay(ctx, "evict-leader-scheduler", 100)
re.Contains(schedulers, schedulerName)
err = client.SetSchedulerDelay(ctx, schedulerName, 100)
re.NoError(err)
err = client.SetSchedulerDelay(ctx, "not-exist", 100)
re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message
@@ -757,3 +759,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
re.Equal("pd2", healths[1].Name)
re.True(healths[0].Health && healths[1].Health)
}

func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
client := suite.client.WithBackoffer(bo)
for {
healths, err := client.GetHealthStatus(ctx)
if err != nil && strings.Contains(err.Error(), "context canceled") {
return
}
re.NoError(err)
re.Len(healths, 2)
select {
case <-ctx.Done():
return
default:
}
}
}()

leader := suite.cluster.GetLeaderServer()
re.NotNil(leader)
for i := 0; i < 3; i++ {
leader.ResignLeader()
re.NotEmpty(suite.cluster.WaitLeader())
leader = suite.cluster.GetLeaderServer()
re.NotNil(leader)
}

// Cancel the context to stop the goroutine.
cancel()
wg.Wait()
}
