From 5c4fb87d45186c949f78d959e302d673cc96249d Mon Sep 17 00:00:00 2001 From: wangguangshuo <1924240046@qq.com> Date: Tue, 16 Jan 2024 11:46:22 +0000 Subject: [PATCH 1/5] fix(collector): fix the parsing way of metrics returned by the server --- collector/metrics/metric_collector.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go index 0c3d1de0bd..f3c47785e3 100644 --- a/collector/metrics/metric_collector.go +++ b/collector/metrics/metric_collector.go @@ -73,8 +73,8 @@ func NewMetricCollector( SummaryMetricsMap = make(map[string]prometheus.Summary, 128) RoleByDataSource = make(map[int]string, 128) TableNameByID = make(map[string]string, 128) - RoleByDataSource[0] = "meta_server" - RoleByDataSource[1] = "replica_server" + RoleByDataSource[0] = "meta" + RoleByDataSource[1] = "replica" initMetrics() return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout} @@ -143,7 +143,7 @@ func initMetrics() { return } jsonData := gjson.Parse(data) - for _, entity := range jsonData.Array() { + for _, entity := range jsonData.Get("entities").Array() { for _, metric := range entity.Get("metrics").Array() { var name string = metric.Get("name").String() var mtype string = metric.Get("type").String() @@ -211,7 +211,7 @@ func processAllServerMetrics() { return } jsonData := gjson.Parse(data) - for _, entity := range jsonData.Array() { + for _, entity := range jsonData.Get("entities").Array() { etype := entity.Get("type").String() switch etype { case "replica": From c295f2b9b2b088ef6010b3978c6cac7a55135020 Mon Sep 17 00:00:00 2001 From: wangguangshuo <1924240046@qq.com> Date: Wed, 17 Jan 2024 06:51:19 +0000 Subject: [PATCH 2/5] fix(collector): fix the write overwrite caused by multiple collectors sharing a global variable DataSource --- collector/metrics/metric_collector.go | 47 +++++++++++++-------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go index f3c47785e3..bb9175affc 100644 --- a/collector/metrics/metric_collector.go +++ b/collector/metrics/metric_collector.go @@ -53,8 +53,7 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec var CounterMetricsMap map[string]prometheus.CounterVec var SummaryMetricsMap map[string]prometheus.Summary -// DataSource 0 meta server, 1 replica server. -var DataSource int +// RoleByDataSource 0 meta server, 1 replica server. var RoleByDataSource map[int]string var TableNameByID map[string]string @@ -67,7 +66,6 @@ func NewMetricCollector( dataSource int, detectInterval time.Duration, detectTimeout time.Duration) MetricCollector { - DataSource = dataSource GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128) CounterMetricsMap = make(map[string]prometheus.CounterVec, 128) SummaryMetricsMap = make(map[string]prometheus.Summary, 128) @@ -75,12 +73,13 @@ func NewMetricCollector( TableNameByID = make(map[string]string, 128) RoleByDataSource[0] = "meta" RoleByDataSource[1] = "replica" - initMetrics() + initMetrics(dataSource) - return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout} + return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, dataSource: dataSource} } type Collector struct { + dataSource int detectInterval time.Duration detectTimeout time.Duration } @@ -93,7 +92,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error { return nil case <-ticker.C: updateClusterTableInfo() - processAllServerMetrics() + processAllServerMetrics(collector.dataSource) } } } @@ -124,10 +123,10 @@ func getReplicaAddrs() ([]string, error) { } // Register all metrics. -func initMetrics() { +func initMetrics(dataSource int) { var addrs []string var err error - if DataSource == MetaServer { + if dataSource == MetaServer { addrs = viper.GetStringSlice("meta_servers") } else { addrs, err = getReplicaAddrs() @@ -188,10 +187,10 @@ func initMetrics() { } // Parse metric data and update metrics. -func processAllServerMetrics() { +func processAllServerMetrics(dataSource int) { var addrs []string var err error - if DataSource == MetaServer { + if dataSource == MetaServer { addrs = viper.GetStringSlice("meta_servers") } else { addrs, err = getReplicaAddrs() @@ -225,7 +224,7 @@ func processAllServerMetrics() { tableID, &metricsByTableID) collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID, &metricsByServerTableID) - updateServerLevelTableMetrics(addr, metricsByServerTableID) + updateServerLevelTableMetrics(addr, metricsByServerTableID, dataSource) case "server": mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(), metricsOfCluster) @@ -237,13 +236,13 @@ func processAllServerMetrics() { } } - updateClusterLevelTableMetrics(metricsByTableID) - updateServerLevelServerMetrics(metricsByAddr) - updateClusterLevelMetrics(metricsOfCluster) + updateClusterLevelTableMetrics(metricsByTableID, dataSource) + updateServerLevelServerMetrics(metricsByAddr, dataSource) + updateClusterLevelMetrics(metricsOfCluster, dataSource) } // Update table metrics. They belong to a specified server. -func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) { +func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics, dataSource int) { for tableID, metrics := range metricsByServerTableID { var tableName string if name, ok := TableNameByID[tableID]; !ok { @@ -252,29 +251,29 @@ func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[strin tableName = name } for _, metric := range metrics { - updateMetric(metric, addr, "server", tableName) + updateMetric(metric, addr, "server", tableName, dataSource) } } } // Update server metrics. They belong to a specified server. -func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) { +func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics, dataSource int) { for addr, metrics := range metricsByAddr { for _, metric := range metrics { - updateMetric(metric, addr, "server", "server") + updateMetric(metric, addr, "server", "server", dataSource) } } } // Update cluster level metrics. They belong to a cluster. -func updateClusterLevelMetrics(metricsOfCluster []Metric) { +func updateClusterLevelMetrics(metricsOfCluster []Metric, dataSource int) { for _, metric := range metricsOfCluster { - updateMetric(metric, "cluster", "server", metric.name) + updateMetric(metric, "cluster", "server", metric.name, dataSource) } } // Update table metrics. They belong to a cluster. -func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) { +func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics, dataSource int) { for tableID, metrics := range metricsByTableID { var tableName string if name, ok := TableNameByID[tableID]; !ok { @@ -283,13 +282,13 @@ func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) { tableName = name } for _, metric := range metrics { - updateMetric(metric, "cluster", "table", tableName) + updateMetric(metric, "cluster", "table", tableName, dataSource) } } } -func updateMetric(metric Metric, endpoint string, level string, title string) { - role := RoleByDataSource[DataSource] +func updateMetric(metric Metric, endpoint string, level string, title string, dataSource int) { + role := RoleByDataSource[dataSource] switch metric.mtype { case "Counter": if counter, ok := CounterMetricsMap[metric.name]; ok { From 05132734571497222ed88fd8352236aaebdcf8b1 Mon Sep 17 00:00:00 2001 From: wangguangshuo <1924240046@qq.com> Date: Thu, 18 Jan 2024 03:20:19 +0000 Subject: [PATCH 3/5] make both the initMetrics function (some other functions are the same) and dataSource as members of Collector --- collector/metrics/metric_collector.go | 57 ++++++++++++++------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go index bb9175affc..17e8baad42 100644 --- a/collector/metrics/metric_collector.go +++ b/collector/metrics/metric_collector.go @@ -53,9 +53,6 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec var CounterMetricsMap map[string]prometheus.CounterVec var SummaryMetricsMap map[string]prometheus.Summary -// RoleByDataSource 0 meta server, 1 replica server. -var RoleByDataSource map[int]string - var TableNameByID map[string]string type MetricCollector interface { @@ -69,19 +66,23 @@ func NewMetricCollector( GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128) CounterMetricsMap = make(map[string]prometheus.CounterVec, 128) SummaryMetricsMap = make(map[string]prometheus.Summary, 128) - RoleByDataSource = make(map[int]string, 128) TableNameByID = make(map[string]string, 128) - RoleByDataSource[0] = "meta" - RoleByDataSource[1] = "replica" - initMetrics(dataSource) - return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, dataSource: dataSource} + var role string + if dataSource == 0 { + role = "meta" + } else { + role = "replica" + } + var collector = Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, role: role} + collector.initMetrics() + return &collector } type Collector struct { - dataSource int detectInterval time.Duration detectTimeout time.Duration + role string } func (collector *Collector) Start(tom *tomb.Tomb) error { @@ -92,7 +93,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error { return nil case <-ticker.C: updateClusterTableInfo() - processAllServerMetrics(collector.dataSource) + collector.processAllServerMetrics() } } } @@ -123,10 +124,10 @@ func getReplicaAddrs() ([]string, error) { } // Register all metrics. -func initMetrics(dataSource int) { +func (collector *Collector) initMetrics() { var addrs []string var err error - if dataSource == MetaServer { + if collector.role == "meta" { addrs = viper.GetStringSlice("meta_servers") } else { addrs, err = getReplicaAddrs() @@ -187,10 +188,10 @@ func initMetrics(dataSource int) { } // Parse metric data and update metrics. -func processAllServerMetrics(dataSource int) { +func (collector *Collector) processAllServerMetrics() { var addrs []string var err error - if dataSource == MetaServer { + if collector.role == "meta" { addrs = viper.GetStringSlice("meta_servers") } else { addrs, err = getReplicaAddrs() @@ -224,7 +225,7 @@ func processAllServerMetrics(dataSource int) { tableID, &metricsByTableID) collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID, &metricsByServerTableID) - updateServerLevelTableMetrics(addr, metricsByServerTableID, dataSource) + collector.updateServerLevelTableMetrics(addr, metricsByServerTableID) case "server": mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(), metricsOfCluster) @@ -236,13 +237,13 @@ func processAllServerMetrics(dataSource int) { } } - updateClusterLevelTableMetrics(metricsByTableID, dataSource) - updateServerLevelServerMetrics(metricsByAddr, dataSource) - updateClusterLevelMetrics(metricsOfCluster, dataSource) + collector.updateClusterLevelTableMetrics(metricsByTableID) + collector.updateServerLevelServerMetrics(metricsByAddr) + collector.updateClusterLevelMetrics(metricsOfCluster) } // Update table metrics. They belong to a specified server. -func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics, dataSource int) { +func (collector *Collector) updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) { for tableID, metrics := range metricsByServerTableID { var tableName string if name, ok := TableNameByID[tableID]; !ok { @@ -251,29 +252,29 @@ func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[strin tableName = name } for _, metric := range metrics { - updateMetric(metric, addr, "server", tableName, dataSource) + collector.updateMetric(metric, addr, "server", tableName) } } } // Update server metrics. They belong to a specified server. -func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics, dataSource int) { +func (collector *Collector) updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) { for addr, metrics := range metricsByAddr { for _, metric := range metrics { - updateMetric(metric, addr, "server", "server", dataSource) + collector.updateMetric(metric, addr, "server", "server") } } } // Update cluster level metrics. They belong to a cluster. -func updateClusterLevelMetrics(metricsOfCluster []Metric, dataSource int) { +func (collector *Collector) updateClusterLevelMetrics(metricsOfCluster []Metric) { for _, metric := range metricsOfCluster { - updateMetric(metric, "cluster", "server", metric.name, dataSource) + collector.updateMetric(metric, "cluster", "server", metric.name) } } // Update table metrics. They belong to a cluster. -func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics, dataSource int) { +func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) { for tableID, metrics := range metricsByTableID { var tableName string if name, ok := TableNameByID[tableID]; !ok { @@ -282,13 +283,13 @@ func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics, dataSou tableName = name } for _, metric := range metrics { - updateMetric(metric, "cluster", "table", tableName, dataSource) + collector.updateMetric(metric, "cluster", "table", tableName) } } } -func updateMetric(metric Metric, endpoint string, level string, title string, dataSource int) { - role := RoleByDataSource[dataSource] +func (collector *Collector) updateMetric(metric Metric, endpoint string, level string, title string) { + role := collector.role switch metric.mtype { case "Counter": if counter, ok := CounterMetricsMap[metric.name]; ok { From 99659c0fa0f1ab0625cee448e82453b1a8b39ee4 Mon Sep 17 00:00:00 2001 From: wangguangshuo <1924240046@qq.com> Date: Thu, 18 Jan 2024 04:01:25 +0000 Subject: [PATCH 4/5] change dataSource to role, and certainly change the type of dataSource to string instead of int --- collector/metrics/metric_collector.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go index 17e8baad42..7b3300ae60 100644 --- a/collector/metrics/metric_collector.go +++ b/collector/metrics/metric_collector.go @@ -34,8 +34,8 @@ import ( ) const ( - MetaServer int = 0 - ReplicaServer int = 1 + MetaServer string = "meta" + ReplicaServer string = "replica" ) type Metric struct { @@ -60,7 +60,7 @@ type MetricCollector interface { } func NewMetricCollector( - dataSource int, + role string, detectInterval time.Duration, detectTimeout time.Duration) MetricCollector { GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128) @@ -68,12 +68,6 @@ func NewMetricCollector( SummaryMetricsMap = make(map[string]prometheus.Summary, 128) TableNameByID = make(map[string]string, 128) - var role string - if dataSource == 0 { - role = "meta" - } else { - role = "replica" - } var collector = Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, role: role} collector.initMetrics() return &collector From 611cd4d454407163e3cb6da5f016729ba44a079a Mon Sep 17 00:00:00 2001 From: wangguangshuo <1924240046@qq.com> Date: Thu, 18 Jan 2024 05:22:01 +0000 Subject: [PATCH 5/5] change meta to MetaServer and remove the role variable in the updateMetric function --- collector/metrics/metric_collector.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go index 7b3300ae60..9e6f57bbb5 100644 --- a/collector/metrics/metric_collector.go +++ b/collector/metrics/metric_collector.go @@ -121,7 +121,7 @@ func getReplicaAddrs() ([]string, error) { func (collector *Collector) initMetrics() { var addrs []string var err error - if collector.role == "meta" { + if collector.role == MetaServer { addrs = viper.GetStringSlice("meta_servers") } else { addrs, err = getReplicaAddrs() @@ -185,7 +185,7 @@ func (collector *Collector) initMetrics() { func (collector *Collector) processAllServerMetrics() { var addrs []string var err error - if collector.role == "meta" { + if collector.role == MetaServer { addrs = viper.GetStringSlice("meta_servers") } else { addrs, err = getReplicaAddrs() @@ -283,13 +283,12 @@ func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID map[ } func (collector *Collector) updateMetric(metric Metric, endpoint string, level string, title string) { - role := collector.role switch metric.mtype { case "Counter": if counter, ok := CounterMetricsMap[metric.name]; ok { counter.With( prometheus.Labels{"endpoint": endpoint, - "role": role, "level": level, + "role": collector.role, "level": level, "title": title}).Add(float64(metric.value)) } else { log.Warnf("Unknown metric name %s", metric.name) @@ -298,7 +297,7 @@ func (collector *Collector) updateMetric(metric Metric, endpoint string, level s if gauge, ok := GaugeMetricsMap[metric.name]; ok { gauge.With( prometheus.Labels{"endpoint": endpoint, - "role": role, "level": level, + "role": collector.role, "level": level, "title": title}).Set(float64(metric.value)) } else { log.Warnf("Unknown metric name %s", metric.name)