diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go
index 215db2e676c..8e4b23c8c9f 100644
--- a/pkg/mcs/resource_manager/server/manager.go
+++ b/pkg/mcs/resource_manager/server/manager.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/failpoint"
 	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	bs "github.com/tikv/pd/pkg/basicserver"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/storage/endpoint"
@@ -42,6 +43,8 @@ const (
 	metricsCleanupInterval     = time.Minute
 	metricsCleanupTimeout      = 20 * time.Minute
 	metricsAvailableRUInterval = 30 * time.Second
+	defaultCollectIntervalSec  = 20
+	tickPerSecond              = time.Second
 
 	reservedDefaultGroupName = "default"
 	middlePriority           = 8
@@ -350,6 +353,9 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 	defer cleanUpTicker.Stop()
 	availableRUTicker := time.NewTicker(metricsAvailableRUInterval)
 	defer availableRUTicker.Stop()
+	recordMaxTicker := time.NewTicker(tickPerSecond)
+	defer recordMaxTicker.Stop()
+	maxPerSecTrackers := make(map[string]*maxPerSecCostTracker)
 	for {
 		select {
 		case <-ctx.Done():
@@ -371,6 +377,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 				readRequestCountMetrics  = requestCount.WithLabelValues(name, readTypeLabel)
 				writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
 			)
+			t, ok := maxPerSecTrackers[name]
+			if !ok {
+				t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
+				maxPerSecTrackers[name] = t
+			}
+			t.CollectConsumption(consumption)
+
 			// RU info.
 			if consumption.RRU > 0 {
 				rruMetrics.Observe(consumption.RRU)
@@ -418,21 +431,101 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 					requestCount.DeleteLabelValues(name, writeTypeLabel)
 					availableRUCounter.DeleteLabelValues(name)
 					delete(m.consumptionRecord, name)
+					delete(maxPerSecTrackers, name)
+					readRequestUnitMaxPerSecCost.DeleteLabelValues(name)
+					writeRequestUnitMaxPerSecCost.DeleteLabelValues(name)
 				}
 			}
 		case <-availableRUTicker.C:
 			m.RLock()
+			groups := make([]*ResourceGroup, 0, len(m.groups))
 			for name, group := range m.groups {
 				if name == reservedDefaultGroupName {
 					continue
 				}
+				groups = append(groups, group)
+			}
+			m.RUnlock()
+			// Iterate over the snapshot so the lock is not held for long when there are many groups.
+			for _, group := range groups {
 				ru := group.getRUToken()
 				if ru < 0 {
 					ru = 0
 				}
-				availableRUCounter.WithLabelValues(name).Set(ru)
+				availableRUCounter.WithLabelValues(group.Name).Set(ru)
 			}
+
+		case <-recordMaxTicker.C:
+			// Record the sum of RRU and WRU every second.
+			m.RLock()
+			names := make([]string, 0, len(m.groups))
+			for name := range m.groups {
+				names = append(names, name)
+			}
+			m.RUnlock()
+			for _, name := range names {
+				if t, ok := maxPerSecTrackers[name]; !ok {
+					maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
+				} else {
+					t.FlushMetrics()
+				}
+			}
 		}
 	}
 }
+
+type maxPerSecCostTracker struct {
+	name          string
+	maxPerSecRRU  float64
+	maxPerSecWRU  float64
+	rruSum        float64
+	wruSum        float64
+	lastRRUSum    float64
+	lastWRUSum    float64
+	flushPeriod   int
+	cnt           int
+	rruMaxMetrics prometheus.Gauge
+	wruMaxMetrics prometheus.Gauge
+}
+
+func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker {
+	return &maxPerSecCostTracker{
+		name:          name,
+		flushPeriod:   flushPeriod,
+		rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name),
+		wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name),
+	}
+}
+
+// CollectConsumption accumulates the RRU and WRU of a consumption report.
+func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) {
+	t.rruSum += consume.RRU
+	t.wruSum += consume.WRU
+}
+
+// FlushMetrics computes the per-second deltas, tracks their maximums, and
+// flushes maxPerSecRRU and maxPerSecWRU to the metrics every flushPeriod ticks.
+func (t *maxPerSecCostTracker) FlushMetrics() {
+	if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
+		t.lastRRUSum = t.rruSum
+		t.lastWRUSum = t.wruSum
+		return
+	}
+	deltaRRU := t.rruSum - t.lastRRUSum
+	deltaWRU := t.wruSum - t.lastWRUSum
+	t.lastRRUSum = t.rruSum
+	t.lastWRUSum = t.wruSum
+	if deltaRRU > t.maxPerSecRRU {
+		t.maxPerSecRRU = deltaRRU
+	}
+	if deltaWRU > t.maxPerSecWRU {
+		t.maxPerSecWRU = deltaWRU
+	}
+	t.cnt++
+	// Flush the maximums to the metrics and reset them every flushPeriod ticks.
+	if t.cnt%t.flushPeriod == 0 {
+		t.rruMaxMetrics.Set(t.maxPerSecRRU)
+		t.wruMaxMetrics.Set(t.maxPerSecWRU)
+		t.maxPerSecRRU = 0
+		t.maxPerSecWRU = 0
+	}
+}
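The mechanism in the hunk above: `CollectConsumption` accumulates cumulative RRU/WRU sums per group as consumption reports arrive, and the one-second `recordMaxTicker` drives `FlushMetrics`, which diffs the sums against the previous tick to get a per-second rate, keeps the maximum, and publishes then resets it every `defaultCollectIntervalSec` ticks. Below is a minimal standalone sketch of that bookkeeping; the `deltaMax` type and `tick` method are illustrative names, not part of this change, and it uses an explicit `seeded` flag where the PR checks for zero sums:

```go
package main

import "fmt"

// deltaMax is a stand-in for the tracker's bookkeeping: feed it a
// cumulative sum once per "second"; it tracks the max one-tick delta
// and reports (and resets) that max every period ticks.
type deltaMax struct {
	last, max   float64
	cnt, period int
	seeded      bool
}

func (d *deltaMax) tick(sum float64) (float64, bool) {
	if !d.seeded {
		// First observation only establishes the baseline.
		d.last, d.seeded = sum, true
		return 0, false
	}
	if delta := sum - d.last; delta > d.max {
		d.max = delta
	}
	d.last = sum
	d.cnt++
	if d.cnt%d.period == 0 {
		flushed := d.max
		d.max = 0 // reset so the next window starts fresh
		return flushed, true
	}
	return 0, false
}

func main() {
	d := &deltaMax{period: 5}
	sum := 0.0
	for i := 1; i <= 10; i++ {
		sum += float64(i) // pretend this second consumed i RUs
		if max, ok := d.tick(sum); ok {
			fmt.Printf("tick %2d: max RU/s over last window = %.0f\n", i, max)
		}
	}
}
```

Publishing only every `defaultCollectIntervalSec` (20) ticks keeps the gauge stable across typical Prometheus scrape intervals while still capturing the worst one-second burst inside each window.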
diff --git a/pkg/mcs/resource_manager/server/metrics.go b/pkg/mcs/resource_manager/server/metrics.go
index c33cafa3627..dec877d8c86 100644
--- a/pkg/mcs/resource_manager/server/metrics.go
+++ b/pkg/mcs/resource_manager/server/metrics.go
@@ -19,14 +19,15 @@ import (
 )
 
 const (
-	namespace              = "resource_manager"
-	serverSubsystem        = "server"
-	ruSubsystem            = "resource_unit"
-	resourceSubsystem      = "resource"
-	resourceGroupNameLabel = "name"
-	typeLabel              = "type"
-	readTypeLabel          = "read"
-	writeTypeLabel         = "write"
+	namespace                 = "resource_manager"
+	serverSubsystem           = "server"
+	ruSubsystem               = "resource_unit"
+	resourceSubsystem         = "resource"
+	resourceGroupNameLabel    = "name"
+	typeLabel                 = "type"
+	readTypeLabel             = "read"
+	writeTypeLabel            = "write"
+	newResourceGroupNameLabel = "resource_group"
 )
 
 var (
@@ -55,6 +56,21 @@ var (
 			Help:      "Bucketed histogram of the write request unit cost for all resource groups.",
 			Buckets:   prometheus.ExponentialBuckets(3, 10, 5), // 3 ~ 300000
 		}, []string{resourceGroupNameLabel})
+	readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: ruSubsystem,
+			Name:      "read_request_unit_max_per_sec",
+			Help:      "Gauge of the max read request unit per second for all resource groups.",
+		}, []string{newResourceGroupNameLabel})
+	writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: ruSubsystem,
+			Name:      "write_request_unit_max_per_sec",
+			Help:      "Gauge of the max write request unit per second for all resource groups.",
+		}, []string{newResourceGroupNameLabel})
+
 	sqlLayerRequestUnitCost = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
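Two things worth noting about this hunk. First, the new gauges use a new label key, `resource_group`, rather than the existing `name` label used by the histograms, presumably so the new series are distinguishable from the ones keyed on `name`. Second, registration of the two new `GaugeVec`s is not shown here; presumably they are registered in `metrics.go`'s existing `init()` alongside the other collectors. For reference, a self-contained sketch of the register/update/scrape cycle these gauges follow; the metric and label names mirror the diff, but this is not the PR's registration code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Mirrors the shape of the new gauges; illustrative only.
var readRUMaxPerSec = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "resource_manager",
		Subsystem: "resource_unit",
		Name:      "read_request_unit_max_per_sec",
		Help:      "Gauge of the max read request unit per second for all resource groups.",
	}, []string{"resource_group"})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(readRUMaxPerSec)

	// One child series per resource group, keyed by the new label.
	readRUMaxPerSec.WithLabelValues("rg1").Set(42)

	// Gather and print what a scrape would see.
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), m.GetLabel()[0].GetValue(), m.GetGauge().GetValue())
		}
	}
}
```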
diff --git a/pkg/mcs/resource_manager/server/metrics_test.go b/pkg/mcs/resource_manager/server/metrics_test.go
new file mode 100644
index 00000000000..62d07286eaf
--- /dev/null
+++ b/pkg/mcs/resource_manager/server/metrics_test.go
@@ -0,0 +1,51 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"fmt"
+	"testing"
+
+	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMaxPerSecCostTracker(t *testing.T) {
+	tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec)
+	re := require.New(t)
+
+	// Define the expected max values for each flush period.
+	expectedMaxRU := []float64{19, 39, 59}
+	expectedSum := []float64{190, 780, 1770}
+
+	for i := 0; i < 60; i++ {
+		// Record data.
+		consumption := &rmpb.Consumption{
+			RRU: float64(i),
+			WRU: float64(i),
+		}
+		tracker.CollectConsumption(consumption)
+		tracker.FlushMetrics()
+
+		// Check the max values at the end of each flush period.
+		if (i+1)%20 == 0 {
+			period := i / 20
+			re.Equal(expectedMaxRU[period], tracker.maxPerSecRRU, fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
+			re.Equal(expectedMaxRU[period], tracker.maxPerSecWRU, fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
+			re.Equal(expectedSum[period], tracker.rruSum)
+			re.Equal(expectedSum[period], tracker.wruSum)
+		}
+	}
+}
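One gap in the new test: it asserts on the tracker's internal fields only, never on the exported gauge value. If that coverage were wanted, `client_golang`'s `testutil.ToFloat64` can read a single-series collector directly. A hypothetical sketch, not part of the PR; the test name and the short flush period of 3 are made up for brevity:

```go
package server

import (
	"testing"

	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

// TestMaxPerSecGaugeExported checks that after a full flush period the
// exported gauge carries the flushed max, not just the internal field.
func TestMaxPerSecGaugeExported(t *testing.T) {
	re := require.New(t)
	tracker := newMaxPerSecCostTracker("gauge-test", 3)
	for i := 0; i < 5; i++ {
		tracker.CollectConsumption(&rmpb.Consumption{RRU: float64(i), WRU: float64(i)})
		tracker.FlushMetrics()
	}
	// Ticks 0 and 1 only seed the baseline; deltas 2, 3, 4 follow, and the
	// third delta triggers the flush, so the gauge should read 4.
	re.Equal(float64(4), testutil.ToFloat64(tracker.rruMaxMetrics))
	re.Equal(float64(4), testutil.ToFloat64(tracker.wruMaxMetrics))
}
```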