diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go
index ea67bb847ef..ef402b8cbf9 100644
--- a/pkg/mcs/resourcemanager/server/manager.go
+++ b/pkg/mcs/resourcemanager/server/manager.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/failpoint"
 	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	bs "github.com/tikv/pd/pkg/basicserver"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/storage/endpoint"
@@ -41,7 +42,9 @@ const (
 	defaultConsumptionChanSize = 1024
 	metricsCleanupInterval     = time.Minute
 	metricsCleanupTimeout      = 20 * time.Minute
-	metricsAvailableRUInterval = 30 * time.Second
+	metricsAvailableRUInterval = 1 * time.Second
+	defaultCollectIntervalSec  = 20
+	tickPerSecond              = time.Second
 
 	reservedDefaultGroupName = "default"
 	middlePriority           = 8
@@ -357,6 +360,9 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 	defer cleanUpTicker.Stop()
 	availableRUTicker := time.NewTicker(metricsAvailableRUInterval)
 	defer availableRUTicker.Stop()
+	recordMaxTicker := time.NewTicker(tickPerSecond)
+	defer recordMaxTicker.Stop()
+	maxPerSecTrackers := make(map[string]*maxPerSecCostTracker)
 	for {
 		select {
 		case <-ctx.Done():
@@ -386,6 +392,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 				readRequestCountMetrics  = requestCount.WithLabelValues(name, name, readTypeLabel)
 				writeRequestCountMetrics = requestCount.WithLabelValues(name, name, writeTypeLabel)
 			)
+			t, ok := maxPerSecTrackers[name]
+			if !ok {
+				t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
+				maxPerSecTrackers[name] = t
+			}
+			t.CollectConsumption(consumption)
+
 			// RU info.
 			if consumption.RRU > 0 {
 				rruMetrics.Add(consumption.RRU)
@@ -437,21 +450,101 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 					requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel)
 					availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType)
 					delete(m.consumptionRecord, r)
+					delete(maxPerSecTrackers, r.name)
+					readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
+					writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
 				}
 			}
 		case <-availableRUTicker.C:
 			m.RLock()
+			groups := make([]*ResourceGroup, 0, len(m.groups))
 			for name, group := range m.groups {
 				if name == reservedDefaultGroupName {
 					continue
 				}
+				groups = append(groups, group)
+			}
+			m.RUnlock()
+			// Update the metrics outside the lock to avoid holding it for too long when there are many groups.
+			for _, group := range groups {
 				ru := group.getRUToken()
 				if ru < 0 {
 					ru = 0
 				}
-				availableRUCounter.WithLabelValues(name, name).Set(ru)
+				availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru)
+			}
+
+		case <-recordMaxTicker.C:
+			// Record the sum of RRU and WRU every second.
+			m.RLock()
+			names := make([]string, 0, len(m.groups))
+			for name := range m.groups {
+				names = append(names, name)
+			}
+			m.RUnlock()
+			for _, name := range names {
+				if t, ok := maxPerSecTrackers[name]; !ok {
+					maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
+				} else {
+					t.FlushMetrics()
+				}
+			}
 		}
 	}
 }
+
+type maxPerSecCostTracker struct {
+	name          string
+	maxPerSecRRU  float64
+	maxPerSecWRU  float64
+	rruSum        float64
+	wruSum        float64
+	lastRRUSum    float64
+	lastWRUSum    float64
+	flushPeriod   int
+	cnt           int
+	rruMaxMetrics prometheus.Gauge
+	wruMaxMetrics prometheus.Gauge
+}
+
+func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker {
+	return &maxPerSecCostTracker{
+		name:          name,
+		flushPeriod:   flushPeriod,
+		rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name),
+		wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name),
+	}
+}
+
+// CollectConsumption accumulates the RRU and WRU consumption.
+func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) {
+	t.rruSum += consume.RRU
+	t.wruSum += consume.WRU
+}
+
+// FlushMetrics computes the per-second RRU/WRU deltas, tracks their maxima,
+// and flushes the maxima to the metrics once every flushPeriod calls.
+func (t *maxPerSecCostTracker) FlushMetrics() {
+	if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
+		t.lastRRUSum = t.rruSum
+		t.lastWRUSum = t.wruSum
+		return
+	}
+	deltaRRU := t.rruSum - t.lastRRUSum
+	deltaWRU := t.wruSum - t.lastWRUSum
+	t.lastRRUSum = t.rruSum
+	t.lastWRUSum = t.wruSum
+	if deltaRRU > t.maxPerSecRRU {
+		t.maxPerSecRRU = deltaRRU
+	}
+	if deltaWRU > t.maxPerSecWRU {
+		t.maxPerSecWRU = deltaWRU
+	}
+	t.cnt++
+	// Flush the maxima to the metrics and reset them every flushPeriod.
+	if t.cnt%t.flushPeriod == 0 {
+		t.rruMaxMetrics.Set(t.maxPerSecRRU)
+		t.wruMaxMetrics.Set(t.maxPerSecWRU)
+		t.maxPerSecRRU = 0
+		t.maxPerSecWRU = 0
+	}
+}
diff --git a/pkg/mcs/resourcemanager/server/metrics.go b/pkg/mcs/resourcemanager/server/metrics.go
index 6bb90c45d12..45c94e5c735 100644
--- a/pkg/mcs/resourcemanager/server/metrics.go
+++ b/pkg/mcs/resourcemanager/server/metrics.go
@@ -48,6 +48,22 @@ var (
 			Name:      "write_request_unit_sum",
 			Help:      "Counter of the write request unit cost for all resource groups.",
 		}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel})
+
+	readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: ruSubsystem,
+			Name:      "read_request_unit_max_per_sec",
+			Help:      "Gauge of the max read request unit per second for all resource groups.",
+		}, []string{newResourceGroupNameLabel})
+	writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: ruSubsystem,
+			Name:      "write_request_unit_max_per_sec",
+			Help:      "Gauge of the max write request unit per second for all resource groups.",
+		}, []string{newResourceGroupNameLabel})
+
 	sqlLayerRequestUnitCost = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
@@ -112,4 +128,6 @@ func init() {
 	prometheus.MustRegister(sqlCPUCost)
 	prometheus.MustRegister(requestCount)
 	prometheus.MustRegister(availableRUCounter)
+	prometheus.MustRegister(readRequestUnitMaxPerSecCost)
+	prometheus.MustRegister(writeRequestUnitMaxPerSecCost)
 }
diff --git a/pkg/mcs/resourcemanager/server/metrics_test.go b/pkg/mcs/resourcemanager/server/metrics_test.go
new file mode 100644
index 00000000000..62d07286eaf
--- /dev/null
+++ b/pkg/mcs/resourcemanager/server/metrics_test.go
@@ -0,0 +1,51 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"fmt"
+	"testing"
+
+	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMaxPerSecCostTracker(t *testing.T) {
+	tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec)
+	re := require.New(t)
+
+	// Define the expected max values for each flushPeriod.
+	expectedMaxRU := []float64{19, 39, 59}
+	expectedSum := []float64{190, 780, 1770}
+
+	for i := 0; i < 60; i++ {
+		// Record data.
+		consumption := &rmpb.Consumption{
+			RRU: float64(i),
+			WRU: float64(i),
+		}
+		tracker.CollectConsumption(consumption)
+		tracker.FlushMetrics()
+
+		// Check the max values at the end of each flushPeriod.
+		if (i+1)%20 == 0 {
+			period := i / 20
+			re.Equal(expectedMaxRU[period], tracker.maxPerSecRRU, fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
+			re.Equal(expectedMaxRU[period], tracker.maxPerSecWRU, fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
+			re.Equal(expectedSum[period], tracker.rruSum)
+			re.Equal(expectedSum[period], tracker.wruSum)
+		}
+	}
+}
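To see the windowing behavior in isolation, here is a minimal standalone sketch (not part of the patch) of the delta logic that `maxPerSecCostTracker.FlushMetrics` applies. It is simplified to the RRU half only, the `sketchTracker` name and the `main` harness are illustrative, and the Prometheus gauge is replaced by a print so the snippet runs by itself; it replays the same 0..59 consumption sequence as `TestMaxPerSecCostTracker` above.

```go
package main

import "fmt"

// sketchTracker mirrors the RRU half of maxPerSecCostTracker, with the
// Prometheus gauge swapped for a print so the sketch is self-contained.
type sketchTracker struct {
	rruSum       float64 // cumulative RRU, grown by every collected Consumption
	lastRRUSum   float64 // cumulative RRU at the previous flush
	maxPerSecRRU float64 // largest per-second delta seen in the current window
	flushPeriod  int     // window length in ticks (defaultCollectIntervalSec = 20)
	cnt          int     // flushes performed since the baseline was seeded
}

func (t *sketchTracker) flush() {
	if t.lastRRUSum == 0 {
		// The first call(s) only seed the baseline, as in FlushMetrics.
		t.lastRRUSum = t.rruSum
		return
	}
	// Turn the cumulative sum into a per-second delta and keep the max.
	delta := t.rruSum - t.lastRRUSum
	t.lastRRUSum = t.rruSum
	if delta > t.maxPerSecRRU {
		t.maxPerSecRRU = delta
	}
	t.cnt++
	if t.cnt%t.flushPeriod == 0 {
		// The real tracker calls rruMaxMetrics.Set(t.maxPerSecRRU) here.
		fmt.Printf("publish max RRU/s = %.0f, start a new window\n", t.maxPerSecRRU)
		t.maxPerSecRRU = 0
	}
}

func main() {
	t := &sketchTracker{flushPeriod: 20}
	for i := 0; i < 60; i++ {
		t.rruSum += float64(i) // one second's worth of consumption
		t.flush()              // driven by the 1s recordMaxTicker in the manager
		if (i+1)%20 == 0 {
			fmt.Printf("tick %2d: window max RRU/s = %.0f\n", i, t.maxPerSecRRU)
		}
	}
}
```

Because the first flush only seeds the baseline, the publish-and-reset fires two ticks late in this replay (at ticks 21 and 41), while the per-window maxima observed at ticks 19, 39, and 59 are exactly the values the unit test asserts.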