From f780e41c0e6e392eac54768a73ec7e215f34d9fd Mon Sep 17 00:00:00 2001
From: artem_danilov
Date: Tue, 8 Oct 2024 09:33:25 -0700
Subject: [PATCH 1/3] add retry limiter to backoff function

Signed-off-by: artem_danilov
---
 config/retry/backoff.go      | 13 +++++++
 config/retry/backoff_test.go | 18 ++++++++++
 config/retry/config.go       | 67 +++++++++++++++++++++++++++++++-----
 3 files changed, 90 insertions(+), 8 deletions(-)

diff --git a/config/retry/backoff.go b/config/retry/backoff.go
index c18577ad0c..a874bb71ae 100644
--- a/config/retry/backoff.go
+++ b/config/retry/backoff.go
@@ -233,9 +233,22 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 		zap.Int("maxSleep", b.maxSleep),
 		zap.Stringer("type", cfg),
 		zap.Reflect("txnStartTS", startTs))
+
+	// fail fast if we don't have enough retry tokens
+	if cfg.retryRateLimiter != nil && !cfg.retryRateLimiter.takeRetryToken() {
+		logutil.Logger(b.ctx).Warn(fmt.Sprintf("Retry limit for %s is exhausted", cfg.name))
+		return errors.WithStack(err)
+	}
+
 	return nil
 }
 
+func (b *Backoffer) OnSuccess(cfg *Config) {
+	if cfg.retryRateLimiter != nil {
+		cfg.retryRateLimiter.addRetryToken()
+	}
+}
+
 func (b *Backoffer) String() string {
 	if b.totalSleep == 0 {
 		return ""
diff --git a/config/retry/backoff_test.go b/config/retry/backoff_test.go
index 63af9d4ceb..d5bef27740 100644
--- a/config/retry/backoff_test.go
+++ b/config/retry/backoff_test.go
@@ -83,6 +83,24 @@ func TestBackoffErrorType(t *testing.T) {
 	assert.Fail(t, "should not be here")
 }
 
+func TestRetryLimit(t *testing.T) {
+	globalConfig := NewConfigWithRetryLimit("TestConfig", nil, NewBackoffFnCfg(1, 1000, NoJitter), NewRetryRateLimiter(1, 1), errors.New("test error"))
+	b := NewBackofferWithVars(context.TODO(), 100, nil)
+	// test we start with retry limit at cap (1 in this test)
+	assert.Nil(t, b.Backoff(globalConfig, errors.New("test error")))
+	// test retry limit hit
+	assert.NotNil(t, b.Backoff(globalConfig, errors.New("test error")))
+	// test the limit is global across different Backoffer instances
+	b2 := NewBackofferWithVars(context.TODO(), 100, nil)
+	assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
+	// test the limit is refilled only up to the cap by adding 2 tokens
+	b.OnSuccess(globalConfig)
+	b.OnSuccess(globalConfig)
+	// test we have only one token due to the cap
+	assert.Nil(t, b2.Backoff(globalConfig, errors.New("test error")))
+	assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
+}
+
 func TestBackoffDeepCopy(t *testing.T) {
 	var err error
 	b := NewBackofferWithVars(context.TODO(), 4, nil)
diff --git a/config/retry/config.go b/config/retry/config.go
index 19ac34b688..889a19a03d 100644
--- a/config/retry/config.go
+++ b/config/retry/config.go
@@ -39,6 +39,7 @@ import (
 	"math"
 	"math/rand"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -52,10 +53,11 @@ import (
 
 // Config is the configuration of the Backoff function.
 type Config struct {
-	name   string
-	metric *prometheus.Observer
-	fnCfg  *BackoffFnCfg
-	err    error
+	name             string
+	metric           *prometheus.Observer
+	fnCfg            *BackoffFnCfg
+	retryRateLimiter *RetryRateLimiter
+	err              error
 }
 
 // backoffFn is the backoff function which compute the sleep time and do sleep.
@@ -96,6 +98,54 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
 	}
 }
 
+// RetryRateLimiter is used to limit the number of retries
+type RetryRateLimiter struct {
+	// tokenCount represents number of available tokens for retry
+	tokenCount int32
+	// allowedRetryToSuccessRatio is a ratio representing number of retries allowed for each success
+	allowedRetryToSuccessRatio float32
+	// cap limits the number of retry tokens which can be accumulated over time
+	cap int32
+}
+
+func NewRetryRateLimiter(cap int32, ratio float32) *RetryRateLimiter {
+	return &RetryRateLimiter{
+		cap, // always init with full bucket to allow retries on startup
+		ratio,
+		cap,
+	}
+}
+
+// add a token to the rate limiter bucket according to configured retry to success ratio and cap
+func (r *RetryRateLimiter) addRetryToken() {
+	if rand.Float32() < r.allowedRetryToSuccessRatio {
+		if atomic.LoadInt32(&r.tokenCount) < r.cap {
+			// it is ok to add more than the cap, because the cap is the soft limit
+			atomic.AddInt32(&r.tokenCount, 1)
+		}
+	}
+}
+
+// return true if there is a token to retry, false otherwise
+func (r *RetryRateLimiter) takeRetryToken() bool {
+	if atomic.LoadInt32(&r.tokenCount) > 0 {
+		// it is ok to go below 0, because consumed token will still match added one at the end
+		atomic.AddInt32(&r.tokenCount, -1)
+		return true
+	}
+	return false
+}
+
+func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
+	return &Config{
+		name:             name,
+		metric:           metric,
+		fnCfg:            backoffFnCfg,
+		retryRateLimiter: retryRateLimiter,
+		err:              err,
+	}
+}
+
 // Base returns the base time of the backoff function.
 func (c *Config) Base() int {
 	return c.fnCfg.base
@@ -119,10 +169,11 @@ const txnLockFastName = "txnLockFast"
 // Backoff Config variables.
 var (
 	// TODO: distinguish tikv and tiflash in metrics
-	BoTiKVRPC    = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
-	BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
-	BoTxnLock    = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
-	BoPDRPC      = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
+	BoTiKVRPC          = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
+	BoTiFlashRPC       = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
+	BoTxnLock          = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
+	BoPDRPC            = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
+	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 0.1), tikverr.NewErrPDServerTimeout(""))
 	// change base time to 2ms, because it may recover soon.
 	BoRegionMiss       = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
 	BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)

From a1b533943d5b03a5183711ecdbe8a39aae1416ec Mon Sep 17 00:00:00 2001
From: artem_danilov
Date: Wed, 23 Oct 2024 21:04:09 -0700
Subject: [PATCH 2/3] change ratio to int, document and more tests

Signed-off-by: artem_danilov
---
 config/retry/config.go | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/config/retry/config.go b/config/retry/config.go
index 889a19a03d..59a4c5fe50 100644
--- a/config/retry/config.go
+++ b/config/retry/config.go
@@ -102,23 +102,26 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
 type RetryRateLimiter struct {
 	// tokenCount represents number of available tokens for retry
 	tokenCount int32
-	// allowedRetryToSuccessRatio is a ratio representing number of retries allowed for each success
-	allowedRetryToSuccessRatio float32
+	// successForRetryCount represents how many successful requests are needed to allow a single retry
+	successForRetryCount int
 	// cap limits the number of retry tokens which can be accumulated over time
 	cap int32
 }
 
-func NewRetryRateLimiter(cap int32, ratio float32) *RetryRateLimiter {
+// NewRetryRateLimiter creates a new RetryRateLimiter
+// cap: the maximum number of retry tokens that can be accumulated over time. The bucket starts full.
+// successForRetryCount: how many successful requests are needed to allow a single retry. E.g. if you want to allow a single retry per 10 calls, set it to 10.
+func NewRetryRateLimiter(cap int32, successForRetryCount int) *RetryRateLimiter {
 	return &RetryRateLimiter{
-		cap, // always init with full bucket to allow retries on startup
-		ratio,
+		cap,
+		successForRetryCount,
 		cap,
 	}
 }
 
-// add a token to the rate limiter bucket according to configured retry to success ratio and cap
+// addRetryToken adds a token to the rate limiter bucket according to the configured retry-to-success ratio and the cap
 func (r *RetryRateLimiter) addRetryToken() {
-	if rand.Float32() < r.allowedRetryToSuccessRatio {
+	if rand.Intn(r.successForRetryCount) == 0 {
 		if atomic.LoadInt32(&r.tokenCount) < r.cap {
 			// it is ok to add more than the cap, because the cap is the soft limit
 			atomic.AddInt32(&r.tokenCount, 1)
@@ -126,7 +129,7 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
 	}
 }
 
-// return true if there is a token to retry, false otherwise
+// takeRetryToken returns true if there is a token to retry, false otherwise
 func (r *RetryRateLimiter) takeRetryToken() bool {
 	if atomic.LoadInt32(&r.tokenCount) > 0 {
 		// it is ok to go below 0, because consumed token will still match added one at the end
@@ -136,6 +139,7 @@ func (r *RetryRateLimiter) takeRetryToken() bool {
 	return false
 }
 
+// NewConfigWithRetryLimit creates a new Config for the Backoff operation with a retry limit.
 func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
 	return &Config{
 		name:             name,
 		metric:           metric,
@@ -173,7 +177,7 @@ var (
 	BoTiFlashRPC       = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
 	BoTxnLock          = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
 	BoPDRPC            = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
-	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 0.1), tikverr.NewErrPDServerTimeout(""))
+	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 10), tikverr.NewErrPDServerTimeout(""))
 	// change base time to 2ms, because it may recover soon.
 	BoRegionMiss       = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
 	BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)

From 3a70f7ecbc23aa91ba3ae5213b5046a0c49ac58b Mon Sep 17 00:00:00 2001
From: artem_danilov
Date: Thu, 24 Oct 2024 19:47:12 -0700
Subject: [PATCH 3/3] start using the retry limiter

Signed-off-by: artem_danilov
---
 config/retry/config.go          |  2 +-
 internal/locate/region_cache.go | 12 ++++++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/config/retry/config.go b/config/retry/config.go
index 59a4c5fe50..7431ad8f9f 100644
--- a/config/retry/config.go
+++ b/config/retry/config.go
@@ -177,7 +177,7 @@ var (
 	BoTiFlashRPC       = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
 	BoTxnLock          = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
 	BoPDRPC            = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
-	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 10), tikverr.NewErrPDServerTimeout(""))
+	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(100, 1), tikverr.NewErrPDServerTimeout(""))
 	// change base time to 2ms, because it may recover soon.
 	BoRegionMiss       = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
 	BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 01dfa385ca..cc9d54e242 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -2059,7 +2059,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
 	opts = append(opts, pd.WithBuckets())
 	for {
 		if backoffErr != nil {
-			err := bo.Backoff(retry.BoPDRPC, backoffErr)
+			err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr)
 			if err != nil {
 				return nil, errors.WithStack(err)
 			}
@@ -2077,6 +2077,10 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
 			metrics.RegionCacheCounterWithGetCacheMissError.Inc()
 		} else {
 			metrics.RegionCacheCounterWithGetCacheMissOK.Inc()
+			if backoffErr == nil {
+				// refill retry allowance only for the original call
+				bo.OnSuccess(retry.BoPDRegionMetadata)
+			}
 		}
 		if err != nil {
 			if apicodec.IsDecodeError(err) {
@@ -2112,7 +2116,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
 	var backoffErr error
 	for {
 		if backoffErr != nil {
-			err := bo.Backoff(retry.BoPDRPC, backoffErr)
+			err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr)
 			if err != nil {
 				return nil, errors.WithStack(err)
 			}
@@ -2124,6 +2128,10 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
 			metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
 		} else {
 			metrics.RegionCacheCounterWithGetRegionByIDOK.Inc()
+			if backoffErr == nil {
+				// refill retry allowance only for the original call
+				bo.OnSuccess(retry.BoPDRegionMetadata)
+			}
 		}
 		if err != nil {
 			if apicodec.IsDecodeError(err) {
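
For reviewers, a minimal usage sketch (not part of the patches) of how a caller is expected to combine the new APIs: every retry first has to take a token via Backoff on the rate-limited BoPDRegionMetadata config, and only a success on the original (non-retried) attempt puts a token back via OnSuccess. The loadWithRetryLimit helper and its lookup parameter below are hypothetical; only the retry package identifiers come from the patches above.

package example

import (
	"github.com/tikv/client-go/v2/config/retry"
	pd "github.com/tikv/pd/client"
)

// loadWithRetryLimit keeps retrying lookup until it succeeds or the shared
// retry token bucket behind BoPDRegionMetadata is exhausted.
func loadWithRetryLimit(bo *retry.Backoffer, lookup func() (*pd.Region, error)) (*pd.Region, error) {
	var backoffErr error
	for {
		if backoffErr != nil {
			// Sleeps with backoff; fails fast with the wrapped error
			// once the shared token bucket is empty.
			if err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr); err != nil {
				return nil, err
			}
		}
		reg, err := lookup()
		if err == nil {
			if backoffErr == nil {
				// Refill the retry allowance only for the original call,
				// mirroring the region_cache.go changes in patch 3.
				bo.OnSuccess(retry.BoPDRegionMetadata)
			}
			return reg, nil
		}
		backoffErr = err
	}
}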