Skip to content

Commit

Permalink
fix(collector): fix the write overwrite caused by multiple collectors sharing a global variable DataSource (#1857)
Browse files Browse the repository at this point in the history

#1820
  • Loading branch information
limowang authored Jan 18, 2024
1 parent edc5b53 commit 09dc9e0
Showing 1 changed file with 27 additions and 34 deletions.
61 changes: 27 additions & 34 deletions collector/metrics/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

const (
MetaServer int = 0
ReplicaServer int = 1
MetaServer string = "meta"
ReplicaServer string = "replica"
)

type Metric struct {
Expand All @@ -53,36 +53,30 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec
var CounterMetricsMap map[string]prometheus.CounterVec
var SummaryMetricsMap map[string]prometheus.Summary

// DataSource 0 meta server, 1 replica server.
var DataSource int
var RoleByDataSource map[int]string

var TableNameByID map[string]string

// MetricCollector is the interface handed back by NewMetricCollector.
// The Collector type in this file provides the implementation.
type MetricCollector interface {
// Start runs the collector with the supplied tomb and reports an error, if any.
// NOTE(review): presumably the tomb is what signals the collection loop to
// stop — confirm against the Start implementation.
Start(tom *tomb.Tomb) error
}

func NewMetricCollector(
dataSource int,
role string,
detectInterval time.Duration,
detectTimeout time.Duration) MetricCollector {
DataSource = dataSource
GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
RoleByDataSource = make(map[int]string, 128)
TableNameByID = make(map[string]string, 128)
RoleByDataSource[0] = "meta"
RoleByDataSource[1] = "replica"
initMetrics()

return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
var collector = Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, role: role}
collector.initMetrics()
return &collector
}

// Collector is the MetricCollector implementation built by NewMetricCollector.
// Each instance carries its own role rather than reading it from a
// package-level variable.
type Collector struct {
// detectInterval is supplied by the caller of NewMetricCollector.
// NOTE(review): presumably the period of the ticker used in Start — confirm.
detectInterval time.Duration
// detectTimeout is supplied by the caller of NewMetricCollector.
// NOTE(review): where it is consumed is not visible in this excerpt — confirm.
detectTimeout time.Duration
// role is compared against MetaServer to decide which server addresses to
// query, and is attached as the "role" label when counter and gauge metrics
// are updated.
role string
}

func (collector *Collector) Start(tom *tomb.Tomb) error {
Expand All @@ -93,7 +87,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error {
return nil
case <-ticker.C:
updateClusterTableInfo()
processAllServerMetrics()
collector.processAllServerMetrics()
}
}
}
Expand Down Expand Up @@ -124,10 +118,10 @@ func getReplicaAddrs() ([]string, error) {
}

// Register all metrics.
func initMetrics() {
func (collector *Collector) initMetrics() {
var addrs []string
var err error
if DataSource == MetaServer {
if collector.role == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
Expand Down Expand Up @@ -188,10 +182,10 @@ func initMetrics() {
}

// Parse metric data and update metrics.
func processAllServerMetrics() {
func (collector *Collector) processAllServerMetrics() {
var addrs []string
var err error
if DataSource == MetaServer {
if collector.role == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
Expand Down Expand Up @@ -225,7 +219,7 @@ func processAllServerMetrics() {
tableID, &metricsByTableID)
collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
&metricsByServerTableID)
updateServerLevelTableMetrics(addr, metricsByServerTableID)
collector.updateServerLevelTableMetrics(addr, metricsByServerTableID)
case "server":
mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
metricsOfCluster)
Expand All @@ -237,13 +231,13 @@ func processAllServerMetrics() {
}
}

updateClusterLevelTableMetrics(metricsByTableID)
updateServerLevelServerMetrics(metricsByAddr)
updateClusterLevelMetrics(metricsOfCluster)
collector.updateClusterLevelTableMetrics(metricsByTableID)
collector.updateServerLevelServerMetrics(metricsByAddr)
collector.updateClusterLevelMetrics(metricsOfCluster)
}

// Update table metrics. They belong to a specified server.
func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
func (collector *Collector) updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
for tableID, metrics := range metricsByServerTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
Expand All @@ -252,29 +246,29 @@ func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[strin
tableName = name
}
for _, metric := range metrics {
updateMetric(metric, addr, "server", tableName)
collector.updateMetric(metric, addr, "server", tableName)
}
}
}

// Update server metrics. They belong to a specified server.
func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
func (collector *Collector) updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
for addr, metrics := range metricsByAddr {
for _, metric := range metrics {
updateMetric(metric, addr, "server", "server")
collector.updateMetric(metric, addr, "server", "server")
}
}
}

// Update cluster level metrics. They belong to a cluster.
func updateClusterLevelMetrics(metricsOfCluster []Metric) {
func (collector *Collector) updateClusterLevelMetrics(metricsOfCluster []Metric) {
for _, metric := range metricsOfCluster {
updateMetric(metric, "cluster", "server", metric.name)
collector.updateMetric(metric, "cluster", "server", metric.name)
}
}

// Update table metrics. They belong to a cluster.
func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
for tableID, metrics := range metricsByTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
Expand All @@ -283,19 +277,18 @@ func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
tableName = name
}
for _, metric := range metrics {
updateMetric(metric, "cluster", "table", tableName)
collector.updateMetric(metric, "cluster", "table", tableName)
}
}
}

func updateMetric(metric Metric, endpoint string, level string, title string) {
role := RoleByDataSource[DataSource]
func (collector *Collector) updateMetric(metric Metric, endpoint string, level string, title string) {
switch metric.mtype {
case "Counter":
if counter, ok := CounterMetricsMap[metric.name]; ok {
counter.With(
prometheus.Labels{"endpoint": endpoint,
"role": role, "level": level,
"role": collector.role, "level": level,
"title": title}).Add(float64(metric.value))
} else {
log.Warnf("Unknown metric name %s", metric.name)
Expand All @@ -304,7 +297,7 @@ func updateMetric(metric Metric, endpoint string, level string, title string) {
if gauge, ok := GaugeMetricsMap[metric.name]; ok {
gauge.With(
prometheus.Labels{"endpoint": endpoint,
"role": role, "level": level,
"role": collector.role, "level": level,
"title": title}).Set(float64(metric.value))
} else {
log.Warnf("Unknown metric name %s", metric.name)
Expand Down

0 comments on commit 09dc9e0

Please sign in to comment.