Skip to content

Commit

Permalink
fixes Telefonica#75: allow labels to be hashed into topic partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-garcia committed Aug 27, 2021
1 parent 8722897 commit 60368ed
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

/vendor
/prometheus-kafka-adapter
28 changes: 25 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ package main

import (
"fmt"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"gopkg.in/yaml.v2"
"os"
"strings"
"text/template"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"gopkg.in/yaml.v2"

"github.com/sirupsen/logrus"
)

var (
kafkaBrokerList = "kafka:9092"
kafkaTopic = "metrics"
kafkaPartitionLabels []string
kafkaMetadataTimeout = time.Second * 10
kafkaMetadataInterval = time.Minute * 5
topicTemplate *template.Template
match = make(map[string]*dto.MetricFamily, 0)
basicauth = false
Expand Down Expand Up @@ -63,6 +68,23 @@ func init() {
kafkaTopic = value
}

if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
kafkaPartitionLabels = strings.Split(value, ",")
}

if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
d, err := time.ParseDuration(value)
if err != nil {
logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
} else {
if d < 0 {
logrus.Errorf("KAFKA_METADATA_TIMEOUT does not support negative timeout")
} else {
kafkaMetadataTimeout = d
}
}
}

if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
basicauth = true
basicauthUsername = value
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/confluentinc/confluent-kafka-go v1.6.1
github.com/confluentinc/confluent-kafka-go v1.7.0
github.com/gin-gonic/contrib v0.0.0-20191209060500-d6e26eeaa607
github.com/gin-gonic/gin v1.6.3
github.com/gogo/protobuf v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/confluentinc/confluent-kafka-go v1.6.1 h1:YxM/UtMQ2vgJX2gIgeJFUD0ANQYTEvfo4Cs4qKUlmGE=
github.com/confluentinc/confluent-kafka-go v1.6.1/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
33 changes: 29 additions & 4 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package main

import (
"fmt"
"hash/fnv"
"io/ioutil"
"net/http"
"strings"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -61,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
return
}

for topic, metrics := range metricsPerTopic {
t := topic
for topicAndHashKey, metrics := range metricsPerTopic {

topic, partitionID, err := getPartitionAndTopic(topicAndHashKey)
if err != nil {
continue
}

part := kafka.TopicPartition{
Partition: kafka.PartitionAny,
Topic: &t,
Partition: partitionID,
Topic: &topic,
}
for _, metric := range metrics {
err := producer.Produce(&kafka.Message{
Expand All @@ -83,3 +91,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin

}
}

// getPartitionAndTopic splits a "topic|hashKey" string produced by topic()
// in serializers.go. Without a "|" the whole string is the topic name and the
// partitioner is left to librdkafka (kafka.PartitionAny). With a hash key,
// the key is FNV-1a hashed modulo the topic's partition count (from the
// metadata cache) so all series sharing the key land on one partition.
// On error the caller is expected to skip this batch of metrics.
func getPartitionAndTopic(topic string) (string, int32, error) {
	// SplitN keeps any "|" inside the hash key itself as part of the key.
	parts := strings.SplitN(topic, "|", 2)
	name := parts[0]

	if len(parts) == 1 {
		return name, kafka.PartitionAny, nil
	}

	v, ok := topicPartitionCount.Load(name)
	if !ok {
		logrus.WithField("topic", name).Error("did not find metadata for requested topic")
		return name, kafka.PartitionAny, fmt.Errorf("no partition metadata for topic %q", name)
	}
	count := v.(int)
	if count <= 0 {
		// Guard: a zero partition count would make the modulo below panic.
		return name, kafka.PartitionAny, fmt.Errorf("topic %q has no partitions", name)
	}

	h := fnv.New32a()
	h.Write([]byte(parts[1])) // Write on an fnv hash never returns an error.
	return name, int32(h.Sum32() % uint32(count)), nil
}
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
Expand All @@ -28,6 +29,9 @@ import (
func main() {
log.Info("creating kafka producer")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kafkaConfig := kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
Expand Down Expand Up @@ -69,6 +73,12 @@ func main() {
logrus.WithError(err).Fatal("couldn't create kafka producer")
}

if kafkaPartitionLabels != nil {
if err := syncTopicMetadata(ctx, producer); err != nil {
logrus.WithError(err).Fatal("couldn't fetch topic metadata")
}
}

r := gin.New()

r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())
Expand Down
14 changes: 13 additions & 1 deletion serializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
}

func topic(labels map[string]string) string {
var buf bytes.Buffer
var buf, buf2 bytes.Buffer
if err := topicTemplate.Execute(&buf, labels); err != nil {
return ""
}
for _, s := range kafkaPartitionLabels {
v, ok := labels[s]
if ok {
if _, err := buf2.WriteString(v); err != nil {
return ""
}
}
}
if buf2.Len() > 0 {
buf.WriteString("|")
buf.WriteString(buf2.String())
}
return buf.String()
}

Expand Down
47 changes: 47 additions & 0 deletions topic_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"context"
"math"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/sirupsen/logrus"
)

// topicPartitionCount caches the partition count per topic name
// (keys are string topic names, values are int partition counts).
// It is written by processMetadata on the periodic refresh goroutine and
// read by getPartitionAndTopic on HTTP handler goroutines, hence sync.Map.
var topicPartitionCount sync.Map

// metaDataFetcher abstracts the single method of *kafka.Producer that the
// metadata sync needs, so tests can substitute a fake producer.
type metaDataFetcher interface {
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
}

// syncTopicMetadata primes the topic→partition-count cache and keeps it
// fresh. The first fetch is synchronous so partition counts are available
// before the server starts handling requests; its error is returned to the
// caller. A background goroutine then refreshes the cache every
// kafkaMetadataInterval until ctx is cancelled.
func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {
	if err := processMetadata(producer); err != nil {
		return err
	}

	go func() {
		// BUG FIX: the original ran a single select with no loop, so the
		// cache was refreshed at most once and the goroutine then exited.
		ticker := time.NewTicker(kafkaMetadataInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := processMetadata(producer); err != nil {
					logrus.WithError(err).Error("could not fetch topic metadata")
				}
			}
		}
	}()
	return nil
}

// processMetadata fetches cluster metadata for all topics and records each
// topic's partition count in topicPartitionCount for the partition hasher.
func processMetadata(producer metaDataFetcher) error {
	// BUG FIX: GetMetadata's timeout parameter is in *milliseconds*; the
	// original passed Seconds() (e.g. 10 for the 10s default), which meant a
	// 10 ms metadata timeout.
	timeoutMs := int(math.Ceil(kafkaMetadataTimeout.Seconds() * 1000))
	metadata, err := producer.GetMetadata(nil, true, timeoutMs)
	if err != nil {
		return err
	}
	for name, topic := range metadata.Topics {
		topicPartitionCount.Store(name, len(topic.Partitions))
	}
	return nil
}

0 comments on commit 60368ed

Please sign in to comment.