diff --git a/src/kafka/consumer_group.go b/src/kafka/consumer_group.go index 1cade71..6ed004e 100644 --- a/src/kafka/consumer_group.go +++ b/src/kafka/consumer_group.go @@ -14,15 +14,10 @@ import ( "gorm.io/gorm" ) -//////////////////// -// Group Consumer // -//////////////////// func (k *kafkaTopicConsumer) consumeGroup(group string) { saramaConfig := sarama.NewConfig() - //////////////////// - // Wait for topic // - //////////////////// + // Wait for topic admin, err := getAdmin(k.brokerURL, saramaConfig) if err != nil { zap.S().Fatal("KAFKA ADMIN ERROR: ", err.Error()) @@ -57,10 +52,7 @@ func (k *kafkaTopicConsumer) consumeGroup(group string) { time.Sleep(1 * time.Second) } - /////////////////////////// - // Consumer Group Config // - /////////////////////////// - + // Consumer Group Config // Version version, err := sarama.ParseKafkaVersion("2.1.1") if err != nil { @@ -173,6 +165,9 @@ func (c *ClaimConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sar topicName := claim.Topic() partition := uint64(claim.Partition()) + // Counter for displaying logs + var msgLogCounter int = 0 + // find kafka job var kafkaJob *models.KafkaJob = nil for i, k := range c.kafkaJobs { @@ -200,7 +195,12 @@ func (c *ClaimConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sar return nil } - zap.S().Info("GROUP=", c.group, ",TOPIC=", topicName, ",PARTITION=", partition, ",OFFSET=", topicMsg.Offset, " - New message") + if msgLogCounter > config.Config.LogMsgCount-1 { + zap.S().Info("GROUP=", c.group, ",TOPIC=", topicName, ",PARTITION=", partition, ",OFFSET=", topicMsg.Offset, " - New message") + msgLogCounter = 0 + } + msgLogCounter++ + sess.MarkMessage(topicMsg, "") // Broadcast