From 9f5522ed674959f4e551bf38b8cbd43cc80c91c9 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Wed, 24 Jul 2024 01:32:49 +0800
Subject: [PATCH 1/4] add test for limiter

Signed-off-by: nolouch
---
 .../resource_group/controller/limiter_test.go | 80 +++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go
index 15364989cd7..7ff6b316f1b 100644
--- a/client/resource_group/controller/limiter_test.go
+++ b/client/resource_group/controller/limiter_test.go
@@ -20,8 +20,10 @@ package controller
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -212,3 +214,81 @@ func TestCancelErrorOfReservation(t *testing.T) {
 	re.Error(err)
 	re.Contains(err.Error(), "context canceled")
 }
+
+func TestQPS(t *testing.T) {
+	re := require.New(t)
+	cases := []struct {
+		concurrency int
+		reserveN    int64
+		RU_PER_SEC  int64
+	}{
+		{10000, 10, 400000},
+	}
+
+	for _, tc := range cases {
+		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.RU_PER_SEC), func(t *testing.T) {
+			qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.RU_PER_SEC)
+			t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
+			re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)-ruSec), float64(1))
+			re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)/float64(tc.reserveN)-qps), float64(1))
+		})
+	}
+}
+
+const testCaseRunTime = 3 * time.Second
+
+func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
+	nc := make(chan notifyMsg, 1)
+	lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
+	ctx, cancel := context.WithCancel(context.Background())
+	// defer cancel()
+
+	var wg sync.WaitGroup
+	var totalRequests int64
+	start := time.Now()
+
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				r := lim.Reserve(context.Background(), 30*time.Second, time.Now(), float64(reserveN))
+				if r.OK() {
+					delay := r.DelayFrom(time.Now())
+					<-time.After(delay)
+				} else {
+					panic("r not ok")
+				}
+				atomic.AddInt64(&totalRequests, 1)
+			}
+		}()
+	}
+	qps := float64(0)
+	var wait time.Duration
+	ch := make(chan struct{})
+	go func() {
+		var windowRequests int64
+		for {
+			elapsed := time.Since(start)
+			if elapsed >= testCaseRunTime {
+				close(ch)
+				break
+			}
+			windowRequests = atomic.SwapInt64(&totalRequests, 0)
+			qps = float64(windowRequests)
+			r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
+			fmt.Printf("%s: QPS: %.2f, RU: %.2f, new request need wait %s\n", time.Now(), qps, qps*float64(reserveN), wait)
+			wait = r.Delay()
+			time.Sleep(1 * time.Second)
+		}
+	}()
+	<-ch
+	cancel()
+	wg.Wait()
+	return qps, qps * float64(reserveN), wait
+}

From d3ab23b944c061dcd91c29c1ee2739db3dec5099 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Wed, 24 Jul 2024 16:11:59 +0800
Subject: [PATCH 2/4] controller: fix limiter cannot work well in high
 concurrency scenario

Signed-off-by: nolouch
---
 client/resource_group/controller/limiter.go      |  8 +++++--
 .../resource_group/controller/limiter_test.go    | 23 +++++++++----------
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go
index 985b03761fd..3349c096415 100644
--- a/client/resource_group/controller/limiter.go
+++ b/client/resource_group/controller/limiter.go
@@ -406,7 +406,9 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
 	}
 	// Update state
 	if ok {
-		lim.last = now
+		if lim.last.Before(now) {
+			lim.last = now
+		}
 		lim.tokens = tokens
 		lim.maybeNotify()
 	} else {
@@ -424,7 +426,9 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
 			zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
 			zap.String("name", lim.name))
 	}
-	lim.last = last
+	if lim.last.Before(now) {
+		lim.last = last
+	}
 	if lim.limit == 0 {
 		lim.notify()
 	} else if lim.remainingNotifyTimes > 0 {
diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go
index 7ff6b316f1b..22f0b96a474 100644
--- a/client/resource_group/controller/limiter_test.go
+++ b/client/resource_group/controller/limiter_test.go
@@ -220,28 +220,27 @@ func TestQPS(t *testing.T) {
 	cases := []struct {
 		concurrency int
 		reserveN    int64
-		RU_PER_SEC  int64
+		ruPerSec    int64
 	}{
-		{10000, 10, 400000},
+		{1000, 10, 400000},
 	}
 
 	for _, tc := range cases {
-		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.RU_PER_SEC), func(t *testing.T) {
-			qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.RU_PER_SEC)
+		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.ruPerSec), func(t *testing.T) {
+			qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.ruPerSec)
 			t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
-			re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)-ruSec), float64(1))
-			re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)/float64(tc.reserveN)-qps), float64(1))
+			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)-ruSec), float64(10)*float64(tc.reserveN))
+			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)/float64(tc.reserveN)-qps), float64(10))
 		})
 	}
 }
 
-const testCaseRunTime = 3 * time.Second
+const testCaseRunTime = 4 * time.Second
 
-func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
+func testQPSCase(concurrency int, reserveN int64, limit int64) (qps float64, ru float64, needWait time.Duration) {
 	nc := make(chan notifyMsg, 1)
 	lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
 	ctx, cancel := context.WithCancel(context.Background())
-	// defer cancel()
 
 	var wg sync.WaitGroup
 	var totalRequests int64
@@ -268,7 +267,7 @@ func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64
 			}
 		}()
 	}
-	qps := float64(0)
+	var vQPS atomic.Value
 	var wait time.Duration
 	ch := make(chan struct{})
 	go func() {
 		var windowRequests int64
@@ -280,9 +279,8 @@ func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64
 				break
 			}
 			windowRequests = atomic.SwapInt64(&totalRequests, 0)
-			qps = float64(windowRequests)
+			vQPS.Store(float64(windowRequests))
 			r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
-			fmt.Printf("%s: QPS: %.2f, RU: %.2f, new request need wait %s\n", time.Now(), qps, qps*float64(reserveN), wait)
 			wait = r.Delay()
 			time.Sleep(1 * time.Second)
 		}
@@ -290,5 +288,6 @@ func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64
 	<-ch
 	cancel()
 	wg.Wait()
+	qps = vQPS.Load().(float64)
 	return qps, qps * float64(reserveN), wait
 }

From 5e697e0dbdb21c81932672023eff79e85de67655 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Wed, 24 Jul 2024 16:44:24 +0800
Subject: [PATCH 3/4] make test stable

Signed-off-by: nolouch
---
 client/resource_group/controller/limiter_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go
index 22f0b96a474..4ce46703de8 100644
--- a/client/resource_group/controller/limiter_test.go
+++ b/client/resource_group/controller/limiter_test.go
@@ -229,8 +229,8 @@ func TestQPS(t *testing.T) {
 		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.ruPerSec), func(t *testing.T) {
 			qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.ruPerSec)
 			t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
-			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)-ruSec), float64(10)*float64(tc.reserveN))
-			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)/float64(tc.reserveN)-qps), float64(10))
+			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)-ruSec), float64(100)*float64(tc.reserveN))
+			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)/float64(tc.reserveN)-qps), float64(100))
 		})
 	}
 }

From b209858100d6576eb364f3508bce3cc9622259e5 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Wed, 24 Jul 2024 16:53:06 +0800
Subject: [PATCH 4/4] address comments

Signed-off-by: nolouch
---
 client/resource_group/controller/limiter.go | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go
index 3349c096415..faa2bad927e 100644
--- a/client/resource_group/controller/limiter.go
+++ b/client/resource_group/controller/limiter.go
@@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
 ) {
 	lim.mu.Lock()
 	defer lim.mu.Unlock()
-	logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
+	logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
 	if args.NewBurst < 0 {
 		lim.last = now
 		lim.tokens = args.NewTokens
@@ -351,7 +351,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
 		opt(lim)
 	}
 	lim.maybeNotify()
-	logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
+	logControllerTrace("[resource group controller] after reconfigure", zap.String("name", lim.name), zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
 }
 
 // AvailableTokens decreases the amount of tokens currently available.
@@ -362,6 +362,14 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 {
 	return tokens
 }
 
+func (lim *Limiter) updateLast(t time.Time) {
+	// make sure lim.last is monotonic
+	// see issue: https://github.com/tikv/pd/issues/8435.
+	if lim.last.Before(t) {
+		lim.last = t
+	}
+}
+
 const reserveWarnLogInterval = 10 * time.Millisecond
 
 // reserveN is a helper method for Reserve.
@@ -406,9 +414,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
 	}
 	// Update state
 	if ok {
-		if lim.last.Before(now) {
-			lim.last = now
-		}
+		lim.updateLast(now)
 		lim.tokens = tokens
 		lim.maybeNotify()
 	} else {
@@ -426,9 +432,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
 			zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
 			zap.String("name", lim.name))
 	}
-	lim.updateLast(last)
+	lim.updateLast(last)
 	if lim.limit == 0 {
 		lim.notify()
 	} else if lim.remainingNotifyTimes > 0 {
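For context on the fix in this series: reserveN receives a caller-supplied now that is captured before the limiter's mutex is acquired, so under high concurrency an older timestamp can reach the bucket after a newer one. The updateLast guard added in PATCH 4/4 keeps lim.last monotonic (see https://github.com/tikv/pd/issues/8435). The standalone Go sketch below illustrates one way a token bucket can over-refill when its last timestamp is allowed to move backwards; it is not the pd Limiter, and the names (miniBucket, reserve) and parameters (rate 1000, 8 goroutines) are made up for illustration only.

package main

import (
	"fmt"
	"sync"
	"time"
)

// miniBucket is a toy token bucket, only for showing why `last` must be
// monotonic. It is not the pd Limiter.
type miniBucket struct {
	mu     sync.Mutex
	limit  float64 // tokens refilled per second
	tokens float64
	last   time.Time
}

// reserve credits tokens for the time elapsed since `last`, then takes n.
// `now` is captured by the caller before the lock is taken, mirroring how
// reserveN receives a caller-supplied time.
func (b *miniBucket) reserve(now time.Time, n float64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if elapsed := now.Sub(b.last); elapsed > 0 {
		b.tokens += b.limit * elapsed.Seconds()
	}
	b.tokens -= n
	// The guard below is the essence of the fix: without it, an out-of-order
	// (older) `now` rewinds `last`, and the next caller is credited for the
	// same interval again, so the bucket refills faster than `limit`.
	if b.last.Before(now) {
		b.last = now
	}
}

func main() {
	b := &miniBucket{limit: 1000, last: time.Now()}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				// time.Now() is read outside the lock, so goroutines can
				// present timestamps to the bucket out of order.
				b.reserve(time.Now(), 1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("tokens left: %.2f\n", b.tokens)
}

With the guard in place the accounting stays at or below the configured refill rate regardless of the order in which goroutines reach the lock; removing the two guarded lines lets tokens drift upward under contention, which is roughly the over-limit behavior the TestQPS case added in PATCH 1/4 is meant to surface.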