From 85c2d57b2f0af661ed5756af5abaa67207c29d7b Mon Sep 17 00:00:00 2001 From: berruk Date: Mon, 30 Dec 2024 14:00:01 +0300 Subject: [PATCH] atomic prometheus metrics --- collector.go | 6 +++--- consumer.go | 6 +++--- consumer_base.go | 2 +- metric.go | 18 +++++++++++++++--- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/collector.go b/collector.go index 2393e77..d7d5370 100644 --- a/collector.go +++ b/collector.go @@ -55,21 +55,21 @@ func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( s.totalProcessedMessagesCounter, prometheus.CounterValue, - float64(s.consumerMetric.TotalProcessedMessagesCounter), + float64(s.consumerMetric.totalProcessedMessagesCounter), emptyStringList..., ) ch <- prometheus.MustNewConstMetric( s.totalUnprocessedMessagesCounter, prometheus.CounterValue, - float64(s.consumerMetric.TotalUnprocessedMessagesCounter), + float64(s.consumerMetric.totalUnprocessedMessagesCounter), emptyStringList..., ) ch <- prometheus.MustNewConstMetric( s.totalErrorCountDuringFetchingMessage, prometheus.CounterValue, - float64(s.consumerMetric.TotalErrorCountDuringFetchingMessage), + float64(s.consumerMetric.totalErrorCountDuringFetchingMessage), emptyStringList..., ) } diff --git a/consumer.go b/consumer.go index 257c5b8..c991e17 100644 --- a/consumer.go +++ b/consumer.go @@ -166,10 +166,10 @@ func (c *consumer) process(message *Message) { // Try to process same message again if consumeErr = c.consumeFn(message); consumeErr != nil { c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic) - c.metric.TotalUnprocessedMessagesCounter++ + c.metric.IncrementTotalUnprocessedMessagesCounter() } } else { - c.metric.TotalUnprocessedMessagesCounter++ + c.metric.IncrementTotalUnprocessedMessagesCounter() } } @@ -185,7 +185,7 @@ func (c *consumer) process(message *Message) { } if consumeErr == nil { - c.metric.TotalProcessedMessagesCounter++ + c.metric.IncrementTotalProcessedMessagesCounter() } } diff --git a/consumer_base.go b/consumer_base.go index 9ff590d..cef76dc 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -202,7 +202,7 @@ func (c *base) startConsume() { continue } - c.metric.TotalErrorCountDuringFetchingMessage++ + c.metric.IncrementTotalErrorCountDuringFetchingMessage() //nolint:lll c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID) continue diff --git a/metric.go b/metric.go index 104bbee..d41d8d4 100644 --- a/metric.go +++ b/metric.go @@ -1,7 +1,19 @@ package kafka type ConsumerMetric struct { - TotalUnprocessedMessagesCounter int64 - TotalProcessedMessagesCounter int64 - TotalErrorCountDuringFetchingMessage int64 + totalUnprocessedMessagesCounter int64 + totalProcessedMessagesCounter int64 + totalErrorCountDuringFetchingMessage int64 +} + +func (cm *ConsumerMetric) IncrementTotalUnprocessedMessagesCounter() { + cm.totalUnprocessedMessagesCounter++ +} + +func (cm *ConsumerMetric) IncrementTotalProcessedMessagesCounter() { + cm.totalProcessedMessagesCounter++ +} + +func (cm *ConsumerMetric) IncrementTotalErrorCountDuringFetchingMessage() { + cm.totalErrorCountDuringFetchingMessage++ }