Skip to content

Commit

Permalink
chore: limit logging for consumergroup
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed Aug 17, 2022
1 parent b836d6f commit 74756fd
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions src/kafka/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@ import (
"gorm.io/gorm"
)

////////////////////
// Group Consumer //
////////////////////
func (k *kafkaTopicConsumer) consumeGroup(group string) {
saramaConfig := sarama.NewConfig()

////////////////////
// Wait for topic //
////////////////////
// Wait for topic
admin, err := getAdmin(k.brokerURL, saramaConfig)
if err != nil {
zap.S().Fatal("KAFKA ADMIN ERROR: ", err.Error())
Expand Down Expand Up @@ -57,10 +52,7 @@ func (k *kafkaTopicConsumer) consumeGroup(group string) {
time.Sleep(1 * time.Second)
}

///////////////////////////
// Consumer Group Config //
///////////////////////////

// Consumer Group Config
// Version
version, err := sarama.ParseKafkaVersion("2.1.1")
if err != nil {
Expand Down Expand Up @@ -173,6 +165,9 @@ func (c *ClaimConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sar
topicName := claim.Topic()
partition := uint64(claim.Partition())

// Counter for displaying logs
var msgLogCounter int = 0

// find kafka job
var kafkaJob *models.KafkaJob = nil
for i, k := range c.kafkaJobs {
Expand Down Expand Up @@ -200,7 +195,12 @@ func (c *ClaimConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sar
return nil
}

zap.S().Info("GROUP=", c.group, ",TOPIC=", topicName, ",PARTITION=", partition, ",OFFSET=", topicMsg.Offset, " - New message")
if msgLogCounter > config.Config.LogMsgCount-1 {
zap.S().Info("GROUP=", c.group, ",TOPIC=", topicName, ",PARTITION=", partition, ",OFFSET=", topicMsg.Offset, " - New message")
msgLogCounter = 0
}
msgLogCounter++

sess.MarkMessage(topicMsg, "")

// Broadcast
Expand Down

0 comments on commit 74756fd

Please sign in to comment.