Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Nov 9, 2023
1 parent 2feca12 commit dfe893d
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 27 deletions.
3 changes: 3 additions & 0 deletions pkg/ratelimit/bbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ func (l *bbr) checkFullStatus() {
}

func (l *bbr) process() DoneFunc {
if l == nil {
return func() {}
}
l.inFlightStat.Add(1)
start := time.Now().UnixMicro()
l.checkFullStatus()
Expand Down
49 changes: 47 additions & 2 deletions pkg/ratelimit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,69 @@
package ratelimit

import (
"context"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

var emptyFunc = func() {}

// Controller is a controller which holds multiple limiters to manage the request rate of different objects.
type Controller struct {
ctx context.Context
apiType string
limiters sync.Map
// the label which is in labelAllowList won't be limited, and only inited by hard code.
labelAllowList map[string]struct{}

counter *prometheus.CounterVec
gauge *prometheus.GaugeVec
}

// NewController returns a global limiter which can be updated in the later.
func NewController() *Controller {
return &Controller{
func NewController(ctx context.Context, typ string, baseCounter *prometheus.CounterVec, baseGauge *prometheus.GaugeVec) *Controller {
l := &Controller{
ctx: ctx,
apiType: typ,
labelAllowList: make(map[string]struct{}),
counter: baseCounter,
gauge: baseGauge,
}
go l.collectMetrics()
return l
}

func (l *Controller) collectMetrics() {
tricker := time.NewTicker(time.Second)
defer tricker.Stop()
for {
select {
case <-l.ctx.Done():
return
case <-tricker.C:
if l.gauge != nil {
l.limiters.Range(func(key, value any) bool {
limiter := value.(*limiter)
label := key.(string)
// Due to not in hot path, no need to save sub Gauge.
if con := limiter.getConcurrencyLimiter(); con != nil {
l.gauge.WithLabelValues(l.apiType, label, "concurrency").Set(float64(con.getCurrent()))
l.gauge.WithLabelValues(l.apiType, label, "concurrency-limit").Set(float64(con.getLimit()))
}
if bbr := limiter.getBBR(); bbr != nil {
if bbr.bbrStatus.getMinRT() != infRT {
l.gauge.WithLabelValues(l.apiType, label, "bdp").Set(float64(bbr.bbrStatus.getMaxInFlight()))
} else {
l.gauge.WithLabelValues(l.apiType, label, "bdp").Set(float64(0))
}
}
return true
})
}
}
}
}

Expand Down
89 changes: 84 additions & 5 deletions pkg/ratelimit/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
package ratelimit

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/syncutil"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -87,7 +94,20 @@ func runMulitLabelLimiter(t *testing.T, limiter *Controller, testCase []labelCas
func TestControllerWithConcurrencyLimiter(t *testing.T) {
t.Parallel()
re := require.New(t)
limiter := NewController()
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()
ts := httptest.NewServer(promhttp.Handler())
defer ts.Close()
apiLimiterCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "testn",
Subsystem: "server",
Name: "api_limit",
Help: "Counter of requests denied for exceeding the limit.",
}, []string{"kind", "api", "type"})
prometheus.MustRegister(apiLimiterCounter)
limiter := NewController(ctx, "grpc", apiLimiterCounter, nil)
testCase := []labelCase{
{
label: "test1",
Expand Down Expand Up @@ -194,13 +214,28 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
},
}
runMulitLabelLimiter(t, limiter, testCase)
// For test, sleep time needs longer than the push interval
time.Sleep(time.Second)
req, _ := http.NewRequest(http.MethodGet, ts.URL, nil)
resp, err := http.DefaultClient.Do(req)
re.NoError(err)
content, _ := io.ReadAll(resp.Body)
resp.Body.Close()
output := string(content)
re.Contains(output, "testn_server_api_limit{api=\"test2\",kind=\"grpc\",type=\"concurrency\"} 10")
re.Contains(output, "testn_server_api_limit{api=\"test2\",kind=\"grpc\",type=\"rate\"} 0")
re.Contains(output, "testn_server_api_limit{api=\"test1\",kind=\"grpc\",type=\"concurrency\"} 15")
re.Contains(output, "testn_server_api_limit{api=\"test1\",kind=\"grpc\",type=\"rate\"} 0")
}

func TestBlockList(t *testing.T) {
t.Parallel()
re := require.New(t)
opts := []Option{AddLabelAllowList()}
limiter := NewController()
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()
limiter := NewController(ctx, "http", nil, nil)
label := "test"

re.False(limiter.IsInAllowList(label))
Expand All @@ -220,7 +255,10 @@ func TestBlockList(t *testing.T) {
func TestControllerWithQPSLimiter(t *testing.T) {
t.Parallel()
re := require.New(t)
limiter := NewController()
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()
limiter := NewController(ctx, "grpc", nil, nil)
testCase := []labelCase{
{
label: "test1",
Expand Down Expand Up @@ -329,7 +367,10 @@ func TestControllerWithQPSLimiter(t *testing.T) {
func TestControllerWithTwoLimiters(t *testing.T) {
t.Parallel()
re := require.New(t)
limiter := NewController()
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()
limiter := NewController(ctx, "grpc", nil, nil)
testCase := []labelCase{
{
label: "test1",
Expand Down Expand Up @@ -422,7 +463,22 @@ func TestControllerWithTwoLimiters(t *testing.T) {
func TestControllerWithEnableBBR(t *testing.T) {
t.Parallel()
re := require.New(t)
limiter := NewController()
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()

ts := httptest.NewServer(promhttp.Handler())
defer ts.Close()
limiterStatusGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "testn",
Subsystem: "server",
Name: "limiter_status",
Help: "Status of the api limiter.",
}, []string{"kind", "api", "type"})
prometheus.MustRegister(limiterStatusGauge)

limiter := NewController(ctx, "grpc", nil, limiterStatusGauge)
testCase := []labelCase{
{
label: "test1",
Expand Down Expand Up @@ -451,6 +507,17 @@ func TestControllerWithEnableBBR(t *testing.T) {
re.Less(current, uint64(200))
re.Less(uint64(1), current)
re.Equal(climit, current)
// For test, sleep time needs longer than the push interval
time.Sleep(time.Second)
req, _ := http.NewRequest(http.MethodGet, ts.URL, nil)
resp, err := http.DefaultClient.Do(req)
re.NoError(err)
content, _ := io.ReadAll(resp.Body)
resp.Body.Close()
output := string(content)
re.Contains(output, fmt.Sprintf("testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"bdp\"} %d", 0))
re.Contains(output, fmt.Sprintf("testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency\"} %d", current))
re.Contains(output, fmt.Sprintf("testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency-limit\"} %d", climit))
},
},
{
Expand Down Expand Up @@ -518,6 +585,18 @@ func TestControllerWithEnableBBR(t *testing.T) {
},
}
runMulitLabelLimiter(t, limiter, testCase)

// For test, sleep time needs longer than the push interval
time.Sleep(time.Second)
req, _ := http.NewRequest(http.MethodGet, ts.URL, nil)
resp, err := http.DefaultClient.Do(req)
re.NoError(err)
content, _ := io.ReadAll(resp.Body)
resp.Body.Close()
output := string(content)
re.Contains(output, "testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"bdp\"} 0")
re.Contains(output, "testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency\"} 200")
re.Contains(output, "testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency-limit\"} 200")
}

func countRateLimiterHandleResult(limiter *Controller, label string, successCount *int,
Expand Down
26 changes: 16 additions & 10 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ratelimit

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/syncutil"
"golang.org/x/time/rate"
Expand All @@ -40,12 +41,19 @@ type limiter struct {
concurrency *concurrencyLimiter
rate *RateLimiter
bbr *bbr

rateCounter prometheus.Counter
concurrencyCounter prometheus.Counter
}

func newLimiter() *limiter {
func newLimiter(typ, api string, counter *prometheus.CounterVec) *limiter {
lim := &limiter{
cfg: &DimensionConfig{},
}
if counter != nil {
lim.rateCounter = counter.WithLabelValues(typ, api, "rate")
lim.concurrencyCounter = counter.WithLabelValues(typ, api, "concurrency")
}
return lim
}

Expand Down Expand Up @@ -214,24 +222,22 @@ func (l *limiter) updateDimensionConfig(cfg *DimensionConfig, op ...bbrOption) U
func (l *limiter) allow() (DoneFunc, error) {
concurrency := l.getConcurrencyLimiter()
if concurrency != nil && !concurrency.allow() {
if l.concurrencyCounter != nil {
l.concurrencyCounter.Add(1)
}
return nil, errs.ErrRateLimitExceeded
}
rate := l.getRateLimiter()
if rate != nil && !rate.Allow() {
if concurrency != nil {
concurrency.release()
}
if l.rateCounter != nil {
l.rateCounter.Add(1)
}
return nil, errs.ErrRateLimitExceeded
}
bbr := l.getBBR()
if bbr == nil {
return func() {
if concurrency != nil {
concurrency.release()
}
}, nil
}
done := bbr.process()
done := l.getBBR().process()
return func() {
done()
if concurrency != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/ratelimit/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {
t.Parallel()
re := require.New(t)

limiter := newLimiter()
limiter := newLimiter("http", "test1", nil)
status := limiter.updateConcurrencyConfig(10)
re.True(status&ConcurrencyChanged != 0)
var lock syncutil.Mutex
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {
func TestWithQPSLimiter(t *testing.T) {
t.Parallel()
re := require.New(t)
limiter := newLimiter()
limiter := newLimiter("http", "test2", nil)
status := limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1)
re.True(status&QPSChanged != 0)

Expand Down Expand Up @@ -189,7 +189,8 @@ func TestWithBBR(t *testing.T) {
cfg := &DimensionConfig{
EnableBBR: true,
}
limiter := newLimiter()

limiter := newLimiter("http", "test3", nil)
status := limiter.updateDimensionConfig(cfg, optsForTest...)
re.True(status&BBRChanged != 0)
re.True(status&QPSNoChange != 0)
Expand Down Expand Up @@ -331,7 +332,7 @@ func TestWithTwoLimitersAndBBRConfig(t *testing.T) {
QPSBurst: 100,
ConcurrencyLimit: 100,
}
limiter := newLimiter()
limiter := newLimiter("http", "test4", nil)
status := limiter.updateDimensionConfig(cfg)
re.True(status&QPSChanged != 0)
re.True(status&ConcurrencyChanged != 0)
Expand Down
8 changes: 4 additions & 4 deletions pkg/ratelimit/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func updateConcurrencyLimiter(limit uint64) Option {
if _, allow := l.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter))
return lim.(*limiter).updateConcurrencyConfig(limit)
}
}
Expand All @@ -68,7 +68,7 @@ func updateQPSLimiter(limit float64, burst int) Option {
if _, allow := l.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter))
return lim.(*limiter).updateQPSConfig(limit, burst)
}
}
Expand All @@ -79,7 +79,7 @@ func UpdateDimensionConfig(cfg *DimensionConfig) Option {
if _, allow := l.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter))
return lim.(*limiter).updateDimensionConfig(cfg)
}
}
Expand All @@ -91,7 +91,7 @@ func UpdateDimensionConfigForTest(cfg *DimensionConfig, opt ...bbrOption) Option
if _, allow := l.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter))
return lim.(*limiter).updateDimensionConfig(cfg, opt...)
}
}
Loading

0 comments on commit dfe893d

Please sign in to comment.