From 9a5812f3e98afb991102fc994d52c91f2d051aec Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Wed, 17 Jul 2024 23:23:39 +0800 Subject: [PATCH] =?UTF-8?q?fix:push=E4=B8=8A=E6=8A=A5=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=8A=A8=E6=B3=A8=E9=94=80=20(#210)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/metrics/prometheus/reporter.go | 29 +++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/plugin/metrics/prometheus/reporter.go b/plugin/metrics/prometheus/reporter.go index 193121c3..fb047ff1 100644 --- a/plugin/metrics/prometheus/reporter.go +++ b/plugin/metrics/prometheus/reporter.go @@ -41,10 +41,11 @@ import ( const ( // PluginName is the name of the plugin. - PluginName = "prometheus" - _metricsPull = "pull" - _metricsPush = "push" - _defaultJobName = "polaris-client" + PluginName = "prometheus" + _metricsPull = "pull" + _metricsPush = "push" + _defaultJobName = "polaris-client" + _defaultJobInstance = "instance" ) var _ statreporter.StatReporter = (*PrometheusReporter)(nil) @@ -232,6 +233,9 @@ func (s *PrometheusReporter) Destroy() error { if s.cancel != nil { s.cancel() } + if s.action != nil { + s.action.Close() + } return nil } @@ -251,6 +255,7 @@ type ReportAction interface { Init(initCtx *plugin.InitContext, reporter *PrometheusReporter) Run(ctx context.Context) Info() model.StatInfo + Close() } type PullAction struct { @@ -277,6 +282,9 @@ func (pa *PullAction) Init(initCtx *plugin.InitContext, reporter *PrometheusRepo pa.bindPort = int32(pa.cfg.port) } +func (pa *PullAction) Close() { +} + func (pa *PullAction) doAggregation(ctx context.Context) { ticker := time.NewTicker(30 * time.Second) @@ -351,6 +359,7 @@ type PushAction struct { initCtx *plugin.InitContext reporter *PrometheusReporter cfg *Config + pusher *push.Pusher } func (pa *PushAction) Init(initCtx *plugin.InitContext, reporter *PrometheusReporter) { @@ -359,6 +368,15 @@ func (pa *PushAction) Init(initCtx *plugin.InitContext, reporter *PrometheusRepo return } pa.cfg = cfgValue.(*Config) + pa.pusher = push. + New(pa.cfg.Address, _defaultJobName). + Grouping(_defaultJobInstance, pa.initCtx.SDKContextID) +} + +func (pa *PushAction) Close() { + if pa.pusher != nil { + pa.pusher.Delete() + } } func (pa *PushAction) Run(ctx context.Context) { @@ -380,8 +398,7 @@ func (pa *PushAction) Run(ctx context.Context) { statcommon.PutDataFromContainerInOrder(pa.reporter.metricVecCaches, pa.reporter.rateLimitCollector, pa.reporter.rateLimitCollector.GetCurrentRevision()) - if err := push. - New(pa.cfg.Address, _defaultJobName). + if err := pa.pusher. Gatherer(pa.reporter.registry). Push(); err != nil { log.GetBaseLogger().Errorf("push metrics to pushgateway fail: %s", err.Error())