diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 1670bd557c6ad..17ea03db8a01f 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -127,6 +127,9 @@ to use them. ## * now: Uses the time of write # producer_timestamp = metric + ## Add metric name as specified kafka header if not empty + # metric_name_header = "" + ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 0fc430dba842f..13e3de3b831ea 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -39,6 +39,7 @@ type Kafka struct { RoutingTag string `toml:"routing_tag"` RoutingKey string `toml:"routing_key"` ProducerTimestamp string `toml:"producer_timestamp"` + MetricNameHeader string `toml:"metric_name_header"` Log telegraf.Logger `toml:"-"` proxy.Socks5ProxyConfig kafka.WriteConfig @@ -209,6 +210,15 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { Value: sarama.ByteEncoder(buf), } + if k.MetricNameHeader != "" { + m.Headers = []sarama.RecordHeader{ + { + Key: []byte(k.MetricNameHeader), + Value: []byte(metric.Name()), + }, + } + } + // Negative timestamps are not allowed by the Kafka protocol. if k.ProducerTimestamp == "metric" && !metric.Time().Before(zeroTime) { m.Timestamp = metric.Time() diff --git a/plugins/outputs/kafka/sample.conf b/plugins/outputs/kafka/sample.conf index 56916a95f2f92..5474df18639e5 100644 --- a/plugins/outputs/kafka/sample.conf +++ b/plugins/outputs/kafka/sample.conf @@ -87,6 +87,9 @@ ## * now: Uses the time of write # producer_timestamp = metric + ## Add metric name as specified kafka header if not empty + # metric_name_header = "" + ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem"