fixes Telefonica#75: allow labels to be hashed into topic partitions
daniel-garcia authored and kd7lxl committed Mar 6, 2023
1 parent 6a1dbf5 commit f52e2d2
Showing 5 changed files with 123 additions and 8 deletions.
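The mechanism in brief: when KAFKA_PARTITION_LABELS is set, the serializer appends the values of the listed labels to the rendered topic name as "topic|hashkey". The receive handler then splits that key back apart, hashes the label values with FNV-1a, and takes the hash modulo the topic's partition count (cached from broker metadata) to pick a deterministic partition. With, say, KAFKA_PARTITION_LABELS=tenant (an illustrative label name), all metrics carrying the same tenant value land on the same partition of the configured topic.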
28 changes: 25 additions & 3 deletions config.go
@@ -16,19 +16,24 @@ package main

import (
	"fmt"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"gopkg.in/yaml.v2"
	"os"
	"strings"
	"text/template"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"gopkg.in/yaml.v2"

	"github.com/sirupsen/logrus"
)

var (
	kafkaBrokerList       = "kafka:9092"
	kafkaTopic            = "metrics"
	kafkaPartitionLabels  []string
	kafkaMetadataTimeout  = time.Second * 10
	kafkaMetadataInterval = time.Minute * 5
	topicTemplate         *template.Template
	match                 = make(map[string]*dto.MetricFamily, 0)
	basicauth             = false
@@ -63,6 +68,23 @@ func init() {
		kafkaTopic = value
	}

	if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
		kafkaPartitionLabels = strings.Split(value, ",")
	}

	if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
		d, err := time.ParseDuration(value)
		if err != nil {
			logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
		} else {
			if d < 0 {
				logrus.Errorf("KAFKA_METADATA_TIMEOUT does not support negative timeout")
			} else {
				kafkaMetadataTimeout = d
			}
		}
	}

	if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
		basicauth = true
		basicauthUsername = value
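Note that KAFKA_METADATA_TIMEOUT is parsed with time.ParseDuration, so values such as "500ms", "30s", or "1m" are accepted. A minimal sketch (the "30s" value is illustrative, not from the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	d, err := time.ParseDuration("30s") // e.g. KAFKA_METADATA_TIMEOUT=30s
	fmt.Println(d, err)                 // prints: 30s <nil>
}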
32 changes: 28 additions & 4 deletions handlers.go
@@ -16,8 +16,10 @@ package main

import (
	"fmt"
	"hash/fnv"
	"io/ioutil"
	"net/http"
	"strings"

	"github.com/gin-gonic/gin"
	"github.com/sirupsen/logrus"
@@ -62,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
			return
		}

		for topic, metrics := range metricsPerTopic {
			t := topic
		for topicAndHashKey, metrics := range metricsPerTopic {

			topic, partitionID, err := getPartitionAndTopic(topicAndHashKey)
			if err != nil {
				continue
			}

			part := kafka.TopicPartition{
				Partition: kafka.PartitionAny,
				Topic:     &t,
				Partition: partitionID,
				Topic:     &topic,
			}
			for _, metric := range metrics {
				objectsWritten.Add(float64(1))
@@ -87,3 +94,20 @@

	}
}

func getPartitionAndTopic(topic string) (string, int32, error) {
	parts := strings.Split(topic, "|")

	if len(parts) == 1 {
		return parts[0], kafka.PartitionAny, nil
	}
	h := fnv.New32a()
	h.Write([]byte(parts[1])) // hash only the joined partition-label values

	v, ok := topicPartitionCount.Load(parts[0])
	if !ok {
		logrus.WithField("topic", parts[0]).Error("did not find metadata for requested topic")
		return topic, kafka.PartitionAny, fmt.Errorf("could not find metadata for requested topic")
	}
	return parts[0], int32(h.Sum32()%uint32(v.(int))), nil
}
10 changes: 10 additions & 0 deletions main.go
@@ -15,6 +15,7 @@
package main

import (
	"context"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
@@ -27,6 +28,9 @@ import (
func main() {
	logrus.Info("creating kafka producer")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	kafkaConfig := kafka.ConfigMap{
		"bootstrap.servers": kafkaBrokerList,
		"compression.codec": kafkaCompression,
@@ -68,6 +72,12 @@ func main() {
logrus.WithError(err).Fatal("couldn't create kafka producer")
}

if kafkaPartitionLabels != nil {
if err := syncTopicMetadata(ctx, producer); err != nil {
logrus.WithError(err).Fatal("couldn't fetch topic metadata")
}
}

r := gin.New()

r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())
14 changes: 13 additions & 1 deletion serializers.go
@@ -116,10 +116,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
}

func topic(labels map[string]string) string {
	var buf bytes.Buffer
	var buf, buf2 bytes.Buffer
	if err := topicTemplate.Execute(&buf, labels); err != nil {
		return ""
	}
	for _, s := range kafkaPartitionLabels {
		v, ok := labels[s]
		if ok {
			if _, err := buf2.WriteString(v); err != nil {
				return ""
			}
		}
	}
	if buf2.Len() > 0 {
		buf.WriteString("|")
		buf.WriteString(buf2.String())
	}
	return buf.String()
}
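For illustration, a self-contained sketch of the key this function emits, assuming a static "metrics" topic template and partition labels tenant and env (all invented for the example):

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

func main() {
	labels := map[string]string{"tenant": "acme", "env": "prod"}
	tmpl := template.Must(template.New("topic").Parse("metrics")) // stand-in for topicTemplate
	var buf, buf2 bytes.Buffer
	_ = tmpl.Execute(&buf, labels)
	for _, s := range []string{"tenant", "env"} { // stand-in for kafkaPartitionLabels
		if v, ok := labels[s]; ok {
			buf2.WriteString(v)
		}
	}
	if buf2.Len() > 0 {
		buf.WriteString("|")
		buf.WriteString(buf2.String())
	}
	fmt.Println(buf.String()) // prints "metrics|acmeprod"
}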

47 changes: 47 additions & 0 deletions topic_metadata.go
@@ -0,0 +1,47 @@
package main

import (
	"context"
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/sirupsen/logrus"
)

var topicPartitionCount sync.Map

type metaDataFetcher interface {
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
}

func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {
	if err := processMetadata(producer); err != nil {
		return err
	}
	// Refresh the partition counts every kafkaMetadataInterval until the
	// context is cancelled.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return

			case <-time.After(kafkaMetadataInterval):
				if err := processMetadata(producer); err != nil {
					logrus.WithError(err).Error("could not fetch topic metadata")
				}
			}
		}
	}()
	return nil
}

func processMetadata(producer metaDataFetcher) error {
	// GetMetadata expects its timeout in milliseconds.
	metadata, err := producer.GetMetadata(nil, true, int(kafkaMetadataTimeout/time.Millisecond))
	if err != nil {
		return err
	}
	for name, topic := range metadata.Topics {
		topicPartitionCount.Store(name, len(topic.Partitions))
	}
	return nil
}
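Because metadata is fetched through the small metaDataFetcher interface rather than *kafka.Producer directly, the cache can be exercised without a broker. A test-style sketch under that assumption (fake topic name and partition count invented for illustration):

// Same package as processMetadata; not part of the commit.
type fakeFetcher struct{}

func (fakeFetcher) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) {
	return &kafka.Metadata{
		Topics: map[string]kafka.TopicMetadata{
			"metrics": {Topic: "metrics", Partitions: make([]kafka.PartitionMetadata, 6)},
		},
	}, nil
}

// After processMetadata(fakeFetcher{}), topicPartitionCount.Load("metrics")
// yields 6, so getPartitionAndTopic("metrics|acme") returns a fixed partition.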
