Skip to content

Commit

Permalink
feat(outputs.kafka): Option to add metric name as record header
Browse files Browse the repository at this point in the history
  • Loading branch information
Mrflatt committed Aug 12, 2024
1 parent 9df4800 commit 1616e8e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
3 changes: 3 additions & 0 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ to use them.
## * now: Uses the time of write
# producer_timestamp = metric

## Add metric name as specified kafka header if not empty
# metric_name_header = ""

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down
10 changes: 10 additions & 0 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Kafka struct {
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
ProducerTimestamp string `toml:"producer_timestamp"`
MetricNameHeader string `toml:"metric_name_header"`
Log telegraf.Logger `toml:"-"`
proxy.Socks5ProxyConfig
kafka.WriteConfig
Expand Down Expand Up @@ -209,6 +210,15 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
Value: sarama.ByteEncoder(buf),
}

if k.MetricNameHeader != "" {
m.Headers = []sarama.RecordHeader{
{
Key: []byte(k.MetricNameHeader),
Value: []byte(metric.Name()),
},
}
}

// Negative timestamps are not allowed by the Kafka protocol.
if k.ProducerTimestamp == "metric" && !metric.Time().Before(zeroTime) {
m.Timestamp = metric.Time()
Expand Down
3 changes: 3 additions & 0 deletions plugins/outputs/kafka/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@
## * now: Uses the time of write
# producer_timestamp = metric

## Add metric name as specified kafka header if not empty
# metric_name_header = ""

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down

0 comments on commit 1616e8e

Please sign in to comment.