Skip to content

Commit

Permalink
resource_manager: record the max RU per second
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Mar 19, 2024
1 parent da97f9c commit ad4b3c0
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 2 deletions.
86 changes: 84 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand All @@ -41,7 +42,8 @@ const (
defaultConsumptionChanSize = 1024
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
metricsAvailableRUInterval = 30 * time.Second
metricsAvailableRUInterval = 1 * time.Second
defaultCollectIntervalSec = 20

reservedDefaultGroupName = "default"
middlePriority = 8
Expand Down Expand Up @@ -357,6 +359,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
defer cleanUpTicker.Stop()
availableRUTicker := time.NewTicker(metricsAvailableRUInterval)
defer availableRUTicker.Stop()
recordMaxTicker := time.NewTicker(time.Second)
defer recordMaxTicker.Stop()
maxPerSecTrackers := make(map[string]*maxPerSecCostTracker)
rruSum := make(map[string]float64)
wruSum := make(map[string]float64)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -389,9 +396,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
// RU info.
if consumption.RRU > 0 {
rruMetrics.Add(consumption.RRU)
rruSum[name] += consumption.RRU
}
if consumption.WRU > 0 {
wruMetrics.Add(consumption.WRU)
wruSum[name] += consumption.WRU
}
// Byte info.
if consumption.ReadBytes > 0 {
Expand Down Expand Up @@ -437,21 +446,94 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel)
availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType)
delete(m.consumptionRecord, r)
delete(maxPerSecTrackers, r.name)
delete(rruSum, r.name)
delete(wruSum, r.name)
readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)

Check warning on line 453 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L449-L453

Added lines #L449 - L453 were not covered by tests
}
}
case <-availableRUTicker.C:
m.RLock()
groups := make([]*ResourceGroup, 0, len(m.groups))
for name, group := range m.groups {
if name == reservedDefaultGroupName {
continue
}
groups = append(groups, group)
}
m.RUnlock()
// prevent many groups and hold the lock long time.
for _, group := range groups {
ru := group.getRUToken()
if ru < 0 {
ru = 0
}
availableRUCounter.WithLabelValues(name, name).Set(ru)
availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru)
}

case <-recordMaxTicker.C:
// Record the sum of RRU and WRU every second.
m.RLock()
names := make([]string, 0, len(m.groups))
for name := range m.groups {
names = append(names, name)
}
m.RUnlock()
for _, name := range names {
if maxPerSecTrackers[name] == nil {
maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
}
maxPerSecTrackers[name].Observe(rruSum[name], wruSum[name])
}
}
}
}

type maxPerSecCostTracker struct {
name string
maxPerSecRRU float64
maxPerSecWRU float64
lastRRUSum float64
lastWRUSum float64
flushPeriod int
cnt int
rruMaxMetrics prometheus.Gauge
wruMaxMetrics prometheus.Gauge
}

func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker {
return &maxPerSecCostTracker{
name: name,
flushPeriod: flushPeriod,
rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name),
wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name),
}
}

// Observe and set the maxPerSecRRU and maxPerSecWRU to the metrics.
func (t *maxPerSecCostTracker) Observe(rruSum, wruSum float64) {
if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
t.lastRRUSum = rruSum
t.lastWRUSum = wruSum
return
}
deltaRRU := rruSum - t.lastRRUSum
deltaWRU := wruSum - t.lastWRUSum
t.lastRRUSum = rruSum
t.lastWRUSum = wruSum
if deltaRRU > t.maxPerSecRRU {
t.maxPerSecRRU = deltaRRU
}
if deltaWRU > t.maxPerSecWRU {
t.maxPerSecWRU = deltaWRU
}
t.cnt++
// flush to metrics in every flushPeriod.
if t.cnt%t.flushPeriod == 0 {
t.rruMaxMetrics.Set(t.maxPerSecRRU)
t.wruMaxMetrics.Set(t.maxPerSecWRU)
t.maxPerSecRRU = 0
t.maxPerSecWRU = 0
}
}
18 changes: 18 additions & 0 deletions pkg/mcs/resourcemanager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ var (
Name: "write_request_unit_sum",
Help: "Counter of the write request unit cost for all resource groups.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel})

readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "read_request_unit_max_per_sec",
Help: "Gauge of the max read request unit per second for all resource groups.",
}, []string{newResourceGroupNameLabel})
writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "write_request_unit_max_per_sec",
Help: "Gauge of the max write request unit per second for all resource groups.",
}, []string{newResourceGroupNameLabel})

sqlLayerRequestUnitCost = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -112,4 +128,6 @@ func init() {
prometheus.MustRegister(sqlCPUCost)
prometheus.MustRegister(requestCount)
prometheus.MustRegister(availableRUCounter)
prometheus.MustRegister(readRequestUnitMaxPerSecCost)
prometheus.MustRegister(writeRequestUnitMaxPerSecCost)
}
47 changes: 47 additions & 0 deletions pkg/mcs/resourcemanager/server/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestMaxPerSecCostTracker(t *testing.T) {
tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec)
re := require.New(t)

// Define the expected max values for each flushPeriod
expectedMaxRRU := []float64{19, 39, 59}
expectedMaxWRU := []float64{19, 39, 59}

rSum := 0
wSum := 0
for i := 0; i < 60; i++ {
// Record data
rSum += i
wSum += i
tracker.Observe(float64(rSum), float64(wSum))

// Check the max values at the end of each flushPeriod
if (i+1)%20 == 0 {
period := i / 20
re.Equal(tracker.maxPerSecRRU, expectedMaxRRU[period], fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
re.Equal(tracker.maxPerSecWRU, expectedMaxWRU[period], fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
}
}
}

0 comments on commit ad4b3c0

Please sign in to comment.