From 28b1629a21e885bdd2b527d6a1c1de8483dc47d4 Mon Sep 17 00:00:00 2001 From: zirain Date: Tue, 17 Sep 2024 06:59:27 +0800 Subject: [PATCH] implemnt Prometheus sink (#681) Signed-off-by: zirain --- README.md | 104 ++++++++++++++++ go.mod | 8 ++ go.sum | 18 +++ src/service_cmd/runner/runner.go | 39 +++--- src/settings/settings.go | 4 + src/stats/prom/default_mapper.yaml | 89 ++++++++++++++ src/stats/prom/prometheus_sink.go | 160 +++++++++++++++++++++++++ src/stats/prom/prometheus_sink_test.go | 111 +++++++++++++++++ 8 files changed, 517 insertions(+), 16 deletions(-) create mode 100644 src/stats/prom/default_mapper.yaml create mode 100644 src/stats/prom/prometheus_sink.go create mode 100644 src/stats/prom/prometheus_sink_test.go diff --git a/README.md b/README.md index bbfa4347..fff18bae 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ - [DogStatsD](#dogstatsd) - [Example](#example) - [Continued example:](#continued-example) + - [Prometheus](#prometheus) - [HTTP Port](#http-port) - [/json endpoint](#json-endpoint) - [Debug Port](#debug-port) @@ -901,6 +902,109 @@ Then, declare additional rules for the `DESCRIPTOR` mogrifier 2. `DOG_STATSD_MOGRIFIER_HITS_NAME`: `ratelimit.service.rate_limit.$3` 3. `DOG_STATSD_MOGRIFIER_HITS_TAGS`: `domain:$1,descriptor:$2` +## Prometheus + +To enable Prometheus integration set: + +1. `USE_PROMETHEUS`: `true` to use [Prometheus](https://prometheus.io/) +2. `PROMETHEUS_ADDR`: The port to listen on for Prometheus metrics. Defaults to `:9090` +3. `PROMETHEUS_PATH`: The path to listen on for Prometheus metrics. Defaults to `/metrics` +4. `PROMETHEUS_MAPPER_YAML`: The path to the YAML file that defines the mapping from statsd to prometheus metrics. + +Define the mapping from statsd to prometheus metrics in a YAML file. +Find more information about the mapping in the [Metric Mapping and Configuration](https://github.com/prometheus/statsd_exporter?tab=readme-ov-file#metric-mapping-and-configuration). +The default setting is: + +```yaml +mappings: # Requires statsd exporter >= v0.6.0 since it uses the "drop" action. + - match: "ratelimit.service.rate_limit.*.*.near_limit" + name: "ratelimit_service_rate_limit_near_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + - match: "ratelimit.service.rate_limit.*.*.over_limit" + name: "ratelimit_service_rate_limit_over_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + - match: "ratelimit.service.rate_limit.*.*.total_hits" + name: "ratelimit_service_rate_limit_total_hits" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + - match: "ratelimit.service.rate_limit.*.*.within_limit" + name: "ratelimit_service_rate_limit_within_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + + - match: "ratelimit.service.rate_limit.*.*.*.near_limit" + name: "ratelimit_service_rate_limit_near_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + - match: "ratelimit.service.rate_limit.*.*.*.over_limit" + name: "ratelimit_service_rate_limit_over_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + - match: "ratelimit.service.rate_limit.*.*.*.total_hits" + name: "ratelimit_service_rate_limit_total_hits" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + - match: "ratelimit.service.rate_limit.*.*.*.within_limit" + name: "ratelimit_service_rate_limit_within_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + + - match: "ratelimit.service.call.should_rate_limit.*" + name: "ratelimit_service_should_rate_limit_error" + match_metric_type: counter + labels: + err_type: "$1" + + - match: "ratelimit_server.*.total_requests" + name: "ratelimit_service_total_requests" + match_metric_type: counter + labels: + grpc_method: "$1" + + - match: "ratelimit_server.*.response_time" + name: "ratelimit_service_response_time_seconds" + timer_type: histogram + labels: + grpc_method: "$1" + + - match: "ratelimit.service.config_load_success" + name: "ratelimit_service_config_load_success" + match_metric_type: counter + - match: "ratelimit.service.config_load_error" + name: "ratelimit_service_config_load_error" + match_metric_type: counter + + - match: "ratelimit.service.rate_limit.*.*.*.shadow_mode" + name: "ratelimit_service_rate_limit_shadow_mode" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" +``` + # HTTP Port The ratelimit service listens to HTTP 1.1 (by default on port 8080) with two endpoints: diff --git a/go.mod b/go.mod index 37e8e14b..b7b3051e 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,8 @@ require ( github.com/lyft/goruntime v0.3.0 github.com/lyft/gostats v0.4.14 github.com/mediocregopher/radix/v3 v3.8.1 + github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/statsd_exporter v0.26.1 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 @@ -37,6 +39,7 @@ require ( cel.dev/expr v0.15.0 // indirect github.com/Microsoft/go-winio v0.5.0 // indirect github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -44,12 +47,17 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-kit/log v0.2.1 // indirect + github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect diff --git a/go.sum b/go.sum index 49b0ddf7..b54c5c8b 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGn github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -42,7 +44,11 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -51,6 +57,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= @@ -101,7 +109,17 @@ github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795 h1:pH+U6p github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/statsd_exporter v0.26.1 h1:ucbIAdPmwAUcA+dU+Opok8Qt81Aw8HanlO+2N/Wjv7w= +github.com/prometheus/statsd_exporter v0.26.1/go.mod h1:XlDdjAmRmx3JVvPPYuFNUg+Ynyb5kR69iPPkQjxXFMk= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index c079182f..ada57757 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -9,25 +9,22 @@ import ( "sync" "time" - "github.com/envoyproxy/ratelimit/src/godogstats" - "github.com/envoyproxy/ratelimit/src/metrics" - "github.com/envoyproxy/ratelimit/src/stats" - "github.com/envoyproxy/ratelimit/src/trace" - - gostats "github.com/lyft/gostats" - "github.com/coocood/freecache" - pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" - + gostats "github.com/lyft/gostats" logger "github.com/sirupsen/logrus" + "github.com/envoyproxy/ratelimit/src/godogstats" "github.com/envoyproxy/ratelimit/src/limiter" "github.com/envoyproxy/ratelimit/src/memcached" + "github.com/envoyproxy/ratelimit/src/metrics" "github.com/envoyproxy/ratelimit/src/redis" "github.com/envoyproxy/ratelimit/src/server" ratelimit "github.com/envoyproxy/ratelimit/src/service" "github.com/envoyproxy/ratelimit/src/settings" + "github.com/envoyproxy/ratelimit/src/stats" + "github.com/envoyproxy/ratelimit/src/stats/prom" + "github.com/envoyproxy/ratelimit/src/trace" "github.com/envoyproxy/ratelimit/src/utils" ) @@ -42,14 +39,14 @@ type Runner struct { func NewRunner(s settings.Settings) Runner { var store gostats.Store - if s.DisableStats { + switch { + case s.DisableStats: logger.Info("Stats disabled") store = gostats.NewStore(gostats.NewNullSink(), false) - } else if s.UseDogStatsd { - if s.UseStatsd { - logger.Fatalf("Error: unable to use both stats sink at the same time. Set either USE_DOG_STATSD or USE_STATSD but not both.") + case s.UseDogStatsd: + if s.UseStatsd || s.UsePrometheus { + logger.Fatalf("Error: unable to use more than one stats sink at the same time. Set one of USE_DOG_STATSD, USE_STATSD, USE_PROMETHEUS.") } - var err error sink, err := godogstats.NewSink( godogstats.WithStatsdHost(s.StatsdHost), godogstats.WithStatsdPort(s.StatsdPort), @@ -59,10 +56,20 @@ func NewRunner(s settings.Settings) Runner { } logger.Info("Stats initialized for dogstatsd") store = gostats.NewStore(sink, false) - } else if s.UseStatsd { + case s.UseStatsd: + if s.UseDogStatsd || s.UsePrometheus { + logger.Fatalf("Error: unable to use more than one stats sink at the same time. Set one of USE_DOG_STATSD, USE_STATSD, USE_PROMETHEUS.") + } logger.Info("Stats initialized for statsd") store = gostats.NewStore(gostats.NewTCPStatsdSink(gostats.WithStatsdHost(s.StatsdHost), gostats.WithStatsdPort(s.StatsdPort)), false) - } else { + case s.UsePrometheus: + if s.UseDogStatsd || s.UseStatsd { + logger.Fatalf("Error: unable to use more than one stats sink at the same time. Set one of USE_DOG_STATSD, USE_STATSD, USE_PROMETHEUS.") + } + logger.Info("Stats initialized for Prometheus") + store = gostats.NewStore(prom.NewPrometheusSink(prom.WithAddr(s.PrometheusAddr), + prom.WithPath(s.PrometheusPath), prom.WithMapperYamlPath(s.PrometheusMapperYaml)), false) + default: logger.Info("Stats initialized for stdout") store = gostats.NewStore(gostats.NewLoggingSink(), false) } diff --git a/src/settings/settings.go b/src/settings/settings.go index 49da0ea5..9febf7d9 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -87,6 +87,10 @@ type Settings struct { ExtraTags map[string]string `envconfig:"EXTRA_TAGS" default:""` StatsFlushInterval time.Duration `envconfig:"STATS_FLUSH_INTERVAL" default:"10s"` DisableStats bool `envconfig:"DISABLE_STATS" default:"false"` + UsePrometheus bool `envconfig:"USE_PROMETHEUS" default:"false"` + PrometheusAddr string `envconfig:"PROMETHEUS_ADDR" default:":9090"` + PrometheusPath string `envconfig:"PROMETHEUS_PATH" default:"/metrics"` + PrometheusMapperYaml string `envconfig:"PROMETHEUS_MAPPER_YAML" default:""` // Settings for rate limit configuration RuntimePath string `envconfig:"RUNTIME_ROOT" default:"/srv/runtime_data/current"` diff --git a/src/stats/prom/default_mapper.yaml b/src/stats/prom/default_mapper.yaml new file mode 100644 index 00000000..df30b1dc --- /dev/null +++ b/src/stats/prom/default_mapper.yaml @@ -0,0 +1,89 @@ +# Requires statsd exporter >= v0.6.0 since it uses the "drop" action. +mappings: + - match: "ratelimit.service.rate_limit.*.*.near_limit" + name: "ratelimit_service_rate_limit_near_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + - match: "ratelimit.service.rate_limit.*.*.over_limit" + name: "ratelimit_service_rate_limit_over_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + - match: "ratelimit.service.rate_limit.*.*.total_hits" + name: "ratelimit_service_rate_limit_total_hits" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + - match: "ratelimit.service.rate_limit.*.*.within_limit" + name: "ratelimit_service_rate_limit_within_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + + - match: "ratelimit.service.rate_limit.*.*.*.near_limit" + name: "ratelimit_service_rate_limit_near_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + - match: "ratelimit.service.rate_limit.*.*.*.over_limit" + name: "ratelimit_service_rate_limit_over_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + - match: "ratelimit.service.rate_limit.*.*.*.total_hits" + name: "ratelimit_service_rate_limit_total_hits" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + - match: "ratelimit.service.rate_limit.*.*.*.within_limit" + name: "ratelimit_service_rate_limit_within_limit" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" + + - match: "ratelimit.service.call.should_rate_limit.*" + name: "ratelimit_service_should_rate_limit_error" + match_metric_type: counter + labels: + err_type: "$1" + + - match: "ratelimit_server.*.total_requests" + name: "ratelimit_service_total_requests" + match_metric_type: counter + labels: + grpc_method: "$1" + + - match: "ratelimit_server.*.response_time" + name: "ratelimit_service_response_time_seconds" + timer_type: histogram + labels: + grpc_method: "$1" + + - match: "ratelimit.service.config_load_success" + name: "ratelimit_service_config_load_success" + match_metric_type: counter + + - match: "ratelimit.service.config_load_error" + name: "ratelimit_service_config_load_error" + match_metric_type: counter + + - match: "ratelimit.service.rate_limit.*.*.*.shadow_mode" + name: "ratelimit_service_rate_limit_shadow_mode" + timer_type: "histogram" + labels: + domain: "$1" + key1: "$2" + key2: "$3" diff --git a/src/stats/prom/prometheus_sink.go b/src/stats/prom/prometheus_sink.go new file mode 100644 index 00000000..58b27ece --- /dev/null +++ b/src/stats/prom/prometheus_sink.go @@ -0,0 +1,160 @@ +package prom + +import ( + _ "embed" + "net/http" + + "github.com/go-kit/log" + gostats "github.com/lyft/gostats" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/exporter" + "github.com/prometheus/statsd_exporter/pkg/mapper" + "github.com/sirupsen/logrus" +) + +var ( + //go:embed default_mapper.yaml + defaultMapper string + _ gostats.Sink = &prometheusSink{} + + eventsActions = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_actions_total", + Help: "The total number of StatsD events by action.", + }, + []string{"action"}, + ) + eventsUnmapped = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_unmapped_total", + Help: "The total number of StatsD events no mapping was found for.", + }) + metricsCount = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "statsd_exporter_metrics_total", + Help: "The total number of metrics.", + }, + []string{"type"}, + ) + conflictingEventStats = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_conflict_total", + Help: "The total number of StatsD events with conflicting names.", + }, + []string{"type"}, + ) + eventStats = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_total", + Help: "The total number of StatsD events seen.", + }, + []string{"type"}, + ) + errorEventStats = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_error_total", + Help: "The total number of StatsD events discarded due to errors.", + }, + []string{"reason"}, + ) +) + +type prometheusSink struct { + config struct { + addr string + path string + mapperYamlPath string + } + mapper *mapper.MetricMapper + events chan event.Events + exp *exporter.Exporter +} + +type prometheusSinkOption func(sink *prometheusSink) + +func WithAddr(addr string) prometheusSinkOption { + return func(sink *prometheusSink) { + sink.config.addr = addr + } +} + +func WithPath(path string) prometheusSinkOption { + return func(sink *prometheusSink) { + sink.config.path = path + } +} + +func WithMapperYamlPath(mapperYamlPath string) prometheusSinkOption { + return func(sink *prometheusSink) { + sink.config.mapperYamlPath = mapperYamlPath + } +} + +// NewPrometheusSink returns a Sink that flushes stats to os.StdErr. +func NewPrometheusSink(opts ...prometheusSinkOption) gostats.Sink { + promRegistry := prometheus.DefaultRegisterer + sink := &prometheusSink{ + events: make(chan event.Events), + mapper: &mapper.MetricMapper{ + Registerer: promRegistry, + }, + } + for _, opt := range opts { + opt(sink) + } + if sink.config.addr == "" { + sink.config.addr = ":9090" + } + if sink.config.path == "" { + sink.config.path = "/metrics" + } + http.Handle(sink.config.path, promhttp.Handler()) + go func() { + logrus.Infof("Starting prometheus sink on %s%s", sink.config.addr, sink.config.path) + _ = http.ListenAndServe(sink.config.addr, nil) + }() + if sink.config.mapperYamlPath != "" { + _ = sink.mapper.InitFromFile(sink.config.mapperYamlPath) + } else { + _ = sink.mapper.InitFromYAMLString(defaultMapper) + } + + sink.exp = exporter.NewExporter(promRegistry, + sink.mapper, log.NewNopLogger(), + eventsActions, eventsUnmapped, + errorEventStats, eventStats, + conflictingEventStats, metricsCount) + + go func() { + sink.exp.Listen(sink.events) + }() + + return sink +} + +func (s *prometheusSink) FlushCounter(name string, value uint64) { + s.events <- event.Events{&event.CounterEvent{ + CMetricName: name, + CValue: float64(value), + CLabels: make(map[string]string), + }} +} + +func (s *prometheusSink) FlushGauge(name string, value uint64) { + s.events <- event.Events{&event.GaugeEvent{ + GMetricName: name, + GValue: float64(value), + GLabels: make(map[string]string), + }} +} + +func (s *prometheusSink) FlushTimer(name string, value float64) { + s.events <- event.Events{&event.ObserverEvent{ + OMetricName: name, + OValue: value, + OLabels: make(map[string]string), + }} +} diff --git a/src/stats/prom/prometheus_sink_test.go b/src/stats/prom/prometheus_sink_test.go new file mode 100644 index 00000000..cc4b8900 --- /dev/null +++ b/src/stats/prom/prometheus_sink_test.go @@ -0,0 +1,111 @@ +package prom + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var s = NewPrometheusSink() + +func TestFlushCounter(t *testing.T) { + s.FlushCounter("ratelimit_server.ShouldRateLimit.total_requests", 1) + assert.Eventually(t, func() bool { + metricFamilies, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + + metrics := make(map[string]*dto.MetricFamily) + for _, metricFamily := range metricFamilies { + metrics[*metricFamily.Name] = metricFamily + } + + m, ok := metrics["ratelimit_service_total_requests"] + require.True(t, ok) + require.Len(t, m.Metric, 1) + require.Equal(t, map[string]string{ + "grpc_method": "ShouldRateLimit", + }, toMap(m.Metric[0].Label)) + require.Equal(t, 1.0, *m.Metric[0].Counter.Value) + return true + }, time.Second, time.Millisecond) +} + +func toMap(labels []*dto.LabelPair) map[string]string { + m := make(map[string]string) + for _, l := range labels { + m[*l.Name] = *l.Value + } + return m +} + +func TestFlushCounterWithDifferentLabels(t *testing.T) { + s.FlushCounter("ratelimit.service.rate_limit.domain1.key1_val1.over_limit", 1) + s.FlushCounter("ratelimit.service.rate_limit.domain1.key1_val1.key2_val2.over_limit", 2) + assert.Eventually(t, func() bool { + metricFamilies, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + + metrics := make(map[string]*dto.MetricFamily) + for _, metricFamily := range metricFamilies { + metrics[*metricFamily.Name] = metricFamily + } + + m, ok := metrics["ratelimit_service_rate_limit_over_limit"] + require.True(t, ok) + require.Len(t, m.Metric, 2) + require.Equal(t, 1.0, *m.Metric[0].Counter.Value) + require.Equal(t, map[string]string{ + "domain": "domain1", + "key1": "key1_val1", + }, toMap(m.Metric[0].Label)) + require.Equal(t, 2.0, *m.Metric[1].Counter.Value) + require.Equal(t, map[string]string{ + "domain": "domain1", + "key1": "key1_val1", + "key2": "key2_val2", + }, toMap(m.Metric[1].Label)) + return true + }, time.Second, time.Millisecond) +} + +func TestFlushGauge(t *testing.T) { + s.FlushGauge("ratelimit.service.rate_limit.domain1.key1.test_gauge", 1) + metricFamilies, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + + metrics := make(map[string]*dto.MetricFamily) + for _, metricFamily := range metricFamilies { + metrics[*metricFamily.Name] = metricFamily + } + + _, ok := metrics["ratelimit_service_rate_limit_test_gauge"] + require.False(t, ok) +} + +func TestFlushTimer(t *testing.T) { + s.FlushTimer("ratelimit.service.rate_limit.mongo_cps.database_users.total_hits", 1) + assert.Eventually(t, func() bool { + metricFamilies, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + + metrics := make(map[string]*dto.MetricFamily) + for _, metricFamily := range metricFamilies { + metrics[*metricFamily.Name] = metricFamily + } + + m, ok := metrics["ratelimit_service_rate_limit_total_hits"] + require.True(t, ok) + require.Len(t, m.Metric, 1) + require.Equal(t, uint64(1), *m.Metric[0].Histogram.SampleCount) + require.Equal(t, map[string]string{ + "domain": "mongo_cps", + "key1": "database_users", + }, toMap(m.Metric[0].Label)) + require.Equal(t, 1.0, *m.Metric[0].Histogram.SampleSum) + return true + }, time.Second, time.Millisecond) +}