diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index ea67bb847ef0..4803cecb2b83 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" @@ -41,7 +42,8 @@ const ( defaultConsumptionChanSize = 1024 metricsCleanupInterval = time.Minute metricsCleanupTimeout = 20 * time.Minute - metricsAvailableRUInterval = 30 * time.Second + metricsAvailableRUInterval = 1 * time.Second + defaultCollectIntervalSec = 20 reservedDefaultGroupName = "default" middlePriority = 8 @@ -357,6 +359,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { defer cleanUpTicker.Stop() availableRUTicker := time.NewTicker(metricsAvailableRUInterval) defer availableRUTicker.Stop() + recordMaxTicker := time.NewTicker(time.Second) + defer recordMaxTicker.Stop() + maxPerSecTrackers := make(map[string]*maxPerSecCostTracker) + rruSum := make(map[string]float64) + wruSum := make(map[string]float64) for { select { case <-ctx.Done(): @@ -389,9 +396,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { // RU info. if consumption.RRU > 0 { rruMetrics.Add(consumption.RRU) + rruSum[name] += consumption.RRU } if consumption.WRU > 0 { wruMetrics.Add(consumption.WRU) + wruSum[name] += consumption.WRU } // Byte info. if consumption.ReadBytes > 0 { @@ -437,21 +446,94 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel) availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType) delete(m.consumptionRecord, r) + delete(maxPerSecTrackers, r.name) + delete(rruSum, r.name) + delete(wruSum, r.name) + readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) + writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) } } case <-availableRUTicker.C: m.RLock() + groups := make([]*ResourceGroup, 0, len(m.groups)) for name, group := range m.groups { if name == reservedDefaultGroupName { continue } + groups = append(groups, group) + } + m.RUnlock() + // prevent many groups and hold the lock long time. + for _, group := range groups { ru := group.getRUToken() if ru < 0 { ru = 0 } - availableRUCounter.WithLabelValues(name, name).Set(ru) + availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru) + } + + case <-recordMaxTicker.C: + // Record the sum of RRU and WRU every second. + m.RLock() + names := make([]string, 0, len(m.groups)) + for name := range m.groups { + names = append(names, name) } m.RUnlock() + for _, name := range names { + if maxPerSecTrackers[name] == nil { + maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec) + } + maxPerSecTrackers[name].RecordPerSecond(rruSum[name], wruSum[name]) + } } } } + +type maxPerSecCostTracker struct { + name string + maxPerSecRRU float64 + maxPerSecWRU float64 + lastRRUSum float64 + lastWRUSum float64 + flushPeriod int + cnt int + rruMaxMetrics prometheus.Gauge + wruMaxMetrics prometheus.Gauge +} + +func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker { + return &maxPerSecCostTracker{ + name: name, + flushPeriod: flushPeriod, + rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name), + wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name), + } +} + +// Set the maxPerSecRRU and maxPerSecWRU to the metrics. +func (t *maxPerSecCostTracker) RecordPerSecond(rruSum, wruSum float64) { + if t.lastRRUSum == 0 && t.lastWRUSum == 0 { + t.lastRRUSum = rruSum + t.lastWRUSum = wruSum + return + } + deltaRRU := rruSum - t.lastRRUSum + deltaWRU := wruSum - t.lastWRUSum + t.lastRRUSum = rruSum + t.lastWRUSum = wruSum + if deltaRRU > t.maxPerSecRRU { + t.maxPerSecRRU = deltaRRU + } + if deltaWRU > t.maxPerSecWRU { + t.maxPerSecWRU = deltaWRU + } + t.cnt++ + // flush to metrics in every flushPeriod. + if t.cnt%t.flushPeriod == 0 { + t.rruMaxMetrics.Set(t.maxPerSecRRU) + t.wruMaxMetrics.Set(t.maxPerSecWRU) + t.maxPerSecRRU = 0 + t.maxPerSecWRU = 0 + } +} diff --git a/pkg/mcs/resourcemanager/server/metrics.go b/pkg/mcs/resourcemanager/server/metrics.go index 6bb90c45d122..45c94e5c7355 100644 --- a/pkg/mcs/resourcemanager/server/metrics.go +++ b/pkg/mcs/resourcemanager/server/metrics.go @@ -48,6 +48,22 @@ var ( Name: "write_request_unit_sum", Help: "Counter of the write request unit cost for all resource groups.", }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel}) + + readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: ruSubsystem, + Name: "read_request_unit_max_per_sec", + Help: "Gauge of the max read request unit per second for all resource groups.", + }, []string{newResourceGroupNameLabel}) + writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: ruSubsystem, + Name: "write_request_unit_max_per_sec", + Help: "Gauge of the max write request unit per second for all resource groups.", + }, []string{newResourceGroupNameLabel}) + sqlLayerRequestUnitCost = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -112,4 +128,6 @@ func init() { prometheus.MustRegister(sqlCPUCost) prometheus.MustRegister(requestCount) prometheus.MustRegister(availableRUCounter) + prometheus.MustRegister(readRequestUnitMaxPerSecCost) + prometheus.MustRegister(writeRequestUnitMaxPerSecCost) } diff --git a/pkg/mcs/resourcemanager/server/metrics_test.go b/pkg/mcs/resourcemanager/server/metrics_test.go new file mode 100644 index 000000000000..3f36bf51a84d --- /dev/null +++ b/pkg/mcs/resourcemanager/server/metrics_test.go @@ -0,0 +1,47 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMaxPerSecCostTracker(t *testing.T) { + tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec) + re := require.New(t) + + // Define the expected max values for each flushPeriod + expectedMaxRRU := []float64{19, 39, 59} + expectedMaxWRU := []float64{19, 39, 59} + + rSum := 0 + wSum := 0 + for i := 0; i < 60; i++ { + // Record data + rSum += i + wSum += i + tracker.RecordPerSecond(float64(rSum), float64(wSum)) + + // Check the max values at the end of each flushPeriod + if (i+1)%20 == 0 { + period := i / 20 + re.Equal(tracker.maxPerSecRRU, expectedMaxRRU[period], fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1)) + re.Equal(tracker.maxPerSecWRU, expectedMaxWRU[period], fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1)) + } + } +}