Skip to content

Commit

Permalink
tools/heartbeat: support to collect metrics (#8235)
Browse files Browse the repository at this point in the history
ref #8135

tools/heartbeat: support to collect metrics

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Jul 29, 2024
1 parent c53f1d5 commit 5d77447
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/common v0.51.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -132,7 +133,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.51.1 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/samber/lo v1.37.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions tools/pd-heartbeat-bench/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Config struct {
ReportRatio float64 `toml:"report-ratio" json:"report-ratio"`
Sample bool `toml:"sample" json:"sample"`
Round int `toml:"round" json:"round"`
MetricsAddr string `toml:"metrics-addr" json:"metrics-addr"`
}

// NewConfig return a set of settings.
Expand All @@ -69,6 +70,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Security.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format")
fs.StringVar(&cfg.Security.KeyPath, "key", "", "path of file that contains X509 key in PEM format")
fs.Uint64Var(&cfg.InitEpochVer, "epoch-ver", 1, "the initial epoch version value")
fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "127.0.0.1:9090", "the address to pull metrics")

return cfg
}
Expand Down
33 changes: 25 additions & 8 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"sync/atomic"
"syscall"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/tools/pd-heartbeat-bench/config"
"github.com/tikv/pd/tools/pd-heartbeat-bench/metrics"
"go.etcd.io/etcd/pkg/report"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -528,6 +530,7 @@ func main() {
defer heartbeatTicker.Stop()
var resolvedTSTicker = time.NewTicker(time.Second)
defer resolvedTSTicker.Stop()
withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr)
for {
select {
case <-heartbeatTicker.C:
Expand All @@ -544,21 +547,19 @@ func main() {
wg.Add(1)
go regions.handleRegionHeartbeat(wg, streams[id], id, rep)
}
if withMetric {
metrics.CollectMetrics(regions.updateRound, time.Second)
}
wg.Wait()

since := time.Since(startTime).Seconds()
close(rep.Results())
regions.result(cfg.RegionCount, since)
stats := <-r
log.Info("region heartbeat stats", zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
zap.Uint64("max-epoch-version", maxVersion),
)
log.Info("region heartbeat stats",
metrics.RegionFields(stats, zap.Uint64("max-epoch-version", maxVersion))...)
log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
metrics.CollectRegionAndStoreStats(&stats, &since)
regions.update(cfg, options)
go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update.
case <-resolvedTSTicker.C:
Expand Down Expand Up @@ -594,6 +595,7 @@ func main() {
}

func exit(code int) {
metrics.OutputConclusion()
os.Exit(code)
}

Expand Down Expand Up @@ -689,6 +691,21 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {

c.IndentedJSON(http.StatusOK, output)
})
engine.GET("metrics-collect", func(c *gin.Context) {
second := c.Query("second")
if second == "" {
c.String(http.StatusBadRequest, "missing second")
return
}
secondInt, err := strconv.Atoi(second)
if err != nil {
c.String(http.StatusBadRequest, "invalid second")
return
}
metrics.CollectMetrics(metrics.WarmUpRound, time.Duration(secondInt)*time.Second)
c.IndentedJSON(http.StatusOK, "Successfully collect metrics")
})

engine.Run(cfg.StatusAddr)
}

Expand Down
231 changes: 231 additions & 0 deletions tools/pd-heartbeat-bench/metrics/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"
"fmt"
"math"
"net/url"
"strings"
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"go.etcd.io/etcd/pkg/report"
"go.uber.org/zap"
)

var (
prometheusCli api.Client
finalMetrics2Collect []Metric
avgRegionStats report.Stats
avgStoreTime float64
collectRound = 1.0

metrics2Collect = []Metric{
{promSQL: cpuMetric, name: "max cpu usage(%)", max: true},
{promSQL: memoryMetric, name: "max memory usage(G)", max: true},
{promSQL: goRoutineMetric, name: "max go routines", max: true},
{promSQL: hbLatency99Metric, name: "99% Heartbeat Latency(ms)"},
{promSQL: hbLatencyAvgMetric, name: "Avg Heartbeat Latency(ms)"},
}

// Prometheus SQL
cpuMetric = `max_over_time(irate(process_cpu_seconds_total{job=~".*pd.*"}[30s])[1h:30s]) * 100`
memoryMetric = `max_over_time(go_memstats_heap_inuse_bytes{job=~".*pd.*"}[1h])/1024/1024/1024`
goRoutineMetric = `max_over_time(go_goroutines{job=~".*pd.*"}[1h])`
hbLatency99Metric = `histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{}[1m])) by (le))`
hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m]))`

// Heartbeat Performance Duration BreakDown
breakdownNames = []string{
"AsyncHotStatsDuration",
"CollectRegionStats",
"Other",
"PreCheck",
"RegionGuide",
"SaveCache_CheckOverlaps",
"SaveCache_InvalidRegion",
"SaveCache_SetRegion",
"SaveCache_UpdateSubTree",
}
hbBreakdownMetricByName = func(name string) string {
return fmt.Sprintf(`sum(rate(pd_core_region_heartbeat_breakdown_handle_duration_seconds_sum{name="%s"}[1m]))`, name)
}
)

type Metric struct {
promSQL string
name string
value float64
// max indicates whether the metric is a max value
max bool
}

func InitMetric2Collect(endpoint string) (withMetric bool) {
for _, name := range breakdownNames {
metrics2Collect = append(metrics2Collect, Metric{
promSQL: hbBreakdownMetricByName(name),
name: name,
})
}
finalMetrics2Collect = metrics2Collect

if j := strings.Index(endpoint, "//"); j == -1 {
endpoint = "http://" + endpoint
}
cu, err := url.Parse(endpoint)
if err != nil {
log.Error("parse prometheus url error", zap.Error(err))
return false
}
prometheusCli, err = NewPrometheusClient(*cu)
if err != nil {
log.Error("create prometheus client error", zap.Error(err))
return false
}
// check whether the prometheus is available
_, err = getMetric(prometheusCli, goRoutineMetric, time.Now())
if err != nil {
log.Error("check prometheus availability error, please check the prometheus address", zap.Error(err))
return false
}
return true
}

func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) {
client, err := api.NewClient(api.Config{
Address: prometheusURL.String(),
})
if err != nil {
return nil, err
}

return client, nil
}

// WarmUpRound wait for the first round to warm up
const WarmUpRound = 1

func CollectMetrics(curRound int, wait time.Duration) {
if curRound < WarmUpRound {
return
}
// retry 5 times to get average value
res := make([]struct {
sum float64
count int
}, len(metrics2Collect))
for i := 0; i < 5; i++ {
for j, m := range metrics2Collect {
r, err := getMetric(prometheusCli, m.promSQL, time.Now())
if err != nil {
log.Error("get metric error", zap.String("name", m.name), zap.String("prom sql", m.promSQL), zap.Error(err))
} else if len(r) > 0 {
res[j].sum += r[0]
res[j].count += 1
}
}
time.Sleep(wait)
}
getRes := func(index int) float64 {
if res[index].count == 0 {
return 0
}
return res[index].sum / float64(res[index].count)
}
for i := 0; i < len(metrics2Collect); i++ {
metrics2Collect[i].value = getRes(i)
if metrics2Collect[i].max {
finalMetrics2Collect[i].value = max(finalMetrics2Collect[i].value, metrics2Collect[i].value)
} else {
finalMetrics2Collect[i].value = (finalMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
}
}

collectRound += 1
log.Info("metrics collected", zap.Float64("round", collectRound), zap.String("metrics", formatMetrics(metrics2Collect)))
}

func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
httpAPI := v1.NewAPI(cli)
val, _, err := httpAPI.Query(context.Background(), query, ts)
if err != nil {
return nil, err
}
valMatrix := val.(model.Vector)
if len(valMatrix) == 0 {
return nil, nil
}
var value []float64
for i := range valMatrix {
value = append(value, float64(valMatrix[i].Value))
// judge whether exceeded float maximum value
if math.IsNaN(value[i]) {
return nil, fmt.Errorf("prometheus query result exceeded float maximum value, result=%s", valMatrix[i].String())
}
}
return value, nil
}

func formatMetrics(ms []Metric) string {
res := ""
for _, m := range ms {
res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " "
}
return res
}

func CollectRegionAndStoreStats(regionStats *report.Stats, storeTime *float64) {
if regionStats != nil && storeTime != nil {
collect(*regionStats, *storeTime)
}
}

func collect(regionStats report.Stats, storeTime float64) {
average := func(avg, new float64) float64 {
return (avg*collectRound + new) / (collectRound + 1)
}

avgRegionStats.Total = time.Duration(average(float64(avgRegionStats.Total), float64(regionStats.Total)))
avgRegionStats.Average = average(avgRegionStats.Average, regionStats.Average)
avgRegionStats.Stddev = average(avgRegionStats.Stddev, regionStats.Stddev)
avgRegionStats.Fastest = average(avgRegionStats.Fastest, regionStats.Fastest)
avgRegionStats.Slowest = average(avgRegionStats.Slowest, regionStats.Slowest)
avgRegionStats.RPS = average(avgRegionStats.RPS, regionStats.RPS)
avgStoreTime = average(avgStoreTime, storeTime)
}

func OutputConclusion() {
logFields := RegionFields(avgRegionStats,
zap.Float64("avg store time", avgStoreTime),
zap.Float64("current round", collectRound),
zap.String("metrics", formatMetrics(finalMetrics2Collect)))
log.Info("final metrics collected", logFields...)
}

func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field {
return append([]zap.Field{
zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
}, fields...)
}

0 comments on commit 5d77447

Please sign in to comment.