Skip to content

Commit

Permalink
Fix latency metrics for async kafka producer
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Feb 28, 2025
1 parent 5a5e8de commit 7c92965
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pkg/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes
var errResult error

if kt.asyncProducer != nil {
// Not adding request latency metric to async producer for now, since it would complicate the implementation, and delay our debug.

requestStarted := time.Now().UTC()
for _, msg := range safeMessages {
kt.asyncProducer.Input() <- &sarama.ProducerMessage{
Topic: kt.topicName,
Expand All @@ -243,6 +244,8 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes
if originalMessage.AckFunc != nil {
originalMessage.AckFunc()
}
originalMessage.TimeRequestStarted = requestStarted
originalMessage.TimeRequestFinished = time.Now().UTC()
sent = append(sent, originalMessage)
}
}
Expand Down

0 comments on commit 7c92965

Please sign in to comment.