diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go
index 15364989cd7..98ad1bf86a1 100644
--- a/client/resource_group/controller/limiter_test.go
+++ b/client/resource_group/controller/limiter_test.go
@@ -20,8 +20,10 @@ package controller
 import (
 	"context"
+	"fmt"
 	"math"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -212,3 +214,104 @@ func TestCancelErrorOfReservation(t *testing.T) {
 	re.Error(err)
 	re.Contains(err.Error(), "context canceled")
 }
+
+func TestQPS(t *testing.T) {
+	cases := []struct {
+		concurrency int
+		reserveN    int64
+		ruPerSec    int64
+	}{
+		{10000, 1000, 4000000},
+		{5000, 1000, 4000000},
+		{3840, 1000, 4000000},
+		{1000, 1000, 4000000},
+		{1000, 200, 4000000},
+		{1000, 5000, 4000000},
+
+		{10000, 50, 400000},
+		{5000, 50, 400000},
+		{3840, 50, 400000},
+		{1000, 50, 400000},
+		{500, 50, 400000},
+		{200, 50, 400000},
+		{100, 50, 400000},
+		{1000, 10, 400000},
+		{1000, 250, 400000},
+
+		{10000, 500, 400000},
+		{5000, 500, 400000},
+		{3840, 500, 400000},
+		{1000, 500, 400000},
+		{1000, 100, 200000},
+		{1000, 100, 400000},
+	}
+
+	for _, tc := range cases {
+		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.ruPerSec), func(t *testing.T) {
+			qps, ruSec, waitTime := testQPSCase(t, tc.concurrency, tc.reserveN, tc.ruPerSec)
+			// Use t.Logf, not t.Log(fmt.Printf(...)): the latter prints to
+			// stdout and logs Printf's (bytes, error) return values instead.
+			t.Logf("QPS: %.2f, RU: %.2f, new request need wait %s", qps, ruSec, waitTime)
+		})
+	}
+}
+
+// testQPSCase hammers a fresh limiter with `concurrency` goroutines, each
+// reserving `reserveN` tokens in a loop for ~20s, and returns the QPS and
+// RU/s observed in the final 1s sampling window plus the delay a brand-new
+// reservation would incur at that point.
+func testQPSCase(t *testing.T, concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
+	nc := make(chan notifyMsg, 1)
+	lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel() // safety net: release workers even if the test panics early
+
+	var wg sync.WaitGroup
+	var totalRequests int64
+	start := time.Now()
+
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				// Pass ctx (not context.Background()) so workers blocked
+				// inside Reserve unwind promptly when the test finishes.
+				r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
+				if !r.OK() {
+					if ctx.Err() != nil {
+						return // cancelled at test end; not a failure
+					}
+					panic("reservation unexpectedly failed")
+				}
+				time.Sleep(r.DelayFrom(time.Now()))
+				atomic.AddInt64(&totalRequests, 1)
+			}
+		}()
+	}
+
+	// Monitor goroutine: once per second swap the counter out and record the
+	// window's QPS; the values of the final window are what we return. The
+	// close(done) gives the happens-before edge that makes the plain qps/wait
+	// writes visible to the reader below.
+	qps := float64(0)
+	var wait time.Duration
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		for time.Since(start) < 20*time.Second {
+			windowRequests := atomic.SwapInt64(&totalRequests, 0)
+			qps = float64(windowRequests)
+			r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
+			wait = r.Delay()
+			time.Sleep(1 * time.Second)
+		}
+	}()
+	<-done
+	cancel()
+	wg.Wait()
+	return qps, qps * float64(reserveN), wait
+}