Skip to content

Commit

Permalink
resource_manager: record the max RU per second (#7936) (#8011)
Browse files Browse the repository at this point in the history
close #7908

resource_manager: record the max RU per second

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: nolouch <[email protected]>

Co-authored-by: ShuNing <[email protected]>
Co-authored-by: nolouch <[email protected]>
  • Loading branch information
ti-chi-bot and nolouch committed Apr 11, 2024
1 parent 39fa88e commit 442f8c9
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 9 deletions.
95 changes: 94 additions & 1 deletion pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand All @@ -42,6 +43,8 @@ const (
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
metricsAvailableRUInterval = 30 * time.Second
defaultCollectIntervalSec = 20
tickPerSecond = time.Second

reservedDefaultGroupName = "default"
middlePriority = 8
Expand Down Expand Up @@ -350,6 +353,9 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
defer cleanUpTicker.Stop()
availableRUTicker := time.NewTicker(metricsAvailableRUInterval)
defer availableRUTicker.Stop()
recordMaxTicker := time.NewTicker(tickPerSecond)
defer recordMaxTicker.Stop()
maxPerSecTrackers := make(map[string]*maxPerSecCostTracker)
for {
select {
case <-ctx.Done():
Expand All @@ -371,6 +377,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
)
t, ok := maxPerSecTrackers[name]
if !ok {
t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
maxPerSecTrackers[name] = t
}
t.CollectConsumption(consumption)

// RU info.
if consumption.RRU > 0 {
rruMetrics.Observe(consumption.RRU)
Expand Down Expand Up @@ -418,21 +431,101 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
requestCount.DeleteLabelValues(name, writeTypeLabel)
availableRUCounter.DeleteLabelValues(name)
delete(m.consumptionRecord, name)
delete(maxPerSecTrackers, name)
readRequestUnitMaxPerSecCost.DeleteLabelValues(name)
writeRequestUnitMaxPerSecCost.DeleteLabelValues(name)
}
}
case <-availableRUTicker.C:
m.RLock()
groups := make([]*ResourceGroup, 0, len(m.groups))
for name, group := range m.groups {
if name == reservedDefaultGroupName {
continue
}
groups = append(groups, group)
}
m.RUnlock()
// prevent many groups and hold the lock long time.
for _, group := range groups {
ru := group.getRUToken()
if ru < 0 {
ru = 0
}
availableRUCounter.WithLabelValues(name).Set(ru)
availableRUCounter.WithLabelValues(group.Name).Set(ru)
}

case <-recordMaxTicker.C:
// Record the sum of RRU and WRU every second.
m.RLock()
names := make([]string, 0, len(m.groups))
for name := range m.groups {
names = append(names, name)
}
m.RUnlock()
for _, name := range names {
if t, ok := maxPerSecTrackers[name]; !ok {
maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
} else {
t.FlushMetrics()
}
}
}
}
}

// maxPerSecCostTracker tracks the maximum per-second RU consumption
// (read and write separately) for one resource group, and periodically
// publishes the observed maxima to Prometheus gauges.
type maxPerSecCostTracker struct {
	name         string  // resource group name, used as the metric label value
	maxPerSecRRU float64 // max per-tick RRU delta seen since the last metrics flush
	maxPerSecWRU float64 // max per-tick WRU delta seen since the last metrics flush
	rruSum       float64 // cumulative RRU collected via CollectConsumption
	wruSum       float64 // cumulative WRU collected via CollectConsumption
	lastRRUSum   float64 // rruSum snapshot taken at the previous FlushMetrics call
	lastWRUSum   float64 // wruSum snapshot taken at the previous FlushMetrics call
	flushPeriod  int     // number of FlushMetrics calls between gauge updates
	cnt          int     // counts FlushMetrics calls since creation (post-priming)
	rruMaxMetrics prometheus.Gauge
	wruMaxMetrics prometheus.Gauge
}

// newMaxPerSecCostTracker creates a tracker for the given resource group
// that publishes its per-second maxima once every flushPeriod samples.
func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker {
	tracker := new(maxPerSecCostTracker)
	tracker.name = name
	tracker.flushPeriod = flushPeriod
	tracker.rruMaxMetrics = readRequestUnitMaxPerSecCost.WithLabelValues(name)
	tracker.wruMaxMetrics = writeRequestUnitMaxPerSecCost.WithLabelValues(name)
	return tracker
}

// CollectConsumption folds one consumption record's RRU/WRU into the
// tracker's running sums.
func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) {
	t.rruSum, t.wruSum = t.rruSum+consume.RRU, t.wruSum+consume.WRU
}

// FlushMetrics computes the RRU/WRU deltas accumulated since the previous
// call, folds them into the running per-second maxima, and publishes the
// maxima to the gauges once every flushPeriod calls (resetting them for
// the next window).
// NOTE(review): an all-zero lastRRUSum/lastWRUSum pair doubles as the
// "not yet primed" sentinel, so a group with zero consumption keeps
// re-priming instead of counting — confirm this is intended.
func (t *maxPerSecCostTracker) FlushMetrics() {
	// First (or still all-zero) observation: just prime the baselines.
	if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
		t.lastRRUSum, t.lastWRUSum = t.rruSum, t.wruSum
		return
	}
	rruDelta := t.rruSum - t.lastRRUSum
	wruDelta := t.wruSum - t.lastWRUSum
	t.lastRRUSum, t.lastWRUSum = t.rruSum, t.wruSum
	if rruDelta > t.maxPerSecRRU {
		t.maxPerSecRRU = rruDelta
	}
	if wruDelta > t.maxPerSecWRU {
		t.maxPerSecWRU = wruDelta
	}
	t.cnt++
	if t.cnt%t.flushPeriod != 0 {
		return
	}
	// End of a window: publish and reset the maxima.
	t.rruMaxMetrics.Set(t.maxPerSecRRU)
	t.wruMaxMetrics.Set(t.maxPerSecWRU)
	t.maxPerSecRRU, t.maxPerSecWRU = 0, 0
}
32 changes: 24 additions & 8 deletions pkg/mcs/resource_manager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
)

const (
namespace = "resource_manager"
serverSubsystem = "server"
ruSubsystem = "resource_unit"
resourceSubsystem = "resource"
resourceGroupNameLabel = "name"
typeLabel = "type"
readTypeLabel = "read"
writeTypeLabel = "write"
namespace = "resource_manager"
serverSubsystem = "server"
ruSubsystem = "resource_unit"
resourceSubsystem = "resource"
resourceGroupNameLabel = "name"
typeLabel = "type"
readTypeLabel = "read"
writeTypeLabel = "write"
newResourceGroupNameLabel = "resource_group"
)

var (
Expand Down Expand Up @@ -55,6 +56,21 @@ var (
Help: "Bucketed histogram of the write request unit cost for all resource groups.",
Buckets: prometheus.ExponentialBuckets(3, 10, 5), // 3 ~ 300000
}, []string{resourceGroupNameLabel})
readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "read_request_unit_max_per_sec",
Help: "Gauge of the max read request unit per second for all resource groups.",
}, []string{newResourceGroupNameLabel})
writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "write_request_unit_max_per_sec",
Help: "Gauge of the max write request unit per second for all resource groups.",
}, []string{newResourceGroupNameLabel})

sqlLayerRequestUnitCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Expand Down
51 changes: 51 additions & 0 deletions pkg/mcs/resource_manager/server/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"testing"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

// TestMaxPerSecCostTracker feeds the tracker a linearly increasing RRU/WRU
// stream and verifies the per-period maxima and cumulative sums at the end
// of each 20-sample flush period.
func TestMaxPerSecCostTracker(t *testing.T) {
	tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec)
	re := require.New(t)

	// Expected max per-second delta and cumulative sum at the end of
	// each flush period (sum of 0..19, 0..39, 0..59).
	expectedMaxRU := []float64{19, 39, 59}
	expectedSum := []float64{190, 780, 1770}

	for i := 0; i < 60; i++ {
		// Record data
		consumption := &rmpb.Consumption{
			RRU: float64(i),
			WRU: float64(i),
		}
		tracker.CollectConsumption(consumption)
		tracker.FlushMetrics()

		// Check the max values at the end of each flushPeriod.
		if (i+1)%20 == 0 {
			period := i / 20
			// require.Equal takes (expected, actual) — keep that order so
			// failure messages label the values correctly.
			re.Equal(expectedMaxRU[period], tracker.maxPerSecRRU, fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
			re.Equal(expectedMaxRU[period], tracker.maxPerSecWRU, fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
			re.Equal(expectedSum[period], tracker.rruSum)
			// Was checking rruSum twice; WRU sum must be verified as well.
			re.Equal(expectedSum[period], tracker.wruSum)
		}
	}
}

0 comments on commit 442f8c9

Please sign in to comment.