From b7b45374d93eada1ec7cb1c2fccf9fb256d8267c Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Thu, 28 Dec 2023 13:54:27 +0800 Subject: [PATCH] *: add API concurrency metrics (#7541) ref tikv/pd#7167 Signed-off-by: Cabinfever_B --- metrics/grafana/pd.json | 202 +++++++++++- pkg/ratelimit/concurrency_limiter.go | 20 +- pkg/ratelimit/concurrency_limiter_test.go | 15 + pkg/ratelimit/controller.go | 51 ++- pkg/ratelimit/controller_test.go | 15 +- pkg/ratelimit/limiter.go | 19 +- pkg/ratelimit/limiter_test.go | 2 +- pkg/ratelimit/option.go | 11 + server/api/service_middleware_test.go | 42 +-- server/config/service_middleware_config.go | 4 +- server/grpc_service.go | 344 ++++++++++++++++++++- server/metrics.go | 9 + server/server.go | 22 +- 13 files changed, 693 insertions(+), 63 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index a242235b204..44a02803187 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -8781,7 +8781,7 @@ "format": "time_series", "intervalFactor": 2, "legendFormat": "{{grpc_method}}", - "refId": "A", + "refId": "B", "step": 4 } ], @@ -8826,6 +8826,104 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The concurrency number of each kind of gRPC commands", + "editable": true, + "error": false, + "fill": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 127 + }, + "id": 903, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "paceLength": 10, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "pd_server_api_concurrency{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", kind=\"grpc\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{api}}", + "refId": "E", + "step": 20 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "gRPC commands concurrency number", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "repeat": null, @@ -8858,7 +8956,7 @@ "h": 8, "w": 12, "x": 0, - "y": 119 + "y": 135 }, "id": 1001, "legend": { @@ -8954,7 +9052,7 @@ "h": 8, "w": 12, "x": 12, - "y": 119 + "y": 135 }, "id": 1002, "legend": { @@ -9036,6 +9134,104 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The concurrency number of each kind of HTTP commands", + "editable": true, + "error": false, + "fill": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 143 + }, + "id": 
1003,
+      "legend": {
+        "alignAsTable": true,
+        "avg": false,
+        "current": true,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": true,
+        "min": false,
+        "rightSide": true,
+        "show": true,
+        "sideWidth": 300,
+        "sortDesc": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null as zero",
+      "paceLength": 10,
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pd_server_api_concurrency{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", kind=\"http\"}",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "{{api}}",
+          "refId": "E",
+          "step": 20
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "HTTP commands concurrency number",
+      "tooltip": {
+        "msResolution": false,
+        "shared": true,
+        "sort": 0,
+        "value_type": "cumulative"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 10,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "repeat": null,
diff --git a/pkg/ratelimit/concurrency_limiter.go b/pkg/ratelimit/concurrency_limiter.go
index f9b20c8f005..b1eef3c8101 100644
--- a/pkg/ratelimit/concurrency_limiter.go
+++ b/pkg/ratelimit/concurrency_limiter.go
@@ -20,18 +20,26 @@ type concurrencyLimiter struct {
 	mu      syncutil.RWMutex
 	current uint64
 	limit   uint64
+
+	// statistics: the maximum concurrency observed since the last metrics collection.
+	maxLimit uint64
 }
 
 func newConcurrencyLimiter(limit uint64) *concurrencyLimiter {
 	return &concurrencyLimiter{limit: limit}
 }
 
+const unlimit = uint64(0)
+
 func (l *concurrencyLimiter) allow() bool {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
-	if l.current+1 <= l.limit {
+	if l.limit == unlimit || l.current+1 <= l.limit {
 		l.current++
+		if l.current > l.maxLimit {
+			l.maxLimit = l.current
+		}
 		return true
 	}
 	return false
@@ -66,3 +74,13 @@ func (l *concurrencyLimiter) getCurrent() uint64 {
 
 	return l.current
 }
+
+func (l *concurrencyLimiter) getMaxConcurrency() uint64 {
+	l.mu.Lock()
+	defer func() {
+		l.maxLimit = l.current
+		l.mu.Unlock()
+	}()
+
+	return l.maxLimit
+}
diff --git a/pkg/ratelimit/concurrency_limiter_test.go b/pkg/ratelimit/concurrency_limiter_test.go
index 4722e243b03..5fe03740394 100644
--- a/pkg/ratelimit/concurrency_limiter_test.go
+++ b/pkg/ratelimit/concurrency_limiter_test.go
@@ -31,9 +31,24 @@ func TestConcurrencyLimiter(t *testing.T) {
 	cl.release()
 	re.True(cl.allow())
 	re.Equal(uint64(10), cl.getLimit())
+	re.Equal(uint64(10), cl.getMaxConcurrency())
+	re.Equal(uint64(10), cl.getMaxConcurrency())
 	cl.setLimit(5)
 	re.Equal(uint64(5), cl.getLimit())
 	re.Equal(uint64(10), cl.getCurrent())
 	cl.release()
 	re.Equal(uint64(9), cl.getCurrent())
+	for i := 0; i < 9; i++ {
+		cl.release()
+	}
+	re.Equal(uint64(10), cl.getMaxConcurrency())
+	for i := 0; i < 5; i++ {
+		re.True(cl.allow())
+	}
+	re.Equal(uint64(5), cl.getCurrent())
+	for i := 0; i < 5; i++ {
+		cl.release()
+	}
+	re.Equal(uint64(5), cl.getMaxConcurrency())
+	re.Equal(uint64(0), cl.getMaxConcurrency())
 }
diff --git a/pkg/ratelimit/controller.go b/pkg/ratelimit/controller.go
index 0c95be9b11b..33614cb3423 100644
--- a/pkg/ratelimit/controller.go
+++ 
b/pkg/ratelimit/controller.go
@@ -15,11 +15,16 @@ package ratelimit
 
 import (
+	"context"
 	"sync"
+	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/time/rate"
 )
 
+const limiterMetricsInterval = time.Second * 15
+
 var emptyFunc = func() {}
 
 // Controller is a controller which holds multiple limiters to manage the request rate of different objects.
@@ -27,12 +32,52 @@ type Controller struct {
 	limiters sync.Map
 	// the label which is in labelAllowList won't be limited, and only inited by hard code.
 	labelAllowList map[string]struct{}
+
+	ctx              context.Context
+	cancel           context.CancelFunc
+	apiType          string
+	concurrencyGauge *prometheus.GaugeVec
 }
 
 // NewController returns a global limiter which can be updated in the later.
-func NewController() *Controller {
-	return &Controller{
-		labelAllowList: make(map[string]struct{}),
+func NewController(ctx context.Context, typ string, concurrencyGauge *prometheus.GaugeVec) *Controller {
+	ctx, cancel := context.WithCancel(ctx)
+	l := &Controller{
+		ctx:              ctx,
+		cancel:           cancel,
+		labelAllowList:   make(map[string]struct{}),
+		apiType:          typ,
+		concurrencyGauge: concurrencyGauge,
+	}
+	if concurrencyGauge != nil {
+		go l.collectMetrics()
+	}
+	return l
+}
+
+// Close closes the Controller.
+func (l *Controller) Close() {
+	l.cancel()
+}
+
+func (l *Controller) collectMetrics() {
+	ticker := time.NewTicker(limiterMetricsInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-l.ctx.Done():
+			return
+		case <-ticker.C:
+			l.limiters.Range(func(key, value any) bool {
+				limiter := value.(*limiter)
+				label := key.(string)
+				// Not on the hot path, so there is no need to cache the sub-Gauge.
+				if con := limiter.getConcurrencyLimiter(); con != nil {
+					l.concurrencyGauge.WithLabelValues(l.apiType, label).Set(float64(con.getMaxConcurrency()))
+				}
+				return true
+			})
+		}
+	}
+}
diff --git a/pkg/ratelimit/controller_test.go b/pkg/ratelimit/controller_test.go
index 59cc0c16445..48a5ee2054b 100644
--- a/pkg/ratelimit/controller_test.go
+++ b/pkg/ratelimit/controller_test.go
@@ -15,6 +15,7 @@ package ratelimit
 
 import (
+	"context"
 	"sync"
 	"testing"
 	"time"
@@ -79,7 +80,8 @@ func runMulitLabelLimiter(t *testing.T, limiter *Controller, testCase []labelCas
 func TestControllerWithConcurrencyLimiter(t *testing.T) {
 	t.Parallel()
 	re := require.New(t)
-	limiter := NewController()
+	limiter := NewController(context.Background(), "grpc", nil)
+	defer limiter.Close()
 	testCase := []labelCase{
 		{
 			label: "test1",
@@ -140,7 +142,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
 			checkStatusFunc: func(label string) {
 				limit, current := limiter.GetConcurrencyLimiterStatus(label)
 				re.Equal(uint64(0), limit)
-				re.Equal(uint64(0), current)
+				re.Equal(uint64(10), current)
 			},
 		},
 	},
@@ -192,7 +194,8 @@ func TestBlockList(t *testing.T) {
 	t.Parallel()
 	re := require.New(t)
 	opts := []Option{AddLabelAllowList()}
-	limiter := NewController()
+	limiter := NewController(context.Background(), "grpc", nil)
+	defer limiter.Close()
 	label := "test"
 
 	re.False(limiter.IsInAllowList(label))
@@ -212,7 +215,8 @@ func TestBlockList(t *testing.T) {
 func TestControllerWithQPSLimiter(t *testing.T) {
 	t.Parallel()
 	re := require.New(t)
-	limiter := NewController()
+	limiter := NewController(context.Background(), "grpc", nil)
+	defer limiter.Close()
 	testCase := []labelCase{
 		{
 			label: "test1",
@@ -321,7 +325,8 @@ func TestControllerWithQPSLimiter(t *testing.T) {
 func TestControllerWithTwoLimiters(t *testing.T) {
 	t.Parallel()
 	re := require.New(t)
-	limiter := NewController()
+	limiter := 
NewController(context.Background(), "grpc", nil)
+	defer limiter.Close()
 	testCase := []labelCase{
 		{
 			label: "test1",
diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go
index 444b5aa2481..dc744d9ac1b 100644
--- a/pkg/ratelimit/limiter.go
+++ b/pkg/ratelimit/limiter.go
@@ -41,7 +41,9 @@ type limiter struct {
 }
 
 func newLimiter() *limiter {
-	lim := &limiter{}
+	lim := &limiter{
+		concurrency: newConcurrencyLimiter(0),
+	}
 	return lim
 }
@@ -64,13 +66,6 @@ func (l *limiter) deleteRateLimiter() bool {
 	return l.isEmpty()
 }
 
-func (l *limiter) deleteConcurrency() bool {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-	l.concurrency = nil
-	return l.isEmpty()
-}
-
 func (l *limiter) isEmpty() bool {
 	return l.concurrency == nil && l.rate == nil
 }
@@ -96,14 +91,14 @@ func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus {
 	if oldConcurrencyLimit == limit {
 		return ConcurrencyNoChange
 	}
-	if limit < 1 {
-		l.deleteConcurrency()
-		return ConcurrencyDeleted
-	}
 
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	if l.concurrency != nil {
+		if limit < 1 {
+			l.concurrency.setLimit(0)
+			return ConcurrencyDeleted
+		}
 		l.concurrency.setLimit(limit)
 	} else {
 		l.concurrency = newConcurrencyLimiter(limit)
diff --git a/pkg/ratelimit/limiter_test.go b/pkg/ratelimit/limiter_test.go
index 88da865879b..fabb9d98917 100644
--- a/pkg/ratelimit/limiter_test.go
+++ b/pkg/ratelimit/limiter_test.go
@@ -99,7 +99,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {
 
 	limit, current = limiter.getConcurrencyLimiterStatus()
 	re.Equal(uint64(0), limit)
-	re.Equal(uint64(0), current)
+	re.Equal(uint64(15), current)
 }
 
 func TestWithQPSLimiter(t *testing.T) {
diff --git a/pkg/ratelimit/option.go b/pkg/ratelimit/option.go
index b1cc459d786..f1faac5b550 100644
--- a/pkg/ratelimit/option.go
+++ b/pkg/ratelimit/option.go
@@ -81,3 +81,14 @@ func UpdateDimensionConfig(cfg *DimensionConfig) Option {
 		return lim.(*limiter).updateDimensionConfig(cfg)
 	}
 }
+
+// InitLimiter creates an empty concurrency limiter for a given label if it doesn't exist.
+func InitLimiter() Option { + return func(label string, l *Controller) UpdateStatus { + if _, allow := l.labelAllowList[label]; allow { + return InAllowList + } + l.limiters.LoadOrStore(label, newLimiter()) + return ConcurrencyChanged + } +} diff --git a/server/api/service_middleware_test.go b/server/api/service_middleware_test.go index 584e3600a4f..c4db58c39ae 100644 --- a/server/api/service_middleware_test.go +++ b/server/api/service_middleware_test.go @@ -62,31 +62,23 @@ func (suite *auditMiddlewareTestSuite) TestConfigAuditSwitch() { re.True(sc.EnableAudit) ms := map[string]interface{}{ - "enable-audit": "true", - "enable-rate-limit": "true", - "enable-grpc-rate-limit": "true", + "audit.enable-audit": "false", } postData, err := json.Marshal(ms) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re))) sc = &config.ServiceMiddlewareConfig{} re.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc)) - re.True(sc.EnableAudit) - re.True(sc.RateLimitConfig.EnableRateLimit) - re.True(sc.GRPCRateLimitConfig.EnableRateLimit) + re.False(sc.EnableAudit) ms = map[string]interface{}{ - "audit.enable-audit": "false", - "enable-rate-limit": "false", - "enable-grpc-rate-limit": "false", + "enable-audit": "true", } postData, err = json.Marshal(ms) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re))) sc = &config.ServiceMiddlewareConfig{} re.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc)) - re.False(sc.EnableAudit) - re.False(sc.RateLimitConfig.EnableRateLimit) - re.False(sc.GRPCRateLimitConfig.EnableRateLimit) + re.True(sc.EnableAudit) // test empty ms = map[string]interface{}{} @@ -101,7 +93,7 @@ func (suite *auditMiddlewareTestSuite) TestConfigAuditSwitch() { re.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "config item audit not found"))) re.NoError(failpoint.Enable("github.com/tikv/pd/server/config/persistServiceMiddlewareFail", "return(true)")) ms = map[string]interface{}{ - "audit.enable-audit": "true", + "audit.enable-audit": "false", } postData, err = json.Marshal(ms) re.NoError(err) @@ -389,31 +381,31 @@ func (suite *rateLimitConfigTestSuite) TestConfigRateLimitSwitch() { sc := &config.ServiceMiddlewareConfig{} re := suite.Require() re.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc)) - re.False(sc.RateLimitConfig.EnableRateLimit) - re.False(sc.GRPCRateLimitConfig.EnableRateLimit) + re.True(sc.RateLimitConfig.EnableRateLimit) + re.True(sc.GRPCRateLimitConfig.EnableRateLimit) ms := map[string]interface{}{ - "enable-rate-limit": "true", - "enable-grpc-rate-limit": "true", + "enable-rate-limit": "false", + "enable-grpc-rate-limit": "false", } postData, err := json.Marshal(ms) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re))) sc = &config.ServiceMiddlewareConfig{} re.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc)) - re.True(sc.RateLimitConfig.EnableRateLimit) - re.True(sc.GRPCRateLimitConfig.EnableRateLimit) + re.False(sc.RateLimitConfig.EnableRateLimit) + re.False(sc.GRPCRateLimitConfig.EnableRateLimit) ms = map[string]interface{}{ - "enable-rate-limit": "false", - "enable-grpc-rate-limit": "false", + "enable-rate-limit": "true", + "enable-grpc-rate-limit": "true", } postData, err = json.Marshal(ms) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re))) sc = &config.ServiceMiddlewareConfig{} re.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc)) - 
re.False(sc.RateLimitConfig.EnableRateLimit) - re.False(sc.GRPCRateLimitConfig.EnableRateLimit) + re.True(sc.RateLimitConfig.EnableRateLimit) + re.True(sc.GRPCRateLimitConfig.EnableRateLimit) // test empty ms = map[string]interface{}{} @@ -428,8 +420,8 @@ func (suite *rateLimitConfigTestSuite) TestConfigRateLimitSwitch() { re.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "config item rate-limit not found"))) re.NoError(failpoint.Enable("github.com/tikv/pd/server/config/persistServiceMiddlewareFail", "return(true)")) ms = map[string]interface{}{ - "rate-limit.enable-rate-limit": "true", - "grpc-rate-limit.enable-grpc-rate-limit": "true", + "rate-limit.enable-rate-limit": "false", + "grpc-rate-limit.enable-grpc-rate-limit": "false", } postData, err = json.Marshal(ms) re.NoError(err) diff --git a/server/config/service_middleware_config.go b/server/config/service_middleware_config.go index b13e3398ac5..223e19dba12 100644 --- a/server/config/service_middleware_config.go +++ b/server/config/service_middleware_config.go @@ -18,8 +18,8 @@ import "github.com/tikv/pd/pkg/ratelimit" const ( defaultEnableAuditMiddleware = true - defaultEnableRateLimitMiddleware = false - defaultEnableGRPCRateLimitMiddleware = false + defaultEnableRateLimitMiddleware = true + defaultEnableGRPCRateLimitMiddleware = true ) // ServiceMiddlewareConfig is the configuration for PD Service middleware. diff --git a/server/grpc_service.go b/server/grpc_service.go index 24280f46437..14eea9442c7 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -59,6 +59,8 @@ const ( retryIntervalRequestTSOServer = 500 * time.Millisecond getMinTSFromTSOServerTimeout = 1 * time.Second defaultGRPCDialTimeout = 3 * time.Second + + gRPCServiceName = "pdpb.PD" ) // gRPC errors @@ -282,6 +284,17 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR func (s *GrpcServer) GetMinTS( ctx context.Context, request *pdpb.GetMinTSRequest, ) (*pdpb.GetMinTSResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.GetMinTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetMinTS(ctx, request) } @@ -492,6 +505,15 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb // Tso implements gRPC PDServer. func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return err + } + } if s.IsAPIServiceMode() { return s.forwardTSO(stream) } @@ -568,6 +590,17 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { // Bootstrap implements gRPC PDServer. 
func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.BootstrapResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).Bootstrap(ctx, request) } @@ -601,6 +634,17 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque // IsBootstrapped implements gRPC PDServer. func (s *GrpcServer) IsBootstrapped(ctx context.Context, request *pdpb.IsBootstrappedRequest) (*pdpb.IsBootstrappedResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.IsBootstrappedResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).IsBootstrapped(ctx, request) } @@ -619,6 +663,17 @@ func (s *GrpcServer) IsBootstrapped(ctx context.Context, request *pdpb.IsBootstr // AllocID implements gRPC PDServer. func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) (*pdpb.AllocIDResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.AllocIDResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).AllocID(ctx, request) } @@ -644,6 +699,17 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) // IsSnapshotRecovering implements gRPC PDServer. func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, request *pdpb.IsSnapshotRecoveringRequest) (*pdpb.IsSnapshotRecoveringResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.IsSnapshotRecoveringResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } // recovering mark is stored in etcd directly, there's no need to forward. marked, err := s.Server.IsSnapshotRecovering(ctx) if err != nil { @@ -678,7 +744,6 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest } else if rsp != nil { return rsp.(*pdpb.GetStoreResponse), err } - rc := s.GetRaftCluster() if rc == nil { return &pdpb.GetStoreResponse{Header: s.notBootstrappedHeader()}, nil @@ -716,6 +781,17 @@ func checkStore(rc *cluster.RaftCluster, storeID uint64) *pdpb.Error { // PutStore implements gRPC PDServer. 
func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*pdpb.PutStoreResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.PutStoreResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).PutStore(ctx, request) } @@ -988,6 +1064,15 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { cancel() } }() + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return err + } + } for { request, err := server.Recv() failpoint.Inject("grpcClientClosed", func() { @@ -1088,7 +1173,15 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error cancel() } }() - + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return err + } + } for { request, err := server.Recv() if err == io.EOF { @@ -1290,7 +1383,7 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque defer done() } else { return &pdpb.GetRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } } @@ -1334,7 +1427,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR defer done() } else { return &pdpb.GetRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } } @@ -1379,7 +1472,7 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB defer done() } else { return &pdpb.GetRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } } @@ -1423,7 +1516,7 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR defer done() } else { return &pdpb.ScanRegionsResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } } @@ -1462,6 +1555,17 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR // AskSplit implements gRPC PDServer. 
func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.AskSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).AskSplit(ctx, request) } @@ -1500,6 +1604,17 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { @@ -1568,6 +1683,17 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp // ReportSplit implements gRPC PDServer. func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitRequest) (*pdpb.ReportSplitResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.ReportSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).ReportSplit(ctx, request) } @@ -1595,6 +1721,17 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR // ReportBatchSplit implements gRPC PDServer. func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.ReportBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).ReportBatchSplit(ctx, request) } @@ -1623,6 +1760,17 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB // GetClusterConfig implements gRPC PDServer. 
func (s *GrpcServer) GetClusterConfig(ctx context.Context, request *pdpb.GetClusterConfigRequest) (*pdpb.GetClusterConfigResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.GetClusterConfigResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetClusterConfig(ctx, request) } @@ -1644,6 +1792,17 @@ func (s *GrpcServer) GetClusterConfig(ctx context.Context, request *pdpb.GetClus // PutClusterConfig implements gRPC PDServer. func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClusterConfigRequest) (*pdpb.PutClusterConfigResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.PutClusterConfigResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).PutClusterConfig(ctx, request) } @@ -1674,6 +1833,17 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { @@ -1777,6 +1947,17 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg // GetGCSafePoint implements gRPC PDServer. func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafePointRequest) (*pdpb.GetGCSafePointResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.GetGCSafePointResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetGCSafePoint(ctx, request) } @@ -1807,6 +1988,15 @@ func (s *GrpcServer) SyncRegions(stream pdpb.PD_SyncRegionsServer) error { if s.IsClosed() || s.cluster == nil { return ErrNotStarted } + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return err + } + } ctx := s.cluster.Context() if ctx == nil { return ErrNotStarted @@ -1816,6 +2006,17 @@ func (s *GrpcServer) SyncRegions(stream pdpb.PD_SyncRegionsServer) error { // UpdateGCSafePoint implements gRPC PDServer. 
func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSafePointRequest) (*pdpb.UpdateGCSafePointResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.UpdateGCSafePointResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).UpdateGCSafePoint(ctx, request) } @@ -1854,6 +2055,17 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update // UpdateServiceGCSafePoint update the safepoint for specific service func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.UpdateServiceGCSafePointRequest) (*pdpb.UpdateServiceGCSafePointResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.UpdateServiceGCSafePointResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).UpdateServiceGCSafePoint(ctx, request) } @@ -1899,6 +2111,17 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.GetOperatorResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { @@ -2079,6 +2302,17 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest if err := s.validateInternalRequest(request.GetHeader(), true); err != nil { return nil, err } + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } tsoAllocatorManager := s.GetTSOAllocatorManager() // There is no dc-location found in this server, return err. 
if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 { @@ -2173,6 +2407,17 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.SplitRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { @@ -2226,6 +2471,17 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion // Only regions which split successfully will be scattered. // scatterFinishedPercentage indicates the percentage of successfully splited regions that are scattered. func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.SplitAndScatterRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).SplitAndScatterRegions(ctx, request) } @@ -2281,6 +2537,17 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL if !s.member.IsLeader() { return nil, ErrNotLeader } + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.GetDCLocationInfoResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } am := s.GetTSOAllocatorManager() info, ok := am.GetDCLocationInfo(request.GetDcLocation()) if !ok { @@ -2336,6 +2603,20 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo if s.client == nil { return nil, ErrEtcdNotStarted } + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.StoreGlobalConfigResponse{ + Error: &pdpb.Error{ + Type: pdpb.ErrorType_UNKNOWN, + Message: err.Error(), + }, + }, nil + } + } configPath := request.GetConfigPath() if configPath == "" { configPath = globalConfigPath @@ -2373,6 +2654,15 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo if s.client == nil { return nil, ErrEtcdNotStarted } + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return nil, err + } + } configPath := request.GetConfigPath() if configPath == "" { configPath = globalConfigPath @@ -2412,6 +2702,15 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve if s.client == nil { return 
ErrEtcdNotStarted } + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return err + } + } ctx, cancel := context.WithCancel(s.Context()) defer cancel() configPath := req.GetConfigPath() @@ -2498,6 +2797,17 @@ func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) { // ReportMinResolvedTS implements gRPC PDServer. func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.ReportMinResolvedTsRequest) (*pdpb.ReportMinResolvedTsResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.ReportMinResolvedTsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).ReportMinResolvedTS(ctx, request) } @@ -2527,6 +2837,17 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo // SetExternalTimestamp implements gRPC PDServer. func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.SetExternalTimestampRequest) (*pdpb.SetExternalTimestampResponse, error) { + if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { + fName := currentFunction() + limiter := s.GetGRPCRateLimiter() + if done, err := limiter.Allow(fName); err == nil { + defer done() + } else { + return &pdpb.SetExternalTimestampResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).SetExternalTimestamp(ctx, request) } @@ -2554,6 +2875,17 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set // GetExternalTimestamp implements gRPC PDServer. 
func (s *GrpcServer) GetExternalTimestamp(ctx context.Context, request *pdpb.GetExternalTimestampRequest) (*pdpb.GetExternalTimestampResponse, error) {
+	if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
+		fName := currentFunction()
+		limiter := s.GetGRPCRateLimiter()
+		if done, err := limiter.Allow(fName); err == nil {
+			defer done()
+		} else {
+			return &pdpb.GetExternalTimestampResponse{
+				Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+			}, nil
+		}
+	}
 	fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
 		return pdpb.NewPDClient(client).GetExternalTimestamp(ctx, request)
 	}
diff --git a/server/metrics.go b/server/metrics.go
index e06a0cc20dd..0935008a420 100644
--- a/server/metrics.go
+++ b/server/metrics.go
@@ -145,6 +145,14 @@
 			Buckets: prometheus.DefBuckets,
 		}, []string{"service", "method", "caller_id", "ip"})
 
+	apiConcurrencyGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "pd",
+			Subsystem: "server",
+			Name:      "api_concurrency",
+			Help:      "The concurrency number of the API.",
+		}, []string{"kind", "api"})
+
 	forwardFailCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "pd",
@@ -170,5 +178,6 @@ func init() {
 	prometheus.MustRegister(bucketReportLatency)
 	prometheus.MustRegister(serviceAuditHistogram)
 	prometheus.MustRegister(bucketReportInterval)
+	prometheus.MustRegister(apiConcurrencyGauge)
 	prometheus.MustRegister(forwardFailCounter)
 }
diff --git a/server/server.go b/server/server.go
index 21ab983b3d1..fc2cc7466d0 100644
--- a/server/server.go
+++ b/server/server.go
@@ -274,8 +274,8 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
 		audit.NewLocalLogBackend(true),
 		audit.NewPrometheusHistogramBackend(serviceAuditHistogram, false),
 	}
-	s.serviceRateLimiter = ratelimit.NewController()
-	s.grpcServiceRateLimiter = ratelimit.NewController()
+	s.serviceRateLimiter = ratelimit.NewController(s.ctx, "http", apiConcurrencyGauge)
+	s.grpcServiceRateLimiter = ratelimit.NewController(s.ctx, "grpc", apiConcurrencyGauge)
 	s.serviceAuditBackendLabels = make(map[string]*audit.BackendLabels)
 	s.serviceLabels = make(map[string][]apiutil.AccessPath)
 	s.grpcServiceLabels = make(map[string]struct{})
@@ -383,9 +383,11 @@ func (s *Server) startEtcd(ctx context.Context) error {
 }
 
 func (s *Server) initGRPCServiceLabels() {
-	for _, serviceInfo := range s.grpcServer.GetServiceInfo() {
-		for _, methodInfo := range serviceInfo.Methods {
-			s.grpcServiceLabels[methodInfo.Name] = struct{}{}
+	for name, serviceInfo := range s.grpcServer.GetServiceInfo() {
+		if name == gRPCServiceName {
+			for _, methodInfo := range serviceInfo.Methods {
+				s.grpcServiceLabels[methodInfo.Name] = struct{}{}
+			}
 		}
 	}
 }
@@ -503,6 +505,14 @@ func (s *Server) startServer(ctx context.Context) error {
 		cb()
 	}
 
+	// Initialize all rate limiters and their metrics.
+	for service := range s.serviceLabels {
+		s.serviceRateLimiter.Update(service, ratelimit.InitLimiter())
+	}
+	for service := range s.grpcServiceLabels {
+		s.grpcServiceRateLimiter.Update(service, ratelimit.InitLimiter())
+	}
+
 	// Server has started.
 	atomic.StoreInt64(&s.isRunning, 1)
 	bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0)))
@@ -560,6 +570,8 @@ func (s *Server) Close() {
 		}
 	}
 
+	s.grpcServiceRateLimiter.Close()
+	s.serviceRateLimiter.Close()
 	// Run callbacks
 	log.Info("triggering the close callback functions")
 	for _, cb := range s.closeCallbacks {
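-- 
For reference, the sampling pattern this patch wires together can be reduced to the
self-contained sketch below: a limiter records the peak number of in-flight requests
seen since the last scrape, and a background ticker flushes that peak into a
Prometheus GaugeVec and resets it, so each reporting interval shows its own maximum.
This is an illustrative sketch under assumed names, not the PD implementation: the
"demo" namespace, the takePeak helper, and the /example endpoint are invented here,
while the actual patch exports pd_server_api_concurrency every 15 seconds via
limiterMetricsInterval.

package main

import (
	"context"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// limiter tracks in-flight calls and the peak seen since the last scrape.
type limiter struct {
	mu      sync.Mutex
	current uint64
	limit   uint64 // 0 means unlimited
	peak    uint64
}

func (l *limiter) allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.limit == 0 || l.current < l.limit {
		l.current++
		if l.current > l.peak {
			l.peak = l.current
		}
		return true
	}
	return false
}

func (l *limiter) release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.current > 0 {
		l.current--
	}
}

// takePeak returns the peak concurrency since the last call and resets it to the
// current value, so every reporting interval sees its own maximum (this mirrors
// the reset behavior of getMaxConcurrency in the patch).
func (l *limiter) takePeak() uint64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	p := l.peak
	l.peak = l.current
	return p
}

func main() {
	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "demo",
		Subsystem: "server",
		Name:      "api_concurrency",
		Help:      "Peak number of in-flight requests per reporting interval.",
	}, []string{"kind", "api"})
	prometheus.MustRegister(gauge)

	lim := &limiter{limit: 8}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Background collector, analogous to Controller.collectMetrics in the patch.
	go func() {
		ticker := time.NewTicker(15 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				gauge.WithLabelValues("http", "Example").Set(float64(lim.takePeak()))
			}
		}
	}()

	http.Handle("/metrics", promhttp.Handler())
	http.HandleFunc("/example", func(w http.ResponseWriter, r *http.Request) {
		if !lim.allow() {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		defer lim.release()
		w.Write([]byte("ok"))
	})
	http.ListenAndServe(":8080", nil)
}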