From b8c4228d91093cef6146a98297890c41559dff84 Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Mon, 27 Feb 2023 17:47:33 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#127]=20=E5=90=88=E5=B9=B6prometheus?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E7=9A=84=E7=9B=91=E6=8E=A7=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=20(#124)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 添加就近路由支持文档 * rebase upstream/master * feat:issue 118 * feat:issue 118 * feat:issue 118 * feat:issue 118 * fix:修复获取SDK自身IP不对问题 * fix:修复获取SDK自身IP不对问题 * fix:修复获取SDK自身IP不对问题 * fix:修复获取SDK自身IP不对问题 --- .gitignore | 3 +- api/config.go | 7 +- examples/circuitbreaker/consumer/polaris.yaml | 4 +- examples/circuitbreaker/provider/polaris.yaml | 6 +- examples/quickstart/consumer/polaris.yaml | 4 +- examples/quickstart/provider/polaris.yaml | 5 +- examples/ratelimit/consumer/polaris.yaml | 4 +- examples/ratelimit/provider/polaris.yaml | 4 +- examples/route/dynamic/consumer/polaris.yaml | 4 +- examples/route/dynamic/provider/polaris.yaml | 4 +- examples/route/nearby/consumer/polaris.yaml | 4 +- examples/route/nearby/provider/polaris.yaml | 4 +- examples/watchinstance/polaris.yaml | 4 +- pkg/config/default.go | 16 +- pkg/config/statreporter.go | 2 +- pkg/plugin/register/plugins.go | 1 - plugin/metrics/prometheus/config.go | 20 +- plugin/metrics/prometheus/model.go | 81 +++++ .../metrics/prometheus/prometheus_handler.go | 281 --------------- .../metrics/prometheus/prometheus_reporter.go | 239 +++++++++++-- .../metrics/pushgateway/addons/max_gauge.go | 200 ----------- .../pushgateway/addons/max_gauge_test.go | 104 ------ plugin/metrics/pushgateway/config.go | 48 --- plugin/metrics/pushgateway/model.go | 325 ------------------ .../pushgateway/pushgateway_handler.go | 221 ------------ .../pushgateway/pushgateway_reporter.go | 141 -------- polaris.yaml | 31 +- 27 files changed, 352 insertions(+), 1415 deletions(-) delete mode 100644 plugin/metrics/prometheus/prometheus_handler.go delete mode 100644 plugin/metrics/pushgateway/addons/max_gauge.go delete mode 100644 plugin/metrics/pushgateway/addons/max_gauge_test.go delete mode 100644 plugin/metrics/pushgateway/config.go delete mode 100644 plugin/metrics/pushgateway/model.go delete mode 100644 plugin/metrics/pushgateway/pushgateway_handler.go delete mode 100644 plugin/metrics/pushgateway/pushgateway_reporter.go diff --git a/.gitignore b/.gitignore index a5a262bc..874eb7c9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,6 @@ backup/ polaris/log/* angevil_test/ -test/other/other_test_suit.go *.log /vendor/ @@ -14,4 +13,4 @@ test/other/other_test_suit.go .vscode/ style_tool/ -goimports-reviser \ No newline at end of file +goimports-reviser diff --git a/api/config.go b/api/config.go index 3c10ee33..be5f384c 100644 --- a/api/config.go +++ b/api/config.go @@ -336,7 +336,12 @@ func getSelfIP(cfg config.Configuration) { conn, _ := net.Dial("tcp", address[0]) if conn != nil { - cfg.GetGlobal().GetAPI().SetBindIP(conn.LocalAddr().String()) + localAddr := conn.LocalAddr().String() + colonIdx := strings.LastIndex(localAddr, ":") + if colonIdx > 0 { + localAddr = localAddr[:colonIdx] + } + cfg.GetGlobal().GetAPI().SetBindIP(localAddr) _ = conn.Close() } } diff --git a/examples/circuitbreaker/consumer/polaris.yaml b/examples/circuitbreaker/consumer/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/circuitbreaker/consumer/polaris.yaml +++ b/examples/circuitbreaker/consumer/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/circuitbreaker/provider/polaris.yaml b/examples/circuitbreaker/provider/polaris.yaml index ba739b74..17c1c10d 100644 --- a/examples/circuitbreaker/provider/polaris.yaml +++ b/examples/circuitbreaker/provider/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: - metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s + type: pull + metricPort: 0 \ No newline at end of file diff --git a/examples/quickstart/consumer/polaris.yaml b/examples/quickstart/consumer/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/quickstart/consumer/polaris.yaml +++ b/examples/quickstart/consumer/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/quickstart/provider/polaris.yaml b/examples/quickstart/provider/polaris.yaml index ba739b74..00d1ea49 100644 --- a/examples/quickstart/provider/polaris.yaml +++ b/examples/quickstart/provider/polaris.yaml @@ -9,7 +9,6 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s + diff --git a/examples/ratelimit/consumer/polaris.yaml b/examples/ratelimit/consumer/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/ratelimit/consumer/polaris.yaml +++ b/examples/ratelimit/consumer/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/ratelimit/provider/polaris.yaml b/examples/ratelimit/provider/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/ratelimit/provider/polaris.yaml +++ b/examples/ratelimit/provider/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/route/dynamic/consumer/polaris.yaml b/examples/route/dynamic/consumer/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/route/dynamic/consumer/polaris.yaml +++ b/examples/route/dynamic/consumer/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/route/dynamic/provider/polaris.yaml b/examples/route/dynamic/provider/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/route/dynamic/provider/polaris.yaml +++ b/examples/route/dynamic/provider/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/route/nearby/consumer/polaris.yaml b/examples/route/nearby/consumer/polaris.yaml index d4bb08d0..a73ad63f 100644 --- a/examples/route/nearby/consumer/polaris.yaml +++ b/examples/route/nearby/consumer/polaris.yaml @@ -16,10 +16,8 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s consumer: serviceRouter: # 服务路由链 diff --git a/examples/route/nearby/provider/polaris.yaml b/examples/route/nearby/provider/polaris.yaml index 2852834b..e81ad3cf 100644 --- a/examples/route/nearby/provider/polaris.yaml +++ b/examples/route/nearby/provider/polaris.yaml @@ -16,7 +16,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/examples/watchinstance/polaris.yaml b/examples/watchinstance/polaris.yaml index ba739b74..08a2db42 100644 --- a/examples/watchinstance/polaris.yaml +++ b/examples/watchinstance/polaris.yaml @@ -9,7 +9,5 @@ global: # - pushgateway plugin: prometheus: + type: pull metricPort: 0 - # pushgateway: - # address: 127.0.0.1:9091 - # pushInterval: 10s diff --git a/pkg/config/default.go b/pkg/config/default.go index eccd3039..39de7ded 100644 --- a/pkg/config/default.go +++ b/pkg/config/default.go @@ -230,19 +230,21 @@ const ( const ( // DefaultStatReporter . - DefaultStatReporter string = "stat2Monitor" + DefaultStatReporter = "stat2Monitor" // DefaultCacheReporter . - DefaultCacheReporter string = "serviceCache" + DefaultCacheReporter = "serviceCache" // DefaultPluginReporter . - DefaultPluginReporter string = "pluginInfo" + DefaultPluginReporter = "pluginInfo" // DefaultLoadBalanceReporter . - DefaultLoadBalanceReporter string = "lbInfo" + DefaultLoadBalanceReporter = "lbInfo" // DefaultRateLimitReporter . - DefaultRateLimitReporter string = "rateLimitRecord" + DefaultRateLimitReporter = "rateLimitRecord" // DefaultServiceRouteReporter . - DefaultServiceRouteReporter string = "serviceRoute" + DefaultServiceRouteReporter = "serviceRoute" // DefaultStatReportEnabled . - DefaultStatReportEnabled bool = false + DefaultStatReportEnabled = true + // DefaultMetricsChain . + DefaultMetricsChain = "prometheus" ) const ( diff --git a/pkg/config/statreporter.go b/pkg/config/statreporter.go index e03d57a3..98f9e7c2 100644 --- a/pkg/config/statreporter.go +++ b/pkg/config/statreporter.go @@ -72,7 +72,7 @@ func (s *StatReporterConfigImpl) SetDefault() { s.Enable = &enable } if len(s.Chain) == 0 { - s.Chain = []string{} + s.Chain = []string{DefaultMetricsChain} } s.Plugin.SetDefault(common.TypeStatReporter) } diff --git a/pkg/plugin/register/plugins.go b/pkg/plugin/register/plugins.go index 3ebfb449..68a33bd4 100644 --- a/pkg/plugin/register/plugins.go +++ b/pkg/plugin/register/plugins.go @@ -43,7 +43,6 @@ import ( _ "github.com/polarismesh/polaris-go/plugin/location" _ "github.com/polarismesh/polaris-go/plugin/logger/zaplog" _ "github.com/polarismesh/polaris-go/plugin/metrics/prometheus" - _ "github.com/polarismesh/polaris-go/plugin/metrics/pushgateway" _ "github.com/polarismesh/polaris-go/plugin/ratelimiter/reject" _ "github.com/polarismesh/polaris-go/plugin/ratelimiter/unirate" _ "github.com/polarismesh/polaris-go/plugin/serverconnector/grpc" diff --git a/plugin/metrics/prometheus/config.go b/plugin/metrics/prometheus/config.go index 1573efa4..c2646af5 100644 --- a/plugin/metrics/prometheus/config.go +++ b/plugin/metrics/prometheus/config.go @@ -34,9 +34,12 @@ const ( // Config prometheus 的配置 type Config struct { - IP string `yaml:"metricHost"` - PortStr string `yaml:"metricPort"` - Port int `yaml:"-"` + Type string `yaml:"type"` + IP string `yaml:"metricHost"` + PortStr string `yaml:"metricPort"` + port int `yaml:"-"` + Interval time.Duration `yaml:"interval"` + Address string `yaml:"address"` } // Verify verify config @@ -46,11 +49,16 @@ func (c *Config) Verify() error { // SetDefault Setting defaults func (c *Config) SetDefault() { + if c.Type == "" { + c.Type = "pull" + } if c.PortStr == "" { - c.Port = defaultMetricPort + c.port = defaultMetricPort return } - + if c.Interval == 0 { + c.Interval = 15 * time.Second + } port, _ := strconv.ParseInt(c.PortStr, 10, 64) - c.Port = int(port) + c.port = int(port) } diff --git a/plugin/metrics/prometheus/model.go b/plugin/metrics/prometheus/model.go index 52551638..6e1b2511 100644 --- a/plugin/metrics/prometheus/model.go +++ b/plugin/metrics/prometheus/model.go @@ -18,9 +18,14 @@ package prometheus import ( "fmt" + "net/http" "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" "github.com/polarismesh/polaris-go/pkg/model" + "github.com/polarismesh/polaris-go/plugin/metrics/prometheus/addons" ) // MetricsType 指标类型,对应 Prometheus 提供的 Collector 类型. @@ -231,6 +236,9 @@ var ( // 主调方相关信息 CallerLabels: func(args interface{}) string { val := args.(*model.ServiceCallResult) + if val.SourceService == nil || len(val.SourceService.Metadata) == 0 { + return "" + } labels := val.SourceService.Metadata var ret []string for k, v := range labels { @@ -323,3 +331,76 @@ func formatLabelsToStr(arguments []model.Argument) string { return strings.Join(s, "|") } + +func buildMetrics() ([]prometheus.Collector, map[string]prometheus.Collector) { + var ( + metricVecCaches = map[string]prometheus.Collector{} + collectors = make([]prometheus.Collector, 0, len(metrcisDesces)) + ) + + for _, desc := range metrcisDesces { + var collector prometheus.Collector + switch desc.MetricType { + case TypeForGaugeVec: + collector = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: desc.Name, + Help: desc.Help, + }, desc.LabelNames) + case TypeForCounterVec: + collector = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: desc.Name, + Help: desc.Help, + }, desc.LabelNames) + case TypeForMaxGaugeVec: + collector = addons.NewMaxGaugeVec(prometheus.GaugeOpts{ + Name: desc.Name, + Help: desc.Help, + }, desc.LabelNames) + case TypeForHistogramVec: + collector = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: desc.Name, + Help: desc.Help, + }, desc.LabelNames) + } + collectors = append(collectors, collector) + metricVecCaches[desc.Name] = collector + } + return collectors, metricVecCaches +} + +type metricsHttpHandler struct { + handler http.Handler + lock sync.RWMutex +} + +// ServeHTTP 提供 prometheus http 服务. +func (p *metricsHttpHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + p.lock.RLock() + defer p.lock.RUnlock() + p.handler.ServeHTTP(writer, request) +} + +func convertInsGaugeToLabels(val *model.ServiceCallResult, bindIP string) map[string]string { + labels := make(map[string]string) + for label, supplier := range InstanceGaugeLabelOrder { + labels[label] = supplier(val) + } + labels[CallerIP] = bindIP + return labels +} + +func convertRateLimitGaugeToLabels(val *model.RateLimitGauge) map[string]string { + labels := make(map[string]string) + for label, supplier := range RateLimitGaugeLabelOrder { + labels[label] = supplier(val) + } + return labels +} + +func convertCircuitBreakGaugeToLabels(val *model.CircuitBreakGauge) map[string]string { + labels := make(map[string]string) + for label, supplier := range CircuitBreakerGaugeLabelOrder { + labels[label] = supplier(val) + } + return labels +} diff --git a/plugin/metrics/prometheus/prometheus_handler.go b/plugin/metrics/prometheus/prometheus_handler.go deleted file mode 100644 index 8c015240..00000000 --- a/plugin/metrics/prometheus/prometheus_handler.go +++ /dev/null @@ -1,281 +0,0 @@ -// Tencent is pleased to support the open source community by making polaris-go available. -// -// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. -// -// Licensed under the BSD 3-Clause License (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://opensource.org/licenses/BSD-3-Clause -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissionsr and limitations under the License. -// - -package prometheus - -import ( - "fmt" - "net" - "net/http" - "sync" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - - "github.com/polarismesh/polaris-go/pkg/log" - "github.com/polarismesh/polaris-go/pkg/model" - "github.com/polarismesh/polaris-go/pkg/plugin" - "github.com/polarismesh/polaris-go/plugin/metrics/prometheus/addons" -) - -// PrometheusHandler handler for prometheus -type PrometheusHandler struct { - // prometheus的metrics注册 - registry *prometheus.Registry - // metrics的 http handler - handler http.Handler - cfg *Config - // - metricVecCaches map[string]prometheus.Collector - ln net.Listener - bindIP string - port int -} - -func newHandler(ctx *plugin.InitContext) (*PrometheusHandler, error) { - p := &PrometheusHandler{} - return p, p.init(ctx) -} - -func (p *PrometheusHandler) init(ctx *plugin.InitContext) error { - cfgValue := ctx.Config.GetGlobal().GetStatReporter().GetPluginConfig(PluginName) - if cfgValue != nil { - p.cfg = cfgValue.(*Config) - } - - p.metricVecCaches = make(map[string]prometheus.Collector) - p.registry = prometheus.NewRegistry() - if err := p.registerMetrics(); err != nil { - return err - } - - p.bindIP = p.cfg.IP - if p.bindIP == "" { - p.bindIP = ctx.Config.GetGlobal().GetAPI().GetBindIP() - } - p.port = p.cfg.Port - - p.handler = &metricsHttpHandler{ - promeHttpHandler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}), - lock: &sync.RWMutex{}, - } - - if ctx.Config.GetGlobal().GetStatReporter().IsEnable() { - p.runInnerMetricsWebServer() - } - - return nil -} - -func (p *PrometheusHandler) registerMetrics() error { - for _, desc := range metrcisDesces { - var collector prometheus.Collector - switch desc.MetricType { - case TypeForGaugeVec: - collector = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - case TypeForCounterVec: - collector = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - case TypeForMaxGaugeVec: - collector = addons.NewMaxGaugeVec(prometheus.GaugeOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - case TypeForHistogramVec: - collector = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - } - - err := p.registry.Register(collector) - if err != nil { - log.GetBaseLogger().Errorf("register prometheus collector error, %v", err) - return err - } - if _, ok := p.metricVecCaches[desc.Name]; ok { - log.GetBaseLogger().Errorf("register prometheus collector duplicate, %s", desc.Name) - return fmt.Errorf("register prometheus collector duplicate, %s", desc.Name) - } - p.metricVecCaches[desc.Name] = collector - } - return nil -} - -// ReportStat 上报采集指标到 prometheus,这里只针对部分 model.InstanceGauge 的实现做处理 -func (p *PrometheusHandler) ReportStat(metricsType model.MetricType, metricsVal model.InstanceGauge) error { - switch metricsType { - case model.ServiceStat: - val, ok := metricsVal.(*model.ServiceCallResult) - if ok { - p.handleServiceGauge(metricsType, val) - } - case model.RateLimitStat: - val, ok := metricsVal.(*model.RateLimitGauge) - if ok { - p.handleRateLimitGauge(metricsType, val) - } - case model.CircuitBreakStat: - val, ok := metricsVal.(*model.CircuitBreakGauge) - if ok { - p.handleCircuitBreakGauge(metricsType, val) - } - } - return nil -} - -// runInnerMetricsWebServer 启动用于 prometheus 主动拉取的 http-server,如果端口设置为负数,则不启用 -func (p *PrometheusHandler) runInnerMetricsWebServer() { - if p.port < 0 { - return - } - - ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", p.bindIP, p.cfg.Port)) - if err != nil { - log.GetBaseLogger().Errorf("start metrics http-server fail: %v", err) - p.port = -1 - return - } - - p.ln = ln - p.port = ln.Addr().(*net.TCPAddr).Port - - go func() { - log.GetBaseLogger().Infof("start metrics http-server address : %s", fmt.Sprintf("%s:%d", p.bindIP, p.port)) - if err := http.Serve(ln, p.GetHttpHandler()); err != nil { - log.GetBaseLogger().Errorf("start metrics http-server fail : %s", err) - return - } - }() -} - -func (p *PrometheusHandler) exportSuccess() bool { - return p.port > 0 -} - -// GetHttpHandler 获取 handler -func (p *PrometheusHandler) GetHttpHandler() http.Handler { - return p.handler -} - -func (p *PrometheusHandler) handleServiceGauge(metricsType model.MetricType, val *model.ServiceCallResult) { - labels := p.convertInsGaugeToLabels(val) - - total := p.metricVecCaches[MetricsNameUpstreamRequestTotal].(*prometheus.CounterVec) - total.With(labels).Inc() - - success := p.metricVecCaches[MetricsNameUpstreamRequestSuccess].(*prometheus.CounterVec) - if val.GetRetStatus() == model.RetSuccess { - success.With(labels).Inc() - } - - delay := val.GetDelay() - if delay != nil { - data := float64(delay.Milliseconds()) - - timeout := p.metricVecCaches[MetricsNameUpstreamRequestTimeout].(*prometheus.GaugeVec) - timeout.With(labels).Add(data) - - maxTimeout := p.metricVecCaches[MetricsNameUpstreamRequestMaxTimeout].(*addons.MaxGaugeVec) - maxTimeout.With(labels).Set(data) - - reqDelay := p.metricVecCaches[MetricsNameUpstreamRequestDelay].(*prometheus.HistogramVec) - reqDelay.With(labels).Observe(data) - } -} - -func (p *PrometheusHandler) handleRateLimitGauge(metricsType model.MetricType, val *model.RateLimitGauge) { - labels := p.convertRateLimitGaugeToLabels(val) - - total := p.metricVecCaches[MetricsNameRateLimitRequestTotal].(*prometheus.CounterVec) - total.With(labels).Inc() - - pass := p.metricVecCaches[MetricsNameRateLimitRequestPass].(*prometheus.CounterVec) - if val.Result == model.QuotaResultOk { - pass.With(labels).Inc() - } - - limit := p.metricVecCaches[MetricsNameRateLimitRequestLimit].(*prometheus.CounterVec) - if val.Result == model.QuotaResultLimited { - limit.With(labels).Inc() - } -} - -func (p *PrometheusHandler) handleCircuitBreakGauge(metricsType model.MetricType, val *model.CircuitBreakGauge) { - labels := p.convertCircuitBreakGaugeToLabels(val) - - open := p.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.GaugeVec) - - // 计算完之后的熔断状态 - status := val.GetCircuitBreakerStatus().GetStatus() - if status == model.Open { - open.With(labels).Inc() - } else { - open.With(labels).Dec() - } - - halfOpen := p.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.GaugeVec) - - if status == model.HalfOpen { - halfOpen.With(labels).Inc() - } else { - halfOpen.With(labels).Dec() - } -} - -func (p *PrometheusHandler) convertInsGaugeToLabels(val *model.ServiceCallResult) map[string]string { - labels := make(map[string]string) - - for label, supplier := range InstanceGaugeLabelOrder { - labels[label] = supplier(val) - } - - labels[CallerIP] = p.bindIP - return labels -} - -func (p *PrometheusHandler) convertRateLimitGaugeToLabels(val *model.RateLimitGauge) map[string]string { - labels := make(map[string]string) - - for label, supplier := range RateLimitGaugeLabelOrder { - labels[label] = supplier(val) - } - return labels -} - -func (p *PrometheusHandler) convertCircuitBreakGaugeToLabels(val *model.CircuitBreakGauge) map[string]string { - labels := make(map[string]string) - - for label, supplier := range CircuitBreakerGaugeLabelOrder { - labels[label] = supplier(val) - } - return labels -} - -// Close the prometheus handler -func (p *PrometheusHandler) Close() error { - if p.ln != nil { - if err := p.ln.Close(); err != nil { - return err - } - } - return nil -} diff --git a/plugin/metrics/prometheus/prometheus_reporter.go b/plugin/metrics/prometheus/prometheus_reporter.go index 202c6080..15ea6398 100644 --- a/plugin/metrics/prometheus/prometheus_reporter.go +++ b/plugin/metrics/prometheus/prometheus_reporter.go @@ -18,18 +18,31 @@ package prometheus import ( + "fmt" + "io" + "net" "net/http" "sync" + "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/client_golang/prometheus/push" + + "github.com/polarismesh/polaris-go/pkg/log" "github.com/polarismesh/polaris-go/pkg/model" "github.com/polarismesh/polaris-go/pkg/plugin" "github.com/polarismesh/polaris-go/pkg/plugin/common" statreporter "github.com/polarismesh/polaris-go/pkg/plugin/metrics" + "github.com/polarismesh/polaris-go/plugin/metrics/prometheus/addons" ) const ( // PluginName is the name of the plugin. - PluginName string = "prometheus" + PluginName = "prometheus" + _metricsPull = "pull" + _metricsPush = "push" + _defaultJobName = "polaris-client" ) var _ statreporter.StatReporter = (*PrometheusReporter)(nil) @@ -39,6 +52,13 @@ func init() { plugin.RegisterPlugin(&PrometheusReporter{}) } +type metricsHandler interface { + io.Closer + init(ctx *plugin.InitContext) error + reportStat(metricsType model.MetricType, metricsVal model.InstanceGauge) error + info() model.StatInfo +} + // PrometheusReporter is a prometheus reporter. type PrometheusReporter struct { *plugin.PluginBase @@ -50,9 +70,22 @@ type PrometheusReporter struct { // sdk加载的插件 sdkPlugins string // 插件工厂 - plugins plugin.Supplier - // prometheus的metrics注册 - handler *PrometheusHandler + plugins plugin.Supplier + once sync.Once + metricVecCaches map[string]prometheus.Collector + + clientIP string + bindIP string + + // registry metrics registry + registry *prometheus.Registry + + // metrics pull resource + ln net.Listener + pullPort int32 + + // metrics push resource + pushTicker *time.Ticker } // Type 插件类型. @@ -71,27 +104,182 @@ func (s *PrometheusReporter) Init(ctx *plugin.InitContext) error { s.globalCtx = ctx.ValueCtx s.plugins = ctx.Plugins s.PluginBase = plugin.NewPluginBase(ctx) - handler, err := newHandler(ctx) - if err != nil { - return err + s.clientIP = ctx.Config.GetGlobal().GetAPI().GetBindIP() + s.bindIP = ctx.Config.GetGlobal().GetAPI().GetBindIP() + cfgValue := ctx.Config.GetGlobal().GetStatReporter().GetPluginConfig(PluginName) + if cfgValue != nil { + s.cfg = cfgValue.(*Config) + } + s.registry = prometheus.NewRegistry() + + collectors, metricsVecCaches := buildMetrics() + s.metricVecCaches = metricsVecCaches + for i := range collectors { + collector := collectors[i] + if err := s.registry.Register(collector); err != nil { + return err + } } - s.handler = handler + return nil } // ReportStat 报告统计数据. -func (s *PrometheusReporter) ReportStat(metricType model.MetricType, metricsVal model.InstanceGauge) error { - return s.handler.ReportStat(metricType, metricsVal) +func (s *PrometheusReporter) ReportStat(metricsType model.MetricType, metricsVal model.InstanceGauge) error { + s.prepare() + switch metricsType { + case model.ServiceStat: + val, ok := metricsVal.(*model.ServiceCallResult) + if ok { + s.handleServiceGauge(metricsType, val) + } + case model.RateLimitStat: + val, ok := metricsVal.(*model.RateLimitGauge) + if ok { + s.handleRateLimitGauge(metricsType, val) + } + case model.CircuitBreakStat: + val, ok := metricsVal.(*model.CircuitBreakGauge) + if ok { + s.handleCircuitBreakGauge(metricsType, val) + } + } + return nil +} + +func (s *PrometheusReporter) handleServiceGauge(metricsType model.MetricType, val *model.ServiceCallResult) { + labels := convertInsGaugeToLabels(val, s.clientIP) + total := s.metricVecCaches[MetricsNameUpstreamRequestTotal].(*prometheus.CounterVec) + total.With(labels).Inc() + + success := s.metricVecCaches[MetricsNameUpstreamRequestSuccess].(*prometheus.CounterVec) + if val.GetRetStatus() == model.RetSuccess { + success.With(labels).Inc() + } + + delay := val.GetDelay() + if delay != nil { + data := float64(delay.Milliseconds()) + + timeout := s.metricVecCaches[MetricsNameUpstreamRequestTimeout].(*prometheus.GaugeVec) + timeout.With(labels).Add(data) + + maxTimeout := s.metricVecCaches[MetricsNameUpstreamRequestMaxTimeout].(*addons.MaxGaugeVec) + maxTimeout.With(labels).Set(data) + + reqDelay := s.metricVecCaches[MetricsNameUpstreamRequestDelay].(*prometheus.HistogramVec) + reqDelay.With(labels).Observe(data) + } +} + +func (s *PrometheusReporter) handleRateLimitGauge(metricsType model.MetricType, val *model.RateLimitGauge) { + labels := convertRateLimitGaugeToLabels(val) + + total := s.metricVecCaches[MetricsNameRateLimitRequestTotal].(*prometheus.CounterVec) + total.With(labels).Inc() + + pass := s.metricVecCaches[MetricsNameRateLimitRequestPass].(*prometheus.CounterVec) + if val.Result == model.QuotaResultOk { + pass.With(labels).Inc() + } + + limit := s.metricVecCaches[MetricsNameRateLimitRequestLimit].(*prometheus.CounterVec) + if val.Result == model.QuotaResultLimited { + limit.With(labels).Inc() + } +} + +func (s *PrometheusReporter) handleCircuitBreakGauge(metricsType model.MetricType, val *model.CircuitBreakGauge) { + labels := convertCircuitBreakGaugeToLabels(val) + + open := s.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.GaugeVec) + + // 计算完之后的熔断状态 + status := val.GetCircuitBreakerStatus().GetStatus() + if status == model.Open { + open.With(labels).Inc() + } else { + open.With(labels).Dec() + } + + halfOpen := s.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.GaugeVec) + + if status == model.HalfOpen { + halfOpen.With(labels).Inc() + } else { + halfOpen.With(labels).Dec() + } +} + +func (s *PrometheusReporter) prepare() { + s.once.Do(func() { + switch s.cfg.Type { + case _metricsPush: + s.initPush() + default: + s.initPull() + } + }) +} + +func (s *PrometheusReporter) initPush() { + s.pushTicker = time.NewTicker(s.cfg.Interval) + go func() { + for range s.pushTicker.C { + if err := push. + New(s.cfg.Address, _defaultJobName). + Gatherer(s.registry). + Push(); err != nil { + log.GetBaseLogger().Errorf("push metrics to pushgateway fail: %s", err.Error()) + } + } + }() +} + +func (s *PrometheusReporter) initPull() { + if s.cfg.IP != "" { + s.bindIP = s.cfg.IP + } + + s.pullPort = int32(s.cfg.port) + if s.pullPort < 0 { + return + } + + ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.bindIP, s.pullPort)) + if err != nil { + log.GetBaseLogger().Errorf("start metrics http-server fail: %v", err) + s.pullPort = -1 + return + } + + s.ln = ln + s.pullPort = int32(ln.Addr().(*net.TCPAddr).Port) + + go func() { + handler := metricsHttpHandler{ + handler: promhttp.HandlerFor(s.registry, promhttp.HandlerOpts{}), + } + + log.GetBaseLogger().Infof("start metrics http-server address : %s", fmt.Sprintf("%s:%d", s.bindIP, s.pullPort)) + if err := http.Serve(ln, &handler); err != nil { + log.GetBaseLogger().Errorf("start metrics http-server fail : %s", err) + return + } + }() } // Info 插件信息. func (s *PrometheusReporter) Info() model.StatInfo { - if !s.handler.exportSuccess() { + if s.cfg.Type != _metricsPull { + return model.StatInfo{} + } + if s.pullPort <= 0 { return model.StatInfo{} } return model.StatInfo{ - Target: s.Name(), - Port: uint32(s.handler.port), + Target: PluginName, + Port: uint32(s.pullPort), Path: "/metrics", Protocol: "http", } @@ -109,24 +297,15 @@ func (s *PrometheusReporter) Destroy() error { return err } } - - if s.handler != nil { - if err := s.handler.Close(); err != nil { - return err + switch s.cfg.Type { + case _metricsPush: + if s.pushTicker != nil { + s.pushTicker.Stop() + } + default: + if s.ln != nil { + _ = s.ln.Close() } } - return nil } - -type metricsHttpHandler struct { - promeHttpHandler http.Handler - lock *sync.RWMutex -} - -// ServeHTTP 提供 prometheus http 服务. -func (p *metricsHttpHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - p.lock.RLock() - defer p.lock.RUnlock() - p.promeHttpHandler.ServeHTTP(writer, request) -} diff --git a/plugin/metrics/pushgateway/addons/max_gauge.go b/plugin/metrics/pushgateway/addons/max_gauge.go deleted file mode 100644 index b52856d9..00000000 --- a/plugin/metrics/pushgateway/addons/max_gauge.go +++ /dev/null @@ -1,200 +0,0 @@ -// Tencent is pleased to support the open source community by making polaris-go available. -// -// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. -// -// Licensed under the BSD 3-Clause License (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://opensource.org/licenses/BSD-3-Clause -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissionsr and limitations under the License. -// - -package addons - -import ( - "fmt" - "math" - "reflect" - "sync/atomic" - "time" - - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "google.golang.org/protobuf/proto" -) - -type MaxGauge interface { - prometheus.Gauge -} - -func NewMaxGauge(opts prometheus.GaugeOpts) MaxGauge { - desc := prometheus.NewDesc( - prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), - opts.Help, - nil, - opts.ConstLabels, - ) - - holder := reflect.ValueOf(desc) - result := &maxGauge{desc: desc, labelPairs: holder.FieldByName("constLabelPairs").Interface().([]*dto.LabelPair)} - result.init(result) - return result -} - -type maxGauge struct { - valBits uint64 - - selfCollector - - desc *prometheus.Desc - labelPairs []*dto.LabelPair -} - -func (g *maxGauge) Desc() *prometheus.Desc { - return g.desc -} - -func (g *maxGauge) Set(val float64) { - newVal := math.Float64bits(val) - for { - data := atomic.LoadUint64(&g.valBits) - saveVal := math.Float64frombits(data) - if saveVal >= val { - return - } - if atomic.CompareAndSwapUint64(&g.valBits, data, newVal) { - return - } - } -} - -func (g *maxGauge) SetToCurrentTime() { - g.Set(float64(time.Now().UnixNano()) / 1e9) -} - -func (g *maxGauge) Inc() { -} - -func (g *maxGauge) Dec() { -} - -func (g *maxGauge) Add(val float64) { -} - -func (g *maxGauge) Sub(val float64) { -} - -func (g *maxGauge) Write(out *dto.Metric) error { - val := math.Float64frombits(atomic.LoadUint64(&g.valBits)) - return populateMetric(prometheus.GaugeValue, val, g.labelPairs, nil, out) -} - -func populateMetric( - t prometheus.ValueType, - v float64, - labelPairs []*dto.LabelPair, - e *dto.Exemplar, - m *dto.Metric, -) error { - m.Label = labelPairs - switch t { - case prometheus.CounterValue: - m.Counter = &dto.Counter{Value: proto.Float64(v), Exemplar: e} - case prometheus.GaugeValue: - m.Gauge = &dto.Gauge{Value: proto.Float64(v)} - case prometheus.UntypedValue: - m.Untyped = &dto.Untyped{Value: proto.Float64(v)} - default: - return fmt.Errorf("encountered unknown type %v", t) - } - return nil -} - -type MaxGaugeVec struct { - *prometheus.MetricVec -} - -func NewMaxGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *MaxGaugeVec { - desc := prometheus.NewDesc( - prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), - opts.Help, - labelNames, - opts.ConstLabels, - ) - return &MaxGaugeVec{ - MetricVec: prometheus.NewMetricVec(desc, func(lvs ...string) prometheus.Metric { - result := &maxGauge{desc: desc, labelPairs: prometheus.MakeLabelPairs(desc, lvs)} - result.init(result) // Init self-collection. - return result - }), - } -} - -func (v *MaxGaugeVec) GetMetricWithLabelValues(lvs ...string) (MaxGauge, error) { - metric, err := v.MetricVec.GetMetricWithLabelValues(lvs...) - if metric != nil { - return metric.(MaxGauge), err - } - return nil, err -} - -func (v *MaxGaugeVec) GetMetricWith(labels prometheus.Labels) (MaxGauge, error) { - metric, err := v.MetricVec.GetMetricWith(labels) - if metric != nil { - return metric.(MaxGauge), err - } - return nil, err -} - -func (v *MaxGaugeVec) WithLabelValues(lvs ...string) MaxGauge { - g, err := v.GetMetricWithLabelValues(lvs...) - if err != nil { - panic(err) - } - return g -} - -func (v *MaxGaugeVec) With(labels prometheus.Labels) MaxGauge { - g, err := v.GetMetricWith(labels) - if err != nil { - panic(err) - } - return g -} - -func (v *MaxGaugeVec) CurryWith(labels prometheus.Labels) (*MaxGaugeVec, error) { - vec, err := v.MetricVec.CurryWith(labels) - if vec != nil { - return &MaxGaugeVec{vec}, err - } - return nil, err -} - -func (v *MaxGaugeVec) MustCurryWith(labels prometheus.Labels) *MaxGaugeVec { - vec, err := v.CurryWith(labels) - if err != nil { - panic(err) - } - return vec -} - -type selfCollector struct { - self prometheus.Metric -} - -func (c *selfCollector) init(self prometheus.Metric) { - c.self = self -} - -func (c *selfCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- c.self.Desc() -} - -func (c *selfCollector) Collect(ch chan<- prometheus.Metric) { - ch <- c.self -} diff --git a/plugin/metrics/pushgateway/addons/max_gauge_test.go b/plugin/metrics/pushgateway/addons/max_gauge_test.go deleted file mode 100644 index c70b9bca..00000000 --- a/plugin/metrics/pushgateway/addons/max_gauge_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Tencent is pleased to support the open source community by making polaris-go available. -// -// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. -// -// Licensed under the BSD 3-Clause License (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://opensource.org/licenses/BSD-3-Clause -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissionsr and limitations under the License. -// - -package addons - -import ( - "context" - "math" - "sync" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" - - "github.com/polarismesh/polaris-go/pkg/algorithm/rand" -) - -func Test_maxGauge_Set(t *testing.T) { - gauge := NewMaxGaugeVec(prometheus.GaugeOpts{ - Name: "Test_maxGauge_Set", - Help: "Test_maxGauge_Set", - }, []string{"key-1", "key-2"}) - - totalCnt := 10 - - chOne := make(chan float64, 1024) - chTwo := make(chan float64, 1024) - maxOne := uint64(0) - maxTwo := uint64(0) - - ctx, cancel := context.WithCancel(context.Background()) - go func(ctx context.Context) { - for { - select { - case ret := <-chOne: - if ret > float64(maxOne) { - maxOne = math.Float64bits(ret) - } - case ret := <-chTwo: - if ret > float64(maxTwo) { - maxTwo = math.Float64bits(ret) - } - case <-ctx.Done(): - return - } - } - }(ctx) - - wg := &sync.WaitGroup{} - wg.Add(totalCnt) - randPool := rand.NewScalableRand() - - for i := 0; i < totalCnt; i++ { - go func() { - defer wg.Done() - for p := 0; p < 1000; p++ { - val := randPool.Intn(math.MaxInt64) - gauge.With(map[string]string{ - "key-1": "1", - "key-2": "1", - }).Set(float64(val)) - chOne <- float64(val) - - val = randPool.Intn(math.MaxInt64) - gauge.With(map[string]string{ - "key-1": "2", - "key-2": "2", - }).Set(float64(val)) - chTwo <- float64(val) - } - }() - } - - wg.Wait() - time.Sleep(time.Second) - cancel() - - t.Logf("max one : %d", maxOne) - t.Logf("max two : %d", maxTwo) - - assert.Equal(t, maxOne, gauge.With(map[string]string{ - "key-1": "1", - "key-2": "1", - }).(*maxGauge).valBits) - - assert.Equal(t, maxTwo, gauge.With(map[string]string{ - "key-1": "2", - "key-2": "2", - }).(*maxGauge).valBits) -} diff --git a/plugin/metrics/pushgateway/config.go b/plugin/metrics/pushgateway/config.go deleted file mode 100644 index c7e6696e..00000000 --- a/plugin/metrics/pushgateway/config.go +++ /dev/null @@ -1,48 +0,0 @@ -// Tencent is pleased to support the open source community by making polaris-go available. -// -// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. -// -// Licensed under the BSD 3-Clause License (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://opensource.org/licenses/BSD-3-Clause -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissionsr and limitations under the License. -// - -package pushgateway - -import ( - "time" - - "github.com/polarismesh/polaris-go/pkg/plugin" -) - -func init() { - plugin.RegisterConfigurablePlugin(&PushgatewayReporter{}, &Config{}) -} - -// Config pushgateway 的配置 -type Config struct { - Address string `yaml:"address"` - PushInterval time.Duration `yaml:"pushInterval"` -} - -// Verify verify config -func (c *Config) Verify() error { - return nil -} - -// SetDefault Setting defaults -func (c *Config) SetDefault() { - if len(c.Address) == 0 { - c.Address = "127.0.0.1:9091" - } - if c.PushInterval == 0 { - c.PushInterval = 10 * time.Second - } -} diff --git a/plugin/metrics/pushgateway/model.go b/plugin/metrics/pushgateway/model.go deleted file mode 100644 index 1498a721..00000000 --- a/plugin/metrics/pushgateway/model.go +++ /dev/null @@ -1,325 +0,0 @@ -// Tencent is pleased to support the open source community by making polaris-go available. -// -// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. -// -// Licensed under the BSD 3-Clause License (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://opensource.org/licenses/BSD-3-Clause -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissionsr and limitations under the License. -// - -package pushgateway - -import ( - "fmt" - "strings" - - "github.com/polarismesh/polaris-go/pkg/model" -) - -// MetricsType 指标类型,对应 Prometheus 提供的 Collector 类型. -type MetricsType int - -const ( - // TypeForCounterVec metric type. - TypeForCounterVec MetricsType = iota - TypeForGaugeVec - TypeForGauge - TypeForHistogramVec - TypeForMaxGaugeVec -) - -// metricDesc 指标描述. -type metricDesc struct { - Name string - Help string - MetricType MetricsType - LabelNames []string -} - -const ( - // CalleeNamespace SystemMetricName. - CalleeNamespace = "callee_namespace" - CalleeService = "callee_service" - CalleeSubset = "callee_subset" - CalleeInstance = "callee_instance" - CalleeRetCode = "callee_result_code" - CalleeMethod = "callee_method" - CallerNamespace = "caller_namespace" - CallerService = "caller_service" - CallerIP = "caller_ip" - CallerLabels = "caller_labels" - MetricNameLabel = "metric_name" - - // MetricsNameUpstreamRequestTotal 与路由、请求相关的指标信息. - MetricsNameUpstreamRequestTotal = "upstream_rq_total" - MetricsNameUpstreamRequestSuccess = "upstream_rq_success" - MetricsNameUpstreamRequestTimeout = "upstream_rq_timeout" - MetricsNameUpstreamRequestMaxTimeout = "upstream_rq_max_timeout" - MetricsNameUpstreamRequestDelay = "upstream_rq_delay" - - // 限流相关指标信息. - MetricsNameRateLimitRequestTotal = "ratelimit_rq_total" - MetricsNameRateLimitRequestPass = "ratelimit_rq_pass" - MetricsNameRateLimitRequestLimit = "ratelimit_rq_limit" - - // 熔断相关指标信息. - MetricsNameCircuitBreakerOpen = "circuitbreaker_open" - MetricsNameCircuitBreakerHalfOpen = "circuitbreaker_halfopen" - - // SystemMetricValue. - NilValue = "__NULL__" -) - -// 服务路由相关指标. -var ( - RouterGaugeNames []string = []string{ - MetricsNameUpstreamRequestTotal, - MetricsNameUpstreamRequestSuccess, - MetricsNameUpstreamRequestTimeout, - MetricsNameUpstreamRequestMaxTimeout, - } - - UpstreamRequestTotal = metricDesc{ - Name: MetricsNameUpstreamRequestTotal, - Help: "total of request per period", - MetricType: TypeForCounterVec, - LabelNames: GetLabels(InstanceGaugeLabelOrder), - } - - UpstreamRequestSuccess = metricDesc{ - Name: MetricsNameUpstreamRequestSuccess, - Help: "total of success request per period", - MetricType: TypeForCounterVec, - LabelNames: GetLabels(InstanceGaugeLabelOrder), - } - - UpstreamRequestTimeout = metricDesc{ - Name: MetricsNameUpstreamRequestTimeout, - Help: "total of request delay per period", - MetricType: TypeForGaugeVec, - LabelNames: GetLabels(InstanceGaugeLabelOrder), - } - - UpstreamRequestMaxTimeout = metricDesc{ - Name: MetricsNameUpstreamRequestMaxTimeout, - Help: "maximum request delay per period", - MetricType: TypeForMaxGaugeVec, - LabelNames: GetLabels(InstanceGaugeLabelOrder), - } - - UpstreamRequestDelay = metricDesc{ - Name: MetricsNameUpstreamRequestDelay, - Help: "per request delay per period", - MetricType: TypeForHistogramVec, - LabelNames: GetLabels(InstanceGaugeLabelOrder), - } -) - -// 限流相关指标. -var ( - RateLimitGaugeNames []string = []string{ - MetricsNameRateLimitRequestTotal, - MetricsNameRateLimitRequestPass, - MetricsNameRateLimitRequestLimit, - } - - RateLimitRequestTotal = metricDesc{ - Name: MetricsNameRateLimitRequestTotal, - Help: "total of rate limit per period", - MetricType: TypeForCounterVec, - LabelNames: GetLabels(RateLimitGaugeLabelOrder), - } - - RateLimitRequestPass = metricDesc{ - Name: MetricsNameRateLimitRequestPass, - Help: "total of passed request per period", - MetricType: TypeForCounterVec, - LabelNames: GetLabels(RateLimitGaugeLabelOrder), - } - - RateLimitRequestLimit = metricDesc{ - Name: MetricsNameRateLimitRequestLimit, - Help: "total of limited request per period", - MetricType: TypeForCounterVec, - LabelNames: GetLabels(RateLimitGaugeLabelOrder), - } -) - -var ( - CircuitBreakerGaugeNames []string = []string{ - MetricsNameCircuitBreakerOpen, - MetricsNameCircuitBreakerHalfOpen, - } - - CircuitBreakerOpen = metricDesc{ - Name: MetricsNameCircuitBreakerOpen, - Help: "total of opened circuit breaker", - MetricType: TypeForGaugeVec, - LabelNames: GetLabels(CircuitBreakerGaugeLabelOrder), - } - - CircuitBreakerHalfOpen = metricDesc{ - Name: MetricsNameCircuitBreakerHalfOpen, - Help: "total of half-open circuit breaker", - MetricType: TypeForGaugeVec, - LabelNames: GetLabels(CircuitBreakerGaugeLabelOrder), - } -) - -var metrcisDesces map[string]metricDesc = map[string]metricDesc{ - MetricsNameUpstreamRequestTotal: UpstreamRequestTotal, - MetricsNameUpstreamRequestSuccess: UpstreamRequestSuccess, - MetricsNameUpstreamRequestTimeout: UpstreamRequestTimeout, - MetricsNameUpstreamRequestMaxTimeout: UpstreamRequestMaxTimeout, - MetricsNameUpstreamRequestDelay: UpstreamRequestDelay, - - MetricsNameRateLimitRequestTotal: RateLimitRequestTotal, - MetricsNameRateLimitRequestPass: RateLimitRequestPass, - MetricsNameRateLimitRequestLimit: RateLimitRequestLimit, - - MetricsNameCircuitBreakerOpen: CircuitBreakerOpen, - MetricsNameCircuitBreakerHalfOpen: CircuitBreakerHalfOpen, -} - -type LabelValueSupplier func(val interface{}) string - -func GetLabels(m map[string]LabelValueSupplier) []string { - labels := make([]string, 0, len(m)) - - for k := range m { - labels = append(labels, k) - } - - return labels -} - -var ( - // InstanceGaugeLabelOrder 实例监控指标的label顺序 - InstanceGaugeLabelOrder map[string]LabelValueSupplier = map[string]LabelValueSupplier{ - // 被调方相关信息 - CalleeNamespace: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - return val.GetCalledInstance().GetNamespace() - }, - CalleeService: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - return val.GetCalledInstance().GetService() - }, - CalleeSubset: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - return val.CalledInstance.GetLogicSet() - }, - CalleeInstance: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - return fmt.Sprintf("%s:%d", val.GetCalledInstance().GetHost(), val.GetCalledInstance().GetPort()) - }, - CalleeRetCode: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - if val.GetRetCode() == nil { - return NilValue - } - return fmt.Sprintf("%d", *val.GetRetCode()) - }, - - // 主调方相关信息 - CallerLabels: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - labels := val.SourceService.Metadata - var ret []string - for k, v := range labels { - ret = append(ret, fmt.Sprintf("%s=%s", k, v)) - } - return strings.Join(ret, ",") - }, - CallerNamespace: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - namespace := val.GetCallerNamespace() - if namespace != "" { - return namespace - } - return NilValue - }, - CallerService: func(args interface{}) string { - val := args.(*model.ServiceCallResult) - service := val.GetCallerService() - if service != "" { - return service - } - return NilValue - }, - CallerIP: func(args interface{}) string { - return NilValue - }, - } - - RateLimitGaugeLabelOrder map[string]LabelValueSupplier = map[string]LabelValueSupplier{ - CalleeNamespace: func(args interface{}) string { - val := args.(*model.RateLimitGauge) - return val.GetNamespace() - }, - CalleeService: func(args interface{}) string { - val := args.(*model.RateLimitGauge) - return val.GetService() - }, - CalleeMethod: func(args interface{}) string { - val := args.(*model.RateLimitGauge) - return val.Method - }, - CallerLabels: func(args interface{}) string { - val := args.(*model.RateLimitGauge) - return formatLabelsToStr(val.Arguments) - }, - } - - CircuitBreakerGaugeLabelOrder map[string]LabelValueSupplier = map[string]LabelValueSupplier{ - CalleeNamespace: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return val.GetCalledInstance().GetNamespace() - }, - CalleeService: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return val.GetCalledInstance().GetService() - }, - CalleeMethod: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return val.Method - }, - CalleeSubset: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return val.GetCalledInstance().GetLogicSet() - }, - CalleeInstance: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return fmt.Sprintf("%s:%d", val.GetCalledInstance().GetHost(), val.GetCalledInstance().GetPort()) - }, - CallerNamespace: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return val.GetNamespace() - }, - CallerService: func(args interface{}) string { - val := args.(*model.CircuitBreakGauge) - return val.GetService() - }, - } -) - -func formatLabelsToStr(arguments []model.Argument) string { - if len(arguments) == 0 { - return "" - } - - s := make([]string, 0, len(arguments)) - - for _, argument := range arguments { - s = append(s, argument.String()) - } - - return strings.Join(s, "|") -} diff --git a/plugin/metrics/pushgateway/pushgateway_handler.go b/plugin/metrics/pushgateway/pushgateway_handler.go deleted file mode 100644 index 8b9d499d..00000000 --- a/plugin/metrics/pushgateway/pushgateway_handler.go +++ /dev/null @@ -1,221 +0,0 @@ -// Tencent is pleased to support the open source community by making polaris-go available. -// -// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. -// -// Licensed under the BSD 3-Clause License (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://opensource.org/licenses/BSD-3-Clause -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissionsr and limitations under the License. -// - -package pushgateway - -import ( - "fmt" - "net/http" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/polarismesh/polaris-go/pkg/log" - "github.com/polarismesh/polaris-go/pkg/model" - "github.com/polarismesh/polaris-go/pkg/plugin" - "github.com/polarismesh/polaris-go/plugin/metrics/pushgateway/addons" -) - -// PushgatewayHandler handler for prometheus -type PushgatewayHandler struct { - // prometheus的metrics注册 - registry *prometheus.Registry - // metrics的 http handler - handler http.Handler - cfg *Config - metricVecCaches map[string]prometheus.Collector - bindIP string -} - -func newHandler(ctx *plugin.InitContext) (*PushgatewayHandler, error) { - p := &PushgatewayHandler{} - return p, p.init(ctx) -} - -func (p *PushgatewayHandler) init(ctx *plugin.InitContext) error { - cfgValue := ctx.Config.GetGlobal().GetStatReporter().GetPluginConfig(PluginName) - if cfgValue != nil { - p.cfg = cfgValue.(*Config) - } - p.bindIP = ctx.Config.GetGlobal().GetAPI().GetBindIP() - p.metricVecCaches = make(map[string]prometheus.Collector) - p.registry = prometheus.NewRegistry() - if err := p.registerMetrics(); err != nil { - return err - } - - return nil -} - -func (p *PushgatewayHandler) registerMetrics() error { - for _, desc := range metrcisDesces { - var collector prometheus.Collector - switch desc.MetricType { - case TypeForGaugeVec: - collector = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - case TypeForCounterVec: - collector = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - case TypeForMaxGaugeVec: - collector = addons.NewMaxGaugeVec(prometheus.GaugeOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - case TypeForHistogramVec: - collector = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: desc.Name, - Help: desc.Help, - }, desc.LabelNames) - } - - err := p.registry.Register(collector) - if err != nil { - log.GetBaseLogger().Errorf("register prometheus collector error, %v", err) - return err - } - if _, ok := p.metricVecCaches[desc.Name]; ok { - log.GetBaseLogger().Errorf("register prometheus collector duplicate, %s", desc.Name) - return fmt.Errorf("register prometheus collector duplicate, %s", desc.Name) - } - p.metricVecCaches[desc.Name] = collector - } - return nil -} - -// ReportStat 上报采集指标到 prometheus,这里只针对部分 model.InstanceGauge 的实现做处理 -func (p *PushgatewayHandler) ReportStat(metricsType model.MetricType, metricsVal model.InstanceGauge) error { - switch metricsType { - case model.ServiceStat: - val, ok := metricsVal.(*model.ServiceCallResult) - if ok { - p.handleServiceGauge(metricsType, val) - } - case model.RateLimitStat: - val, ok := metricsVal.(*model.RateLimitGauge) - if ok { - p.handleRateLimitGauge(metricsType, val) - } - case model.CircuitBreakStat: - val, ok := metricsVal.(*model.CircuitBreakGauge) - if ok { - p.handleCircuitBreakGauge(metricsType, val) - } - } - return nil -} - -func (p *PushgatewayHandler) handleServiceGauge(metricsType model.MetricType, val *model.ServiceCallResult) { - labels := p.convertInsGaugeToLabels(val) - - total := p.metricVecCaches[MetricsNameUpstreamRequestTotal].(*prometheus.CounterVec) - total.With(labels).Inc() - - success := p.metricVecCaches[MetricsNameUpstreamRequestSuccess].(*prometheus.CounterVec) - if val.GetRetStatus() == model.RetSuccess { - success.With(labels).Inc() - } - - delay := val.GetDelay() - if delay != nil { - data := float64(delay.Milliseconds()) - - timeout := p.metricVecCaches[MetricsNameUpstreamRequestTimeout].(*prometheus.GaugeVec) - timeout.With(labels).Add(data) - - maxTimeout := p.metricVecCaches[MetricsNameUpstreamRequestMaxTimeout].(*addons.MaxGaugeVec) - maxTimeout.With(labels).Set(data) - - reqDelay := p.metricVecCaches[MetricsNameUpstreamRequestDelay].(*prometheus.HistogramVec) - reqDelay.With(labels).Observe(data) - } -} - -func (p *PushgatewayHandler) handleRateLimitGauge(metricsType model.MetricType, val *model.RateLimitGauge) { - labels := p.convertRateLimitGaugeToLabels(val) - - total := p.metricVecCaches[MetricsNameRateLimitRequestTotal].(*prometheus.CounterVec) - total.With(labels).Inc() - - pass := p.metricVecCaches[MetricsNameRateLimitRequestPass].(*prometheus.CounterVec) - if val.Result == model.QuotaResultOk { - pass.With(labels).Inc() - } - - limit := p.metricVecCaches[MetricsNameRateLimitRequestLimit].(*prometheus.CounterVec) - if val.Result == model.QuotaResultLimited { - limit.With(labels).Inc() - } -} - -func (p *PushgatewayHandler) handleCircuitBreakGauge(metricsType model.MetricType, val *model.CircuitBreakGauge) { - labels := p.convertCircuitBreakGaugeToLabels(val) - - open := p.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.GaugeVec) - - // 计算完之后的熔断状态 - status := val.GetCircuitBreakerStatus().GetStatus() - if status == model.Open { - open.With(labels).Inc() - } else { - open.With(labels).Dec() - } - - halfOpen := p.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.GaugeVec) - - if status == model.HalfOpen { - halfOpen.With(labels).Inc() - } else { - halfOpen.With(labels).Dec() - } -} - -func (p *PushgatewayHandler) convertInsGaugeToLabels(val *model.ServiceCallResult) map[string]string { - labels := make(map[string]string) - - for label, supplier := range InstanceGaugeLabelOrder { - labels[label] = supplier(val) - } - - labels[CallerIP] = p.bindIP - return labels -} - -func (p *PushgatewayHandler) convertRateLimitGaugeToLabels(val *model.RateLimitGauge) map[string]string { - labels := make(map[string]string) - - for label, supplier := range RateLimitGaugeLabelOrder { - labels[label] = supplier(val) - } - return labels -} - -func (p *PushgatewayHandler) convertCircuitBreakGaugeToLabels(val *model.CircuitBreakGauge) map[string]string { - labels := make(map[string]string) - - for label, supplier := range CircuitBreakerGaugeLabelOrder { - labels[label] = supplier(val) - } - return labels -} - -// Close the prometheus handler -func (p *PushgatewayHandler) Close() error { - return nil -} diff --git a/plugin/metrics/pushgateway/pushgateway_reporter.go b/plugin/metrics/pushgateway/pushgateway_reporter.go deleted file mode 100644 index 8326d5bf..00000000 --- a/plugin/metrics/pushgateway/pushgateway_reporter.go +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Tencent is pleased to support the open source community by making polaris-go available. - * - * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package pushgateway - -import ( - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus/push" - - "github.com/polarismesh/polaris-go/pkg/log" - "github.com/polarismesh/polaris-go/pkg/model" - "github.com/polarismesh/polaris-go/pkg/plugin" - "github.com/polarismesh/polaris-go/pkg/plugin/common" - statreporter "github.com/polarismesh/polaris-go/pkg/plugin/metrics" -) - -const ( - // PluginName is the name of the plugin. - PluginName = "pushgateway" - _defaultJobName = "polaris-client" -) - -var _ statreporter.StatReporter = (*PushgatewayReporter)(nil) - -// init 注册插件. -func init() { - plugin.RegisterPlugin(&PushgatewayReporter{}) -} - -// PushgatewayReporter is a prometheus reporter. -type PushgatewayReporter struct { - *plugin.PluginBase - *common.RunContext - // 本插件的配置 - cfg *Config - // 全局上下文 - globalCtx model.ValueContext - // sdk加载的插件 - sdkPlugins string - // 插件工厂 - plugins plugin.Supplier - // prometheus的metrics注册 - handler *PushgatewayHandler - ticker *time.Ticker - once sync.Once -} - -// Type 插件类型. -func (s *PushgatewayReporter) Type() common.Type { - return common.TypeStatReporter -} - -// Name 插件名,一个类型下插件名唯一. -func (s *PushgatewayReporter) Name() string { - return PluginName -} - -// Init 初始化插件. -func (s *PushgatewayReporter) Init(ctx *plugin.InitContext) error { - s.RunContext = common.NewRunContext() - s.globalCtx = ctx.ValueCtx - s.plugins = ctx.Plugins - s.PluginBase = plugin.NewPluginBase(ctx) - handler, err := newHandler(ctx) - if err != nil { - return err - } - s.handler = handler - cfgValue := ctx.Config.GetGlobal().GetStatReporter().GetPluginConfig(PluginName) - s.cfg = cfgValue.(*Config) - return nil -} - -// ReportStat 报告统计数据. -func (s *PushgatewayReporter) ReportStat(metricType model.MetricType, metricsVal model.InstanceGauge) error { - s.once.Do(func() { - s.runPushMetrics(s.cfg.PushInterval) - }) - return s.handler.ReportStat(metricType, metricsVal) -} - -// Info 插件信息. -func (s *PushgatewayReporter) Info() model.StatInfo { - return model.StatInfo{} -} - -// Destroy .销毁插件. -func (s *PushgatewayReporter) Destroy() error { - if s.PluginBase != nil { - if err := s.PluginBase.Destroy(); err != nil { - return err - } - } - if s.RunContext != nil { - if err := s.RunContext.Destroy(); err != nil { - return err - } - } - if s.ticker != nil { - s.ticker.Stop() - } - - if s.handler != nil { - if err := s.handler.Close(); err != nil { - return err - } - } - - return nil -} - -// runPushMetrics 指标推送 -func (s *PushgatewayReporter) runPushMetrics(interval time.Duration) { - s.ticker = time.NewTicker(interval) - go func() { - for range s.ticker.C { - if err := push. - New(s.cfg.Address, _defaultJobName). - Gatherer(s.handler.registry). - Push(); err != nil { - log.GetBaseLogger().Errorf("push metrics to pushgateway fail: %s", err.Error()) - } - } - }() -} diff --git a/polaris.yaml b/polaris.yaml index 4d143532..ec05ea83 100644 --- a/polaris.yaml +++ b/polaris.yaml @@ -109,27 +109,32 @@ global: #描述:统计上报插件配置 plugin: prometheus: - #描述: 设置 prometheus http-server 的监听IP + #描述: 设置 prometheus 指标上报模式 #类型:string + #默认值:pull + #范围:pull|push + type: pull + #描述: 设置 prometheus http-server 的监听IP, 仅 type == pull 时生效 + #类型:string + #默认值: ${global.api.bindIP} #默认使用SDK的绑定IP metricHost: - #描述: 设置 prometheus http-server 的监听端口 + #描述: 设置 prometheus http-server 的监听端口, 仅 type == pull 时生效 #类型:int #默认值: 28080 #如果设置为负数,则不会开启默认的http-server #如果设置为0,则随机选择一个可用端口进行启动 http-server metricPort: 28080 - # pushgateway: - # #描述: 设置 pushgateway 的地址 - # #类型:string - # #默认 127.0.0.1:9091 - # address: 127.0.0.1:9091 - # #描述:设置metric数据推送到pushgateway的执行周期 - # #类型:string - # #格式:^\d+(ms|s|m|h)$ - # #范围:[1m:...] - # #默认值:10m - # pushInterval: 10s + # #描述: 设置 pushgateway 的地址, 仅 type == push 时生效 + # #类型:string + # #默认 ${global.serverConnector.addresses[0]}:9091 + # address: 127.0.0.1:9091 + # #描述:设置metric数据推送到pushgateway的执行周期, 仅 type == push 时生效 + # #类型:string + # #格式:^\d+(ms|s|m|h)$ + # #范围:[1m:...] + # #默认值:10m + # pushInterval: 10s # 地址提供插件,用于获取当前SDK所在的地域信息 # location: # providers: