Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(collector): fix metric writes being overwritten when multiple collectors share the global variable DataSource #1857

Merged
merged 6 commits into from
Jan 18, 2024
61 changes: 27 additions & 34 deletions collector/metrics/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

const (
MetaServer int = 0
ReplicaServer int = 1
MetaServer string = "meta"
ReplicaServer string = "replica"
)

type Metric struct {
Expand All @@ -53,36 +53,30 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec
var CounterMetricsMap map[string]prometheus.CounterVec
var SummaryMetricsMap map[string]prometheus.Summary

// DataSource 0 meta server, 1 replica server.
var DataSource int
var RoleByDataSource map[int]string

var TableNameByID map[string]string

// MetricCollector is the interface returned by NewMetricCollector; in this
// file it is implemented by *Collector.
type MetricCollector interface {
// Start runs the collector under the supplied tomb and returns an error
// on failure.
// NOTE(review): presumably blocks in a periodic collection loop until the
// tomb signals shutdown — confirm against the Collector.Start implementation.
Start(tom *tomb.Tomb) error
}

func NewMetricCollector(
dataSource int,
role string,
detectInterval time.Duration,
detectTimeout time.Duration) MetricCollector {
DataSource = dataSource
GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
RoleByDataSource = make(map[int]string, 128)
TableNameByID = make(map[string]string, 128)
RoleByDataSource[0] = "meta"
RoleByDataSource[1] = "replica"
initMetrics()

return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
var collector = Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, role: role}
collector.initMetrics()
return &collector
}

// Collector is the concrete MetricCollector built by NewMetricCollector.
// Each instance carries its own role, so several collectors can coexist
// without sharing a package-level data-source selector.
type Collector struct {
// detectInterval is supplied by the caller of NewMetricCollector.
// NOTE(review): presumably the period between collection rounds — confirm
// where the ticker in Start is created.
detectInterval time.Duration
// detectTimeout is supplied by the caller of NewMetricCollector.
// NOTE(review): presumably bounds a single detection/request — its use is
// not visible in this view; confirm.
detectTimeout time.Duration
// role is compared against MetaServer to decide which server addresses to
// query, and is emitted as the "role" label value on updated metrics.
role string
}

func (collector *Collector) Start(tom *tomb.Tomb) error {
Expand All @@ -93,7 +87,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error {
return nil
case <-ticker.C:
updateClusterTableInfo()
processAllServerMetrics()
collector.processAllServerMetrics()
}
}
}
Expand Down Expand Up @@ -124,10 +118,10 @@ func getReplicaAddrs() ([]string, error) {
}

// Register all metrics.
func initMetrics() {
func (collector *Collector) initMetrics() {
var addrs []string
var err error
if DataSource == MetaServer {
if collector.role == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
Expand Down Expand Up @@ -188,10 +182,10 @@ func initMetrics() {
}

// Parse metric data and update metrics.
func processAllServerMetrics() {
func (collector *Collector) processAllServerMetrics() {
var addrs []string
var err error
if DataSource == MetaServer {
if collector.role == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
Expand Down Expand Up @@ -225,7 +219,7 @@ func processAllServerMetrics() {
tableID, &metricsByTableID)
collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
&metricsByServerTableID)
updateServerLevelTableMetrics(addr, metricsByServerTableID)
collector.updateServerLevelTableMetrics(addr, metricsByServerTableID)
case "server":
mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
metricsOfCluster)
Expand All @@ -237,13 +231,13 @@ func processAllServerMetrics() {
}
}

updateClusterLevelTableMetrics(metricsByTableID)
updateServerLevelServerMetrics(metricsByAddr)
updateClusterLevelMetrics(metricsOfCluster)
collector.updateClusterLevelTableMetrics(metricsByTableID)
collector.updateServerLevelServerMetrics(metricsByAddr)
collector.updateClusterLevelMetrics(metricsOfCluster)
}

// Update table metrics. They belong to a specified server.
func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
func (collector *Collector) updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
for tableID, metrics := range metricsByServerTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
Expand All @@ -252,29 +246,29 @@ func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[strin
tableName = name
}
for _, metric := range metrics {
updateMetric(metric, addr, "server", tableName)
collector.updateMetric(metric, addr, "server", tableName)
}
}
}

// Update server metrics. They belong to a specified server.
func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
func (collector *Collector) updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
for addr, metrics := range metricsByAddr {
for _, metric := range metrics {
updateMetric(metric, addr, "server", "server")
collector.updateMetric(metric, addr, "server", "server")
}
}
}

// Update cluster level metrics. They belong to a cluster.
func updateClusterLevelMetrics(metricsOfCluster []Metric) {
func (collector *Collector) updateClusterLevelMetrics(metricsOfCluster []Metric) {
for _, metric := range metricsOfCluster {
updateMetric(metric, "cluster", "server", metric.name)
collector.updateMetric(metric, "cluster", "server", metric.name)
}
}

// Update table metrics. They belong to a cluster.
func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
for tableID, metrics := range metricsByTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
Expand All @@ -283,19 +277,18 @@ func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
tableName = name
}
for _, metric := range metrics {
updateMetric(metric, "cluster", "table", tableName)
collector.updateMetric(metric, "cluster", "table", tableName)
}
}
}

func updateMetric(metric Metric, endpoint string, level string, title string) {
role := RoleByDataSource[DataSource]
func (collector *Collector) updateMetric(metric Metric, endpoint string, level string, title string) {
switch metric.mtype {
case "Counter":
if counter, ok := CounterMetricsMap[metric.name]; ok {
counter.With(
prometheus.Labels{"endpoint": endpoint,
"role": role, "level": level,
"role": collector.role, "level": level,
"title": title}).Add(float64(metric.value))
} else {
log.Warnf("Unknown metric name %s", metric.name)
Expand All @@ -304,7 +297,7 @@ func updateMetric(metric Metric, endpoint string, level string, title string) {
if gauge, ok := GaugeMetricsMap[metric.name]; ok {
gauge.With(
prometheus.Labels{"endpoint": endpoint,
"role": role, "level": level,
"role": collector.role, "level": level,
"title": title}).Set(float64(metric.value))
} else {
log.Warnf("Unknown metric name %s", metric.name)
Expand Down
Loading