Skip to content

Commit

Permalink
Revert "refactor: change prometheus primary metrics as atomic ones (#159
Browse files Browse the repository at this point in the history
)"

This reverts commit 50f4fcc.
  • Loading branch information
Abdulsametileri authored Jan 1, 2025
1 parent 50f4fcc commit 4c476eb
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 22 deletions.
6 changes: 3 additions & 3 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
s.totalProcessedMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.totalProcessedMessagesCounter),
float64(s.consumerMetric.TotalProcessedMessagesCounter),
emptyStringList...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.totalUnprocessedMessagesCounter),
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
emptyStringList...,
)

ch <- prometheus.MustNewConstMetric(
s.totalErrorCountDuringFetchingMessage,
prometheus.CounterValue,
float64(s.consumerMetric.totalErrorCountDuringFetchingMessage),
float64(s.consumerMetric.TotalErrorCountDuringFetchingMessage),
emptyStringList...,
)
}
Expand Down
6 changes: 3 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ func (c *consumer) process(message *Message) {
// Try to process same message again
if consumeErr = c.consumeFn(message); consumeErr != nil {
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)
c.metric.IncrementTotalUnprocessedMessagesCounter()
c.metric.TotalUnprocessedMessagesCounter++
}
} else {
c.metric.IncrementTotalUnprocessedMessagesCounter()
c.metric.TotalUnprocessedMessagesCounter++
}
}

Expand All @@ -185,7 +185,7 @@ func (c *consumer) process(message *Message) {
}

if consumeErr == nil {
c.metric.IncrementTotalProcessedMessagesCounter()
c.metric.TotalProcessedMessagesCounter++
}
}

Expand Down
2 changes: 1 addition & 1 deletion consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (c *base) startConsume() {
continue
}

c.metric.IncrementTotalErrorCountDuringFetchingMessage()
c.metric.TotalErrorCountDuringFetchingMessage++
//nolint:lll
c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID)
continue
Expand Down
18 changes: 3 additions & 15 deletions metric.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,7 @@
package kafka

type ConsumerMetric struct {
totalUnprocessedMessagesCounter int64
totalProcessedMessagesCounter int64
totalErrorCountDuringFetchingMessage int64
}

func (cm *ConsumerMetric) IncrementTotalUnprocessedMessagesCounter() {
cm.totalUnprocessedMessagesCounter++
}

func (cm *ConsumerMetric) IncrementTotalProcessedMessagesCounter() {
cm.totalProcessedMessagesCounter++
}

func (cm *ConsumerMetric) IncrementTotalErrorCountDuringFetchingMessage() {
cm.totalErrorCountDuringFetchingMessage++
TotalUnprocessedMessagesCounter int64
TotalProcessedMessagesCounter int64
TotalErrorCountDuringFetchingMessage int64
}

0 comments on commit 4c476eb

Please sign in to comment.