Skip to content

Commit

Permalink
add test for limiter
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Jul 24, 2024
1 parent ca179e6 commit 9f5522e
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package controller

import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -212,3 +214,81 @@ func TestCancelErrorOfReservation(t *testing.T) {
re.Error(err)
re.Contains(err.Error(), "context canceled")
}

func TestQPS(t *testing.T) {
re := require.New(t)
cases := []struct {
concurrency int
reserveN int64
RU_PER_SEC int64
}{
{10000, 10, 400000},
}

for _, tc := range cases {
t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.RU_PER_SEC), func(t *testing.T) {
qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.RU_PER_SEC)
t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)-ruSec), float64(1))
re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)/float64(tc.reserveN)-qps), float64(1))
})
}
}

const testCaseRunTime = 3 * time.Second

func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
nc := make(chan notifyMsg, 1)
lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

var wg sync.WaitGroup
var totalRequests int64
start := time.Now()

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
r := lim.Reserve(context.Background(), 30*time.Second, time.Now(), float64(reserveN))
if r.OK() {
delay := r.DelayFrom(time.Now())
<-time.After(delay)
} else {
panic("r not ok")
}
atomic.AddInt64(&totalRequests, 1)
}
}()
}
qps := float64(0)
var wait time.Duration
ch := make(chan struct{})
go func() {
var windowRequests int64
for {
elapsed := time.Since(start)
if elapsed >= testCaseRunTime {
close(ch)
break
}
windowRequests = atomic.SwapInt64(&totalRequests, 0)
qps = float64(windowRequests)
r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
fmt.Printf("%s: QPS: %.2f, RU: %.2f, new request need wait %s\n", time.Now(), qps, qps*float64(reserveN), wait)
wait = r.Delay()
time.Sleep(1 * time.Second)
}
}()
<-ch
cancel()
wg.Wait()
return qps, qps * float64(reserveN), wait
}

0 comments on commit 9f5522e

Please sign in to comment.