From decea42195c1d3340e79b99695dea811f86c1fb2 Mon Sep 17 00:00:00 2001 From: arunsudhakar Date: Tue, 14 Mar 2023 17:11:39 +0800 Subject: [PATCH 1/4] feat: add metric metadata to message --- config.go | 21 +++++++++++++++--- main.go | 4 ++++ metadata.go | 53 +++++++++++++++++++++++++++++++++++++++++++++ schemas/metric.avsc | 3 +++ serializers.go | 3 +++ serializers_test.go | 8 +++---- 6 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 metadata.go diff --git a/config.go b/config.go index 08cb2cd7..a9686569 100644 --- a/config.go +++ b/config.go @@ -16,18 +16,27 @@ package main import ( "fmt" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "gopkg.in/yaml.v2" "os" "strings" "text/template" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "gopkg.in/yaml.v2" + "github.com/sirupsen/logrus" ) +type MetricAttributes struct { + metricType string + metricHelp string + metricUnit string +} + var ( kafkaBrokerList = "kafka:9092" + promAPIEndPoint = "" + getMetricAttributes = false kafkaTopic = "metrics" topicTemplate *template.Template match = make(map[string]*dto.MetricFamily, 0) @@ -45,6 +54,7 @@ var ( kafkaSaslUsername = "" kafkaSaslPassword = "" serializer Serializer + metricsList = make(map[string]MetricAttributes) ) func init() { @@ -55,6 +65,11 @@ func init() { logrus.SetLevel(parseLogLevel(value)) } + if value := os.Getenv("PROM_API_ENDPOINT"); value != "" { + promAPIEndPoint = value + getMetricAttributes = true + } + if value := os.Getenv("KAFKA_BROKER_LIST"); value != "" { kafkaBrokerList = value } diff --git a/main.go b/main.go index a9173ed6..66067c14 100644 --- a/main.go +++ b/main.go @@ -63,6 +63,10 @@ func main() { } producer, err := kafka.NewProducer(&kafkaConfig) + logrus.Info("Prometheus URL is " + promAPIEndPoint) + if getMetricAttributes == true { + GetAllMetricAttributes(promAPIEndPoint, metricsList) + } if err != nil { logrus.WithError(err).Fatal("couldn't 
create kafka producer") diff --git a/metadata.go b/metadata.go new file mode 100644 index 00000000..189d125d --- /dev/null +++ b/metadata.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/sirupsen/logrus" +) + +func GetAllMetricAttributes(promAPIEndPoint string, metricsList map[string]MetricAttributes) { + + // Make a GET request to the Prometheus metadata API + response, err := http.Get(promAPIEndPoint) + if err != nil { + logrus.WithError(err).Errorln("Error making request") + return + } + defer response.Body.Close() + + // Read the response body + body, err := ioutil.ReadAll(response.Body) + if err != nil { + logrus.WithError(err).Errorln("Error reading response body") + // logrus.error("Error reading response body: %s\n", err.Error()) + return + } + + // Parse the JSON data into a map + var data map[string]interface{} + err = json.Unmarshal([]byte(body), &data) + if err != nil { + logrus.WithError(err).Errorln("Error parsing json") + return + } + logrus.WithFields(logrus.Fields{ + "[]": len(data["data"].(map[string]interface{})), + }).Debug("Metrics Count is ") + // var metricList = make(map[string]MetricAttributes) + for key, metrics := range data["data"].(map[string]interface{}) { + for _, metric := range metrics.([]interface{}) { + var metricAttribute MetricAttributes + metricAttribute.metricType = metric.(map[string]interface{})["type"].(string) + metricAttribute.metricHelp = metric.(map[string]interface{})["help"].(string) + metricAttribute.metricUnit = metric.(map[string]interface{})["unit"].(string) + metricsList[key] = metricAttribute + // fmt.Printf("Metric: %s, Type: %s, Help: %s, Unit: %s", key, metricAttribute.metricType, metricAttribute.metricHelp, metricAttribute.metricUnit) + } + } + logrus.WithFields(logrus.Fields{ + "[]": len(metricsList), + }).Debug("Map Size is ") +} diff --git a/schemas/metric.avsc b/schemas/metric.avsc index 8d312db5..a9f63baa 100644 --- a/schemas/metric.avsc +++ 
b/schemas/metric.avsc @@ -7,6 +7,9 @@ {"name": "timestamp", "type": "string"}, {"name": "value", "type": "string"}, {"name": "name", "type": "string"}, + {"name": "type", "type": "string"}, + {"name": "help", "type": "string"}, + {"name": "unit", "type": "string"}, {"name": "labels", "type": { "type": "map", "values": "string"} } ] } diff --git a/serializers.go b/serializers.go index e75e86dd..692c5c22 100644 --- a/serializers.go +++ b/serializers.go @@ -60,6 +60,9 @@ func Serialize(s Serializer, req *prompb.WriteRequest) (map[string][][]byte, err "value": strconv.FormatFloat(sample.Value, 'f', -1, 64), "name": name, "labels": labels, + "type": metricsList[name].metricType, + "help": metricsList[name].metricHelp, + "unit": metricsList[name].metricUnit, } data, err := s.Marshal(m) diff --git a/serializers_test.go b/serializers_test.go index a4ad5778..666584ec 100644 --- a/serializers_test.go +++ b/serializers_test.go @@ -46,8 +46,8 @@ func TestSerializeToJSON(t *testing.T) { assert.Nil(t, err) expectedSamples := []string{ - "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", - "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", + "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", + "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", } for i, metric := range output["metrics"] { @@ -76,8 +76,8 @@ func TestSerializeToAvro(t *testing.T) { assert.Nil(t, err) expectedSamples := []string{ - "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", - 
"{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", + "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", + "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", } for i, metric := range output["metrics"] { From 7bcf2e0137ff8d4050617bd1c269f200cc1ef1e0 Mon Sep 17 00:00:00 2001 From: arunsudhakar Date: Tue, 14 Mar 2023 17:27:01 +0800 Subject: [PATCH 2/4] feat: updates to readme and helm charts --- README.md | 8 ++++++-- helm/prometheus-kafka-adapter/templates/deployment.yaml | 2 ++ helm/prometheus-kafka-adapter/values.yaml | 2 ++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b5b93237..091ac001 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,9 @@ Prometheus-kafka-adapter is a service which receives [Prometheus](https://github ## output -It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable. +It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable. 
+Metric metadata can be included in the JSON output, if the `PROM_API_ENDPOINT` is set to correct API endpoint of the prometheus service, eg http://localhost:9090/api/v1/metadata ### JSON ```json @@ -15,7 +16,9 @@ It is able to write JSON or Avro-JSON messages in a kafka topic, depending on th "timestamp": "1970-01-01T00:00:00Z", "value": "9876543210", "name": "up", - + "type": "gauge", + "help": "help text of metric", + "unit": "seconds" "labels": { "__name__": "up", "label1": "value1", @@ -38,6 +41,7 @@ There is a docker image `telefonica/prometheus-kafka-adapter:1.8.0` [available o Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables: +- `PROM_API_ENDPOINT`: defines prometheus metric metadata endpoint , not set by default and hence metadata wont be included. - `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`. - `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`) - `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`. 
diff --git a/helm/prometheus-kafka-adapter/templates/deployment.yaml b/helm/prometheus-kafka-adapter/templates/deployment.yaml index 51be8a7c..9227b37f 100644 --- a/helm/prometheus-kafka-adapter/templates/deployment.yaml +++ b/helm/prometheus-kafka-adapter/templates/deployment.yaml @@ -39,6 +39,8 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" imagePullPolicy: {{ .Values.image.pullPolicy }} env: + - name: PROM_API_ENDPOINT + value: {{ .Values.environment.PROM_API_ENDPOINT | quote }} # may want customizable service references - name: KAFKA_BROKER_LIST value: {{ tpl .Values.environment.KAFKA_BROKER_LIST . }} # may want customizable service references - name: KAFKA_TOPIC diff --git a/helm/prometheus-kafka-adapter/values.yaml b/helm/prometheus-kafka-adapter/values.yaml index 473400d0..e6f78989 100644 --- a/helm/prometheus-kafka-adapter/values.yaml +++ b/helm/prometheus-kafka-adapter/values.yaml @@ -32,6 +32,8 @@ KAFKA_SSL_CA_CERT: # - "" environment: + # defines the API endpoint of the prometheus server to read the metadata of the metrics from, eg http://localhost:9090/api/v1/metadata. If this is not set,metadata is not included in the topic message + PROM_API_ENDPOINT: "" # defines kafka endpoint and port, defaults to kafka:9092. KAFKA_BROKER_LIST: "" # defines kafka topic to be used, defaults to metrics. 
From 821244722a0645e57710837e35b43aa3bbf20f43 Mon Sep 17 00:00:00 2001 From: arunsudhakar Date: Wed, 15 Mar 2023 11:41:36 +0800 Subject: [PATCH 3/4] feat: allow setting attribute types --- .gitignore | 1 + README.md | 13 ++++--- config.go | 19 ++++++---- .../templates/deployment.yaml | 6 ++- helm/prometheus-kafka-adapter/values.yaml | 6 ++- main.go | 6 +-- metadata.go | 37 +++++++++++-------- schemas/metric.avsc | 3 -- serializers.go | 21 +++++++++-- serializers_test.go | 8 ++-- 10 files changed, 74 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index ce6763fa..4617838a 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ prometheus-kafka-adapter-libc prometheus-kafka-adapter-musl prometheus-kafka-adapter +mycode.go.txt diff --git a/README.md b/README.md index 091ac001..6ef70a2e 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Prometheus-kafka-adapter is a service which receives [Prometheus](https://github It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable. -Metric metadata can be included in the JSON output, if the `PROM_API_ENDPOINT` is set to correct API endpoint of the prometheus service, eg http://localhost:9090/api/v1/metadata +Metric metadata can be included in the JSON output, if the `PROM_METADATA_ENDPOINT` is set to correct API endpoint of the prometheus service, eg http://localhost:9090/api/v1/metadata. 
### JSON ```json @@ -16,14 +16,14 @@ Metric metadata can be included in the JSON output, if the `PROM_API_ENDPOINT` i "timestamp": "1970-01-01T00:00:00Z", "value": "9876543210", "name": "up", - "type": "gauge", - "help": "help text of metric", - "unit": "seconds" "labels": { "__name__": "up", "label1": "value1", "label2": "value2" - } + }, + "type": "type", + "help": "help", + "unit": "unit" } ``` @@ -41,7 +41,8 @@ There is a docker image `telefonica/prometheus-kafka-adapter:1.8.0` [available o Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables: -- `PROM_API_ENDPOINT`: defines prometheus metric metadata endpoint , not set by default and hence metadata wont be included. +- `PROM_METADATA_ENDPOINT`: defines prometheus metric metadata endpoint , not set by default and hence metadata wont be included. +- `INLCUDED_METADATA`: specifies which attributes to be exported. The attributes should be comma separated. Permitted values are _type_, _help_ and _unit_. Only _type_ is included by default - `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`. - `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`) - `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`. 
diff --git a/config.go b/config.go index a9686569..f51e30a0 100644 --- a/config.go +++ b/config.go @@ -27,7 +27,7 @@ import ( "github.com/sirupsen/logrus" ) -type MetricAttributes struct { +type MetricMetadata struct { metricType string metricHelp string metricUnit string @@ -35,8 +35,8 @@ type MetricAttributes struct { var ( kafkaBrokerList = "kafka:9092" - promAPIEndPoint = "" - getMetricAttributes = false + promMetaDataEndPoint = "" + getMetricMetadata = false kafkaTopic = "metrics" topicTemplate *template.Template match = make(map[string]*dto.MetricFamily, 0) @@ -54,7 +54,8 @@ var ( kafkaSaslUsername = "" kafkaSaslPassword = "" serializer Serializer - metricsList = make(map[string]MetricAttributes) + metricsList = make(map[string]MetricMetadata) + includedMetaData = "type" ) func init() { @@ -65,9 +66,13 @@ func init() { logrus.SetLevel(parseLogLevel(value)) } - if value := os.Getenv("PROM_API_ENDPOINT"); value != "" { - promAPIEndPoint = value - getMetricAttributes = true + if value := os.Getenv("PROM_METADATA_ENDPOINT"); value != "" { + promMetaDataEndPoint = value + getMetricMetadata = true + } + + if value := os.Getenv("INLCUDED_METADATA"); value != "" { + includedMetaData = value } if value := os.Getenv("KAFKA_BROKER_LIST"); value != "" { diff --git a/helm/prometheus-kafka-adapter/templates/deployment.yaml b/helm/prometheus-kafka-adapter/templates/deployment.yaml index 9227b37f..ad7a7b18 100644 --- a/helm/prometheus-kafka-adapter/templates/deployment.yaml +++ b/helm/prometheus-kafka-adapter/templates/deployment.yaml @@ -39,8 +39,6 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" imagePullPolicy: {{ .Values.image.pullPolicy }} env: - - name: PROM_API_ENDPOINT - value: {{ .Values.environment.PROM_API_ENDPOINT | quote }} # may want customizable service references - name: KAFKA_BROKER_LIST value: {{ tpl .Values.environment.KAFKA_BROKER_LIST . 
}} # may want customizable service references - name: KAFKA_TOPIC @@ -93,6 +91,10 @@ spec: name: {{ include "prometheus-kafka-adapter.fullname" . }} key: KAFKA_SSL_CLIENT_KEY_PASS {{- end }} + - name: PROM_METADATA_ENDPOINT + value: {{ .Values.environment.PROM_METADATA_ENDPOINT | quote }} # may want customizable service references + - name: INLCUDED_METADATA + value: {{ .Values.environment.INLCUDED_METADATA | quote }} # may want customizable service references ports: - name: http containerPort: {{ .Values.environment.PORT }} diff --git a/helm/prometheus-kafka-adapter/values.yaml b/helm/prometheus-kafka-adapter/values.yaml index e6f78989..4a2e670d 100644 --- a/helm/prometheus-kafka-adapter/values.yaml +++ b/helm/prometheus-kafka-adapter/values.yaml @@ -32,8 +32,6 @@ KAFKA_SSL_CA_CERT: # - "" environment: - # defines the API endpoint of the prometheus server to read the metadata of the metrics from, eg http://localhost:9090/api/v1/metadata. If this is not set,metadata is not included in the topic message - PROM_API_ENDPOINT: "" # defines kafka endpoint and port, defaults to kafka:9092. KAFKA_BROKER_LIST: "" # defines kafka topic to be used, defaults to metrics. @@ -67,6 +65,10 @@ environment: KAFKA_SSL_CA_CERT_FILE: # defines the match rules, simple metric name match and label match MATCH: + # defines the API endpoint of the prometheus server to read the metadata of the metrics from, eg http://localhost:9090/api/v1/metadata. If this is not set,metadata is not included in the topic message + PROM_METADATA_ENDPOINT: "" + # defines the metadata to be included in the topic if PROM_METADATA_ENDPOINT is set. Permitted values are type , unit & help. They need to be in CSV format. 
Defaulted to type + INLCUDED_METADATA: "type" serviceAccount: # Specifies whether a service account should be created diff --git a/main.go b/main.go index 66067c14..888298b9 100644 --- a/main.go +++ b/main.go @@ -63,9 +63,9 @@ func main() { } producer, err := kafka.NewProducer(&kafkaConfig) - logrus.Info("Prometheus URL is " + promAPIEndPoint) - if getMetricAttributes == true { - GetAllMetricAttributes(promAPIEndPoint, metricsList) + if getMetricMetadata == true { + logrus.Debugf("Prometheus API URL is %s", promMetaDataEndPoint) + GetAllMetricMetadata(promMetaDataEndPoint, metricsList) } if err != nil { diff --git a/metadata.go b/metadata.go index 189d125d..06b07c67 100644 --- a/metadata.go +++ b/metadata.go @@ -4,14 +4,15 @@ import ( "encoding/json" "io/ioutil" "net/http" + "strings" "github.com/sirupsen/logrus" ) -func GetAllMetricAttributes(promAPIEndPoint string, metricsList map[string]MetricAttributes) { +func GetAllMetricMetadata(promMetaDataEndPoint string, metricsList map[string]MetricMetadata) { // Make a GET request to the Prometheus metadata API - response, err := http.Get(promAPIEndPoint) + response, err := http.Get(promMetaDataEndPoint) if err != nil { logrus.WithError(err).Errorln("Error making request") return @@ -33,21 +34,27 @@ func GetAllMetricAttributes(promAPIEndPoint string, metricsList map[string]Metri logrus.WithError(err).Errorln("Error parsing json") return } - logrus.WithFields(logrus.Fields{ - "[]": len(data["data"].(map[string]interface{})), - }).Debug("Metrics Count is ") - // var metricList = make(map[string]MetricAttributes) + // var metricList = make(map[string]MetricMetadata) for key, metrics := range data["data"].(map[string]interface{}) { for _, metric := range metrics.([]interface{}) { - var metricAttribute MetricAttributes - metricAttribute.metricType = metric.(map[string]interface{})["type"].(string) - metricAttribute.metricHelp = metric.(map[string]interface{})["help"].(string) - metricAttribute.metricUnit = 
metric.(map[string]interface{})["unit"].(string) - metricsList[key] = metricAttribute - // fmt.Printf("Metric: %s, Type: %s, Help: %s, Unit: %s", key, metricAttribute.metricType, metricAttribute.metricHelp, metricAttribute.metricUnit) + var metricMetadata MetricMetadata + logrus.Debugf("Processing Metric %s, Metadata to be included %s", key, includedMetaData) + + if strings.Contains(strings.ToLower(includedMetaData), "type") { + metricMetadata.metricType = metric.(map[string]interface{})["type"].(string) + logrus.Debugf("Type is %s", metricMetadata.metricType) + } + if strings.Contains(strings.ToLower(includedMetaData), "help") { + metricMetadata.metricHelp = metric.(map[string]interface{})["help"].(string) + logrus.Debugf("Help is %s", metricMetadata.metricHelp) + } + if strings.Contains(strings.ToLower(includedMetaData), "unit") { + metricMetadata.metricUnit = metric.(map[string]interface{})["unit"].(string) + logrus.Debugf("Unit is %s", metricMetadata.metricUnit) + } + metricsList[key] = metricMetadata + // fmt.Printf("Metric: %s, Type: %s, Help: %s, Unit: %s", key, metricMetadata.metricType, metricMetadata.metricHelp, metricMetadata.metricUnit) } } - logrus.WithFields(logrus.Fields{ - "[]": len(metricsList), - }).Debug("Map Size is ") + logrus.Debugf("Total number of metrics parsed is %v", len(metricsList)) } diff --git a/schemas/metric.avsc b/schemas/metric.avsc index a9f63baa..8d312db5 100644 --- a/schemas/metric.avsc +++ b/schemas/metric.avsc @@ -7,9 +7,6 @@ {"name": "timestamp", "type": "string"}, {"name": "value", "type": "string"}, {"name": "name", "type": "string"}, - {"name": "type", "type": "string"}, - {"name": "help", "type": "string"}, - {"name": "unit", "type": "string"}, {"name": "labels", "type": { "type": "map", "values": "string"} } ] } diff --git a/serializers.go b/serializers.go index 692c5c22..c510b895 100644 --- a/serializers.go +++ b/serializers.go @@ -49,20 +49,33 @@ func Serialize(s Serializer, req *prompb.WriteRequest) 
(map[string][][]byte, err for _, sample := range ts.Samples { name := string(labels["__name__"]) + if !filter(name, labels) { objectsFiltered.Add(float64(1)) continue } - epoch := time.Unix(sample.Timestamp/1000, 0).UTC() m := map[string]interface{}{ "timestamp": epoch.Format(time.RFC3339), "value": strconv.FormatFloat(sample.Value, 'f', -1, 64), "name": name, "labels": labels, - "type": metricsList[name].metricType, - "help": metricsList[name].metricHelp, - "unit": metricsList[name].metricUnit, + } + + if getMetricMetadata { + val, ok := metricsList[name] + + if ok { + if val.metricType != "" { + m["type"] = val.metricType + } + if val.metricUnit != "" { + m["unit"] = val.metricUnit + } + if val.metricHelp != "" { + m["help"] = val.metricHelp + } + } } data, err := s.Marshal(m) diff --git a/serializers_test.go b/serializers_test.go index 666584ec..a4ad5778 100644 --- a/serializers_test.go +++ b/serializers_test.go @@ -46,8 +46,8 @@ func TestSerializeToJSON(t *testing.T) { assert.Nil(t, err) expectedSamples := []string{ - "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", - "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", + "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", + "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", } for i, metric := range output["metrics"] { @@ -76,8 +76,8 @@ func TestSerializeToAvro(t *testing.T) { assert.Nil(t, err) expectedSamples := []string{ - 
"{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", - "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"},\"type\":\"\",\"help\":\"\",\"unit\":\"\"}", + "{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", + "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", } for i, metric := range output["metrics"] { From efab7caab926fdbf950b670444b606ae252e7e89 Mon Sep 17 00:00:00 2001 From: arunsudhakar Date: Wed, 15 Mar 2023 11:46:26 +0800 Subject: [PATCH 4/4] docs: updated readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 6ef70a2e..99ad4f75 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Prometheus-kafka-adapter is a service which receives [Prometheus](https://github It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable. Metric metadata can be included in the JSON output, if the `PROM_METADATA_ENDPOINT` is set to correct API endpoint of the prometheus service, eg http://localhost:9090/api/v1/metadata. +Only the metrics which are available at application startup will have the metadata included. To include metadata for newer metrics, the application will need to be restarted. ### JSON ```json