From dfe893d1bfa07f822408543afd133acfdd9d1e16 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 9 Nov 2023 18:42:45 +0800 Subject: [PATCH] add metrics Signed-off-by: Cabinfever_B --- pkg/ratelimit/bbr.go | 3 + pkg/ratelimit/controller.go | 49 +++++++++- pkg/ratelimit/controller_test.go | 89 +++++++++++++++++-- pkg/ratelimit/limiter.go | 26 +++--- pkg/ratelimit/limiter_test.go | 9 +- pkg/ratelimit/option.go | 8 +- server/metrics.go | 18 ++++ server/server.go | 4 +- .../client/limit_and_backoff_test.go | 10 +++ 9 files changed, 189 insertions(+), 27 deletions(-) diff --git a/pkg/ratelimit/bbr.go b/pkg/ratelimit/bbr.go index 5c5679054f5..4022d9facc7 100644 --- a/pkg/ratelimit/bbr.go +++ b/pkg/ratelimit/bbr.go @@ -298,6 +298,9 @@ func (l *bbr) checkFullStatus() { } func (l *bbr) process() DoneFunc { + if l == nil { + return func() {} + } l.inFlightStat.Add(1) start := time.Now().UnixMicro() l.checkFullStatus() diff --git a/pkg/ratelimit/controller.go b/pkg/ratelimit/controller.go index bbcf7c46588..85f29f89ae2 100644 --- a/pkg/ratelimit/controller.go +++ b/pkg/ratelimit/controller.go @@ -15,8 +15,11 @@ package ratelimit import ( + "context" "sync" + "time" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" ) @@ -24,15 +27,57 @@ var emptyFunc = func() {} // Controller is a controller which holds multiple limiters to manage the request rate of different objects. type Controller struct { + ctx context.Context + apiType string limiters sync.Map // the label which is in labelAllowList won't be limited, and only inited by hard code. labelAllowList map[string]struct{} + + counter *prometheus.CounterVec + gauge *prometheus.GaugeVec } // NewController returns a global limiter which can be updated in the later. 
-func NewController() *Controller { - return &Controller{ +func NewController(ctx context.Context, typ string, baseCounter *prometheus.CounterVec, baseGauge *prometheus.GaugeVec) *Controller { + l := &Controller{ + ctx: ctx, + apiType: typ, labelAllowList: make(map[string]struct{}), + counter: baseCounter, + gauge: baseGauge, + } + go l.collectMetrics() + return l +} + +func (l *Controller) collectMetrics() { + tricker := time.NewTicker(time.Second) + defer tricker.Stop() + for { + select { + case <-l.ctx.Done(): + return + case <-tricker.C: + if l.gauge != nil { + l.limiters.Range(func(key, value any) bool { + limiter := value.(*limiter) + label := key.(string) + // Not on the hot path, so the sub-gauges are looked up on each tick rather than cached. + if con := limiter.getConcurrencyLimiter(); con != nil { + l.gauge.WithLabelValues(l.apiType, label, "concurrency").Set(float64(con.getCurrent())) + l.gauge.WithLabelValues(l.apiType, label, "concurrency-limit").Set(float64(con.getLimit())) + } + if bbr := limiter.getBBR(); bbr != nil { + if bbr.bbrStatus.getMinRT() != infRT { + l.gauge.WithLabelValues(l.apiType, label, "bdp").Set(float64(bbr.bbrStatus.getMaxInFlight())) + } else { + l.gauge.WithLabelValues(l.apiType, label, "bdp").Set(float64(0)) + } + } + return true + }) + } + } } } diff --git a/pkg/ratelimit/controller_test.go b/pkg/ratelimit/controller_test.go index c71de9bb9d7..6a8a3e63110 100644 --- a/pkg/ratelimit/controller_test.go +++ b/pkg/ratelimit/controller_test.go @@ -15,10 +15,17 @@ package ratelimit import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" "sync" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/syncutil" "golang.org/x/time/rate" @@ -87,7 +94,20 @@ func runMulitLabelLimiter(t *testing.T, limiter *Controller, testCase []labelCas func TestControllerWithConcurrencyLimiter(t *testing.T) { t.Parallel() re := 
require.New(t) - limiter := NewController() + pctx := context.Background() + ctx, cancel := context.WithCancel(pctx) + defer cancel() + ts := httptest.NewServer(promhttp.Handler()) + defer ts.Close() + apiLimiterCounter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "testn", + Subsystem: "server", + Name: "api_limit", + Help: "Counter of requests denied for exceeding the limit.", + }, []string{"kind", "api", "type"}) + prometheus.MustRegister(apiLimiterCounter) + limiter := NewController(ctx, "grpc", apiLimiterCounter, nil) testCase := []labelCase{ { label: "test1", @@ -194,13 +214,28 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { }, } runMulitLabelLimiter(t, limiter, testCase) + // For test, sleep time needs longer than the push interval + time.Sleep(time.Second) + req, _ := http.NewRequest(http.MethodGet, ts.URL, nil) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + content, _ := io.ReadAll(resp.Body) + resp.Body.Close() + output := string(content) + re.Contains(output, "testn_server_api_limit{api=\"test2\",kind=\"grpc\",type=\"concurrency\"} 10") + re.Contains(output, "testn_server_api_limit{api=\"test2\",kind=\"grpc\",type=\"rate\"} 0") + re.Contains(output, "testn_server_api_limit{api=\"test1\",kind=\"grpc\",type=\"concurrency\"} 15") + re.Contains(output, "testn_server_api_limit{api=\"test1\",kind=\"grpc\",type=\"rate\"} 0") } func TestBlockList(t *testing.T) { t.Parallel() re := require.New(t) opts := []Option{AddLabelAllowList()} - limiter := NewController() + pctx := context.Background() + ctx, cancel := context.WithCancel(pctx) + defer cancel() + limiter := NewController(ctx, "http", nil, nil) label := "test" re.False(limiter.IsInAllowList(label)) @@ -220,7 +255,10 @@ func TestBlockList(t *testing.T) { func TestControllerWithQPSLimiter(t *testing.T) { t.Parallel() re := require.New(t) - limiter := NewController() + pctx := context.Background() + ctx, cancel := context.WithCancel(pctx) + defer cancel() + 
limiter := NewController(ctx, "grpc", nil, nil) testCase := []labelCase{ { label: "test1", @@ -329,7 +367,10 @@ func TestControllerWithQPSLimiter(t *testing.T) { func TestControllerWithTwoLimiters(t *testing.T) { t.Parallel() re := require.New(t) - limiter := NewController() + pctx := context.Background() + ctx, cancel := context.WithCancel(pctx) + defer cancel() + limiter := NewController(ctx, "grpc", nil, nil) testCase := []labelCase{ { label: "test1", @@ -422,7 +463,22 @@ func TestControllerWithTwoLimiters(t *testing.T) { func TestControllerWithEnableBBR(t *testing.T) { t.Parallel() re := require.New(t) - limiter := NewController() + pctx := context.Background() + ctx, cancel := context.WithCancel(pctx) + defer cancel() + + ts := httptest.NewServer(promhttp.Handler()) + defer ts.Close() + limiterStatusGauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "testn", + Subsystem: "server", + Name: "limiter_status", + Help: "Status of the api limiter.", + }, []string{"kind", "api", "type"}) + prometheus.MustRegister(limiterStatusGauge) + + limiter := NewController(ctx, "grpc", nil, limiterStatusGauge) testCase := []labelCase{ { label: "test1", @@ -451,6 +507,17 @@ func TestControllerWithEnableBBR(t *testing.T) { re.Less(current, uint64(200)) re.Less(uint64(1), current) re.Equal(climit, current) + // For test, sleep time needs longer than the push interval + time.Sleep(time.Second) + req, _ := http.NewRequest(http.MethodGet, ts.URL, nil) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + content, _ := io.ReadAll(resp.Body) + resp.Body.Close() + output := string(content) + re.Contains(output, fmt.Sprintf("testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"bdp\"} %d", 0)) + re.Contains(output, fmt.Sprintf("testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency\"} %d", current)) + re.Contains(output, fmt.Sprintf("testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency-limit\"} %d", climit)) }, 
}, { @@ -518,6 +585,18 @@ func TestControllerWithEnableBBR(t *testing.T) { }, } runMulitLabelLimiter(t, limiter, testCase) + + // For test, sleep time needs longer than the push interval + time.Sleep(time.Second) + req, _ := http.NewRequest(http.MethodGet, ts.URL, nil) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + content, _ := io.ReadAll(resp.Body) + resp.Body.Close() + output := string(content) + re.Contains(output, "testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"bdp\"} 0") + re.Contains(output, "testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency\"} 200") + re.Contains(output, "testn_server_limiter_status{api=\"test1\",kind=\"grpc\",type=\"concurrency-limit\"} 200") } func countRateLimiterHandleResult(limiter *Controller, label string, successCount *int, diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go index d0596df8bf8..bb2dda495e3 100644 --- a/pkg/ratelimit/limiter.go +++ b/pkg/ratelimit/limiter.go @@ -15,6 +15,7 @@ package ratelimit import ( + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/syncutil" "golang.org/x/time/rate" @@ -40,12 +41,19 @@ type limiter struct { concurrency *concurrencyLimiter rate *RateLimiter bbr *bbr + + rateCounter prometheus.Counter + concurrencyCounter prometheus.Counter } -func newLimiter() *limiter { +func newLimiter(typ, api string, counter *prometheus.CounterVec) *limiter { lim := &limiter{ cfg: &DimensionConfig{}, } + if counter != nil { + lim.rateCounter = counter.WithLabelValues(typ, api, "rate") + lim.concurrencyCounter = counter.WithLabelValues(typ, api, "concurrency") + } return lim } @@ -214,6 +222,9 @@ func (l *limiter) updateDimensionConfig(cfg *DimensionConfig, op ...bbrOption) U func (l *limiter) allow() (DoneFunc, error) { concurrency := l.getConcurrencyLimiter() if concurrency != nil && !concurrency.allow() { + if l.concurrencyCounter != nil { + l.concurrencyCounter.Add(1) + } return 
nil, errs.ErrRateLimitExceeded } rate := l.getRateLimiter() @@ -221,17 +232,12 @@ func (l *limiter) allow() (DoneFunc, error) { if concurrency != nil { concurrency.release() } + if l.rateCounter != nil { + l.rateCounter.Add(1) + } return nil, errs.ErrRateLimitExceeded } - bbr := l.getBBR() - if bbr == nil { - return func() { - if concurrency != nil { - concurrency.release() - } - }, nil - } - done := bbr.process() + done := l.getBBR().process() return func() { done() if concurrency != nil { diff --git a/pkg/ratelimit/limiter_test.go b/pkg/ratelimit/limiter_test.go index 432c10dfda6..d2859c1f196 100644 --- a/pkg/ratelimit/limiter_test.go +++ b/pkg/ratelimit/limiter_test.go @@ -50,7 +50,7 @@ func TestWithConcurrencyLimiter(t *testing.T) { t.Parallel() re := require.New(t) - limiter := newLimiter() + limiter := newLimiter("http", "test1", nil) status := limiter.updateConcurrencyConfig(10) re.True(status&ConcurrencyChanged != 0) var lock syncutil.Mutex @@ -112,7 +112,7 @@ func TestWithConcurrencyLimiter(t *testing.T) { func TestWithQPSLimiter(t *testing.T) { t.Parallel() re := require.New(t) - limiter := newLimiter() + limiter := newLimiter("http", "test2", nil) status := limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1) re.True(status&QPSChanged != 0) @@ -189,7 +189,8 @@ func TestWithBBR(t *testing.T) { cfg := &DimensionConfig{ EnableBBR: true, } - limiter := newLimiter() + + limiter := newLimiter("http", "test3", nil) status := limiter.updateDimensionConfig(cfg, optsForTest...) 
re.True(status&BBRChanged != 0) re.True(status&QPSNoChange != 0) @@ -331,7 +332,7 @@ func TestWithTwoLimitersAndBBRConfig(t *testing.T) { QPSBurst: 100, ConcurrencyLimit: 100, } - limiter := newLimiter() + limiter := newLimiter("http", "test4", nil) status := limiter.updateDimensionConfig(cfg) re.True(status&QPSChanged != 0) re.True(status&ConcurrencyChanged != 0) diff --git a/pkg/ratelimit/option.go b/pkg/ratelimit/option.go index 3c644b734ff..40cf9eb5fe4 100644 --- a/pkg/ratelimit/option.go +++ b/pkg/ratelimit/option.go @@ -58,7 +58,7 @@ func updateConcurrencyLimiter(limit uint64) Option { if _, allow := l.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter)) return lim.(*limiter).updateConcurrencyConfig(limit) } } @@ -68,7 +68,7 @@ func updateQPSLimiter(limit float64, burst int) Option { if _, allow := l.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter)) return lim.(*limiter).updateQPSConfig(limit, burst) } } @@ -79,7 +79,7 @@ func UpdateDimensionConfig(cfg *DimensionConfig) Option { if _, allow := l.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter)) return lim.(*limiter).updateDimensionConfig(cfg) } } @@ -91,7 +91,7 @@ func UpdateDimensionConfigForTest(cfg *DimensionConfig, opt ...bbrOption) Option if _, allow := l.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, _ := l.limiters.LoadOrStore(label, newLimiter(l.apiType, label, l.counter)) return lim.(*limiter).updateDimensionConfig(cfg, opt...) 
} } diff --git a/server/metrics.go b/server/metrics.go index 94eb9bf19a2..23d60349181 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -159,6 +159,22 @@ var ( Name: "maxprocs", Help: "The value of GOMAXPROCS.", }) + + apiLimiterCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "api_limit", + Help: "Counter of requests denied for exceeding the limit.", + }, []string{"kind", "api", "type"}) + + limiterStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "limiter_status", + Help: "Status of the api limiter.", + }, []string{"kind", "api", "type"}) ) func init() { @@ -179,4 +195,6 @@ func init() { prometheus.MustRegister(serviceAuditHistogram) prometheus.MustRegister(bucketReportInterval) prometheus.MustRegister(serverMaxProcs) + prometheus.MustRegister(apiLimiterCounter) + prometheus.MustRegister(limiterStatusGauge) } diff --git a/server/server.go b/server/server.go index ff9bb3747b2..147bc360fb6 100644 --- a/server/server.go +++ b/server/server.go @@ -273,8 +273,8 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le audit.NewLocalLogBackend(true), audit.NewPrometheusHistogramBackend(serviceAuditHistogram, false), } - s.serviceRateLimiter = ratelimit.NewController() - s.grpcServiceRateLimiter = ratelimit.NewController() + s.serviceRateLimiter = ratelimit.NewController(s.ctx, "http", apiLimiterCounter, limiterStatusGauge) + s.grpcServiceRateLimiter = ratelimit.NewController(s.ctx, "grpc", apiLimiterCounter, limiterStatusGauge) s.serviceAuditBackendLabels = make(map[string]*audit.BackendLabels) s.serviceLabels = make(map[string][]apiutil.AccessPath) s.grpcServiceLabels = make(map[string]struct{}) diff --git a/tests/integrations/client/limit_and_backoff_test.go b/tests/integrations/client/limit_and_backoff_test.go index f7150b9e453..56b1a2ca790 100644 --- a/tests/integrations/client/limit_and_backoff_test.go +++ 
b/tests/integrations/client/limit_and_backoff_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "net/http" "reflect" @@ -230,4 +231,13 @@ func (suite *limitTestSuite) TestLimitStoreHeartbeart() { re.Less(fail, int32(15)) re.Greater(fail, int32(5)) suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/slowHeartbeat")) + + req, _ := http.NewRequest(http.MethodGet, suite.getLeader().GetAddr()+"/metrics", nil) + resp, err = dialClient.Do(req) + suite.NoError(err) + defer resp.Body.Close() + content, _ := io.ReadAll(resp.Body) + output := string(content) + fmt.Println(output) + suite.Contains(output, "pd_server_limiter_status{api=\"StoreHeartbeat\",kind=\"grpc\",type=\"bdp\"} 5") }