diff --git a/CHANGELOG.md b/CHANGELOG.md index 51638bdc..bf046ace 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ This changelog keeps track of work items that have been completed and are ready ### New +- **General**: Add support to collect metrics using either a prometheus compatible endpoint or by sending metrics to an OTEL http endpoint ([#910](https://github.com/kedacore/http-add-on/issues/910)) - **General**: Propagate HTTPScaledObject labels and annotations to ScaledObject ([#840](https://github.com/kedacore/http-add-on/issues/840)) - **General**: Provide support to allow HTTP scaler to work alongside other core KEDA scalers ([#489](https://github.com/kedacore/http-add-on/issues/489)) - **General**: Support aggregation windows ([#882](https://github.com/kedacore/http-add-on/issues/882)) diff --git a/Makefile b/Makefile index 3bf32588..42dc86e7 100644 --- a/Makefile +++ b/Makefile @@ -147,6 +147,12 @@ deploy: manifests kustomize ## Deploy to the K8s cluster specified in ~/.kube/co cd config/interceptor && \ $(KUSTOMIZE) edit set image ghcr.io/kedacore/http-add-on-interceptor=${IMAGE_INTERCEPTOR_VERSIONED_TAG} + cd config/interceptor && \ + $(KUSTOMIZE) edit add patch --path otel/deployment.yaml --group apps --kind Deployment --name interceptor --version v1 + + cd config/interceptor && \ + $(KUSTOMIZE) edit add patch --path otel/scaledobject.yaml --group keda.sh --kind ScaledObject --name interceptor --version v1alpha1 + cd config/scaler && \ $(KUSTOMIZE) edit set image ghcr.io/kedacore/http-add-on-scaler=${IMAGE_SCALER_VERSIONED_TAG} diff --git a/config/interceptor/deployment.yaml b/config/interceptor/deployment.yaml index cc7c7982..9207cb57 100644 --- a/config/interceptor/deployment.yaml +++ b/config/interceptor/deployment.yaml @@ -55,6 +55,8 @@ spec: containerPort: 9090 - name: proxy containerPort: 8080 + - name: metrics + containerPort: 2223 livenessProbe: httpGet: path: /livez diff --git a/config/interceptor/kustomization.yaml b/config/interceptor/kustomization.yaml index 4158a666..32e18f83 100644 --- a/config/interceptor/kustomization.yaml +++ b/config/interceptor/kustomization.yaml @@ -6,6 +6,7 @@ resources: - role_binding.yaml - admin.service.yaml - proxy.service.yaml +- metrics.service.yaml - service_account.yaml - scaledobject.yaml configurations: diff --git a/config/interceptor/metrics.service.yaml b/config/interceptor/metrics.service.yaml new file mode 100644 index 00000000..df125610 --- /dev/null +++ b/config/interceptor/metrics.service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: interceptor-metrics +spec: + type: ClusterIP + ports: + - name: metrics + protocol: TCP + port: 2223 + targetPort: metrics diff --git a/config/interceptor/otel/deployment.yaml b/config/interceptor/otel/deployment.yaml new file mode 100644 index 00000000..0f6b3ec8 --- /dev/null +++ b/config/interceptor/otel/deployment.yaml @@ -0,0 +1,23 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: interceptor +spec: + replicas: 1 + template: + spec: + containers: + - name: interceptor + env: + - name: KEDA_HTTP_OTEL_PROM_EXPORTER_ENABLED + value: "true" + - name: KEDA_HTTP_OTEL_PROM_EXPORTER_PORT + value: "2223" + - name: KEDA_HTTP_OTEL_HTTP_EXPORTER_ENABLED + value: "true" + - name: KEDA_HTTP_OTEL_HTTP_COLLECTOR_ENDPOINT + value: "opentelemetry-collector.open-telemetry-system:4318" + - name: KEDA_HTTP_OTEL_HTTP_COLLECTOR_INSECURE + value: "true" + - name: KEDA_HTTP_OTEL_METRIC_EXPORT_INTERVAL + value: "1" diff --git a/config/interceptor/otel/kustomization.yaml b/config/interceptor/otel/kustomization.yaml new file mode 100644 index 00000000..74055b35 --- /dev/null +++ b/config/interceptor/otel/kustomization.yaml @@ -0,0 +1,5 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: +- deployment.yaml +- scaledobject.yaml diff --git a/config/interceptor/otel/scaledobject.yaml b/config/interceptor/otel/scaledobject.yaml new file mode 100644 index 00000000..c3c0e462 --- /dev/null +++ b/config/interceptor/otel/scaledobject.yaml @@ -0,0 +1,6 @@ +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: interceptor +spec: + minReplicaCount: 1 diff --git a/docs/operate.md b/docs/operate.md new file mode 100644 index 00000000..9c066dfa --- /dev/null +++ b/docs/operate.md @@ -0,0 +1,21 @@ +# Configuring metrics for the KEDA HTTP Add-on interceptor proxy + +### Exportable metrics: +* **Pending request count** - the number of pending requests for a given host. +* **Total request count** - the total number of requests for a given host with method, path and response code attributes. + +There are currently 2 supported methods for exposing metrics from the interceptor proxy service - via a Prometheus compatible metrics endpoint or by pushing metrics to a OTEL HTTP collector. + +### Configuring the Prometheus compatible metrics endpoint +When configured, the interceptor proxy can expose metrics on a Prometheus compatible endpoint. + +This endpoint can be enabled by setting the `KEDA_HTTP_OTEL_PROM_EXPORTER_ENABLED` environment variable to `true` on the interceptor deployment (`true` by default) and by setting `KEDA_HTTP_OTEL_PROM_EXPORTER_PORT` to an unused port for the endpoint to be made avaialble on (`2223` by default). + +### Configuring the OTEL HTTP exporter +When configured, the interceptor proxy can export metrics to a OTEL HTTP collector. + +The OTEL exporter can be enabled by setting the `KEDA_HTTP_OTEL_HTTP_EXPORTER_ENABLED` environment variable to `true` on the interceptor deployment (`false` by default). When enabled the `KEDA_HTTP_OTEL_HTTP_COLLECTOR_ENDPOINT` environment variable must also be configured so the exporter knows what collector to send the metrics to (e.g. opentelemetry-collector.open-telemetry-system:4318). + +If the collector is exposed on a unsecured endpoint then you can set the `KEDA_HTTP_OTEL_HTTP_COLLECTOR_INSECURE` environment variable to `true` (`false` by default) which will disable client security on the exporter. + +If you need to provide any headers such as authentication details in order to utilise your OTEL collector you can add them into the `KEDA_HTTP_OTEL_HTTP_HEADERS` environment variable. The frequency at which the metrics are exported can be configured by setting `KEDA_HTTP_OTEL_METRIC_EXPORT_INTERVAL` to the number of seconds you require between each export interval (`30` by default). diff --git a/docs/readme.md b/docs/readme.md index cfc41b74..9bb7a096 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -7,6 +7,7 @@ Here is an overview of detailed documentation: - [Design](design.md) - [Use-Cases](use_cases.md) - [Walkthrough](walkthrough.md) +- [Operate](operate.md) - [Developing](developing.md) - [Integrations](integrations.md) - [FAQ](faq.md) diff --git a/go.mod b/go.mod index 40591582..a4d9cd69 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,9 @@ require ( github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.31.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/otel v1.23.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.0 + go.opentelemetry.io/otel/sdk v1.23.0 golang.org/x/sync v0.6.0 google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 @@ -24,6 +27,16 @@ require ( sigs.k8s.io/kustomize/kustomize/v5 v5.3.0 ) +require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect + go.opentelemetry.io/otel/trace v1.23.0 // indirect + go.opentelemetry.io/proto/otlp v1.1.0 // indirect + go.uber.org/zap v1.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect +) + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -62,16 +75,18 @@ require ( github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect + github.com/prometheus/client_golang v1.18.0 + github.com/prometheus/client_model v0.5.0 + github.com/prometheus/common v0.46.0 github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xlab/treeprint v1.2.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.45.1 + go.opentelemetry.io/otel/metric v1.23.0 + go.opentelemetry.io/otel/sdk/metric v1.23.0 go.starlark.net v0.0.0-20231121155337-90ade8b19d09 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.23.0 // indirect diff --git a/go.sum b/go.sum index b9a3d445..3aeceff0 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -21,9 +23,12 @@ github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyT github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.20.2 h1:mQc3nmndL8ZBzStEo3JYF8wzmeWffDH4VbXz58sAx6Q= @@ -64,6 +69,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU= github.com/hashicorp/go-immutable-radix/v2 v2.1.0 h1:CUW5RYIcysz+D3B+l1mDeXrQ7fUvGGCwJfdASSzbrfo= github.com/hashicorp/go-immutable-radix/v2 v2.1.0/go.mod h1:hgdqLXA4f6NIjRVisM1TJ9aOJVNRqKZj+xDGF6m7PBw= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= @@ -145,6 +152,22 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.23.0 h1:Df0pqjqExIywbMCMTxkAwzjLZtRf+bBKLbUcpxO2C9E= +go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFufObyB0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.0 h1:Lc6m+ytInMOSdTOGl+Y4qPzTlZ7QPb0pL+1JuUEt4Ao= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.0/go.mod h1:Rv15/kBGgH1lHvfd6Y0FlnMuy8F7MdSSiqVjn8Q8KUQ= +go.opentelemetry.io/otel/exporters/prometheus v0.45.1 h1:R/bW3afad6q6VGU+MFYpnEdo0stEARMCdhWu6+JI6aI= +go.opentelemetry.io/otel/exporters/prometheus v0.45.1/go.mod h1:wnHAfKRav5Dfp4iZhyWZ7SzQfT+rDZpEpYG7To+qJ1k= +go.opentelemetry.io/otel/metric v1.23.0 h1:pazkx7ss4LFVVYSxYew7L5I6qvLXHA0Ap2pwV+9Cnpo= +go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo= +go.opentelemetry.io/otel/sdk v1.23.0 h1:0KM9Zl2esnl+WSukEmlaAEjVY5HDZANOHferLq36BPc= +go.opentelemetry.io/otel/sdk v1.23.0/go.mod h1:wUscup7byToqyKJSilEtMf34FgdCAsFpFOjXnAwFfO0= +go.opentelemetry.io/otel/sdk/metric v1.23.0 h1:u81lMvmK6GMgN4Fty7K7S6cSKOZhMKJMK2TB+KaTs0I= +go.opentelemetry.io/otel/sdk/metric v1.23.0/go.mod h1:2LUOToN/FdX6wtfpHybOnCZjoZ6ViYajJYMiJ1LKDtQ= +go.opentelemetry.io/otel/trace v1.23.0 h1:37Ik5Ib7xfYVb4V1UtnT97T1jI+AoIYkJyPkuL4iJgI= +go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= +go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= +go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= go.starlark.net v0.0.0-20231121155337-90ade8b19d09 h1:hzy3LFnSN8kuQK8h9tHl4ndF6UruMj47OqwqsS+/Ai4= go.starlark.net v0.0.0-20231121155337-90ade8b19d09/go.mod h1:LcLNIzVOMp4oV+uusnpk+VU+SzXaJakUuBjoCSWH5dM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -224,6 +247,10 @@ gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= diff --git a/interceptor/config/metrics.go b/interceptor/config/metrics.go new file mode 100644 index 00000000..3def7730 --- /dev/null +++ b/interceptor/config/metrics.go @@ -0,0 +1,31 @@ +package config + +import ( + "github.com/kelseyhightower/envconfig" +) + +// Metrics is the configuration for configuring metrics in the interceptor. +type Metrics struct { + // Sets whether or not to enable the Prometheus metrics exporter + OtelPrometheusExporterEnabled bool `envconfig:"KEDA_HTTP_OTEL_PROM_EXPORTER_ENABLED" default:"true"` + // Sets the port which the Prometheus compatible metrics endpoint should be served on + OtelPrometheusExporterPort int `envconfig:"KEDA_HTTP_OTEL_PROM_EXPORTER_PORT" default:"2223"` + // Sets whether or not to enable the OTEL metrics exporter + OtelHTTPExporterEnabled bool `envconfig:"KEDA_HTTP_OTEL_HTTP_EXPORTER_ENABLED" default:"false"` + // Sets the HTTP endpoint where metrics should be sent to + OtelHTTPCollectorEndpoint string `envconfig:"KEDA_HTTP_OTEL_HTTP_COLLECTOR_ENDPOINT" default:"localhost:4318"` + // Sets the OTLP headers required by the otel exporter + OtelHTTPHeaders string `envconfig:"KEDA_HTTP_OTEL_HTTP_HEADERS" default:""` + // Set the connection to the otel HTTP collector endpoint to use HTTP rather than HTTPS + OtelHTTPCollectorInsecure bool `envconfig:"KEDA_HTTP_OTEL_HTTP_COLLECTOR_INSECURE" default:"false"` + // Set the interval in seconds to export otel metrics to the configured collector endpoint + OtelMetricExportInterval int `envconfig:"KEDA_HTTP_OTEL_METRIC_EXPORT_INTERVAL" default:"30"` +} + +// Parse parses standard configs using envconfig and returns a pointer to the +// newly created config. Returns nil and a non-nil error if parsing failed +func MustParseMetrics() *Metrics { + ret := new(Metrics) + envconfig.MustProcess("", ret) + return ret +} diff --git a/interceptor/main.go b/interceptor/main.go index f2d2c141..a3812205 100644 --- a/interceptor/main.go +++ b/interceptor/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/sync/errgroup" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" @@ -17,6 +18,7 @@ import ( "github.com/kedacore/http-add-on/interceptor/config" "github.com/kedacore/http-add-on/interceptor/handler" + "github.com/kedacore/http-add-on/interceptor/metrics" "github.com/kedacore/http-add-on/interceptor/middleware" clientset "github.com/kedacore/http-add-on/operator/generated/clientset/versioned" informers "github.com/kedacore/http-add-on/operator/generated/informers/externalversions" @@ -39,6 +41,7 @@ var ( func main() { timeoutCfg := config.MustParseTimeouts() servingCfg := config.MustParseServing() + metricsCfg := config.MustParseMetrics() opts := zap.Options{ Development: true, @@ -59,11 +62,16 @@ func main() { timeoutCfg, "servingConfig", servingCfg, + "metricsConfig", + metricsCfg, ) proxyPort := servingCfg.ProxyPort adminPort := servingCfg.AdminPort + // setup the configured metrics collectors + metrics.NewMetricsCollectors(metricsCfg) + cfg := ctrl.GetConfigOrDie() cl, err := kubernetes.NewForConfig(cfg) @@ -139,6 +147,19 @@ func main() { return nil }) + if metricsCfg.OtelPrometheusExporterEnabled { + // start the prometheus compatible metrics server + // serves a prometheus compatible metrics endpoint on the configured port + eg.Go(func() error { + if err := runMetricsServer(ctx, ctrl.Log, metricsCfg); !util.IsIgnoredErr(err) { + setupLog.Error(err, "could not start the Prometheus metrics server") + return err + } + + return nil + }) + } + // start the proxy server. this is the server that // accepts, holds and forwards user requests eg.Go(func() error { @@ -181,6 +202,16 @@ func runAdminServer( return kedahttp.ServeContext(ctx, addr, adminServer) } +func runMetricsServer( + ctx context.Context, + lggr logr.Logger, + metricsCfg *config.Metrics, +) error { + lggr.Info("starting the prometheus metrics server", "port", metricsCfg.OtelPrometheusExporterPort, "path", "/metrics") + addr := fmt.Sprintf("0.0.0.0:%d", metricsCfg.OtelPrometheusExporterPort) + return kedahttp.ServeContext(ctx, addr, promhttp.Handler()) +} + func runProxyServer( ctx context.Context, logger logr.Logger, @@ -221,6 +252,10 @@ func runProxyServer( rootHandler, ) + rootHandler = middleware.NewMetrics( + rootHandler, + ) + addr := fmt.Sprintf("0.0.0.0:%d", port) logger.Info("proxy server starting", "address", addr) return kedahttp.ServeContext(ctx, addr, rootHandler) diff --git a/interceptor/metrics/metricscollector.go b/interceptor/metrics/metricscollector.go new file mode 100644 index 00000000..4921d702 --- /dev/null +++ b/interceptor/metrics/metricscollector.go @@ -0,0 +1,40 @@ +package metrics + +import ( + "github.com/kedacore/http-add-on/interceptor/config" +) + +var ( + collectors []Collector +) + +const meterName = "keda-interceptor-proxy" + +type Collector interface { + RecordRequestCount(method string, path string, responseCode int, host string) + RecordPendingRequestCount(host string, value int64) +} + +func NewMetricsCollectors(metricsConfig *config.Metrics) { + if metricsConfig.OtelPrometheusExporterEnabled { + promometrics := NewPrometheusMetrics() + collectors = append(collectors, promometrics) + } + + if metricsConfig.OtelHTTPExporterEnabled { + otelhttpmetrics := NewOtelMetrics(metricsConfig) + collectors = append(collectors, otelhttpmetrics) + } +} + +func RecordRequestCount(method string, path string, responseCode int, host string) { + for _, collector := range collectors { + collector.RecordRequestCount(method, path, responseCode, host) + } +} + +func RecordPendingRequestCount(host string, value int64) { + for _, collector := range collectors { + collector.RecordPendingRequestCount(host, value) + } +} diff --git a/interceptor/metrics/otelmetrics.go b/interceptor/metrics/otelmetrics.go new file mode 100644 index 00000000..91f4b8a3 --- /dev/null +++ b/interceptor/metrics/otelmetrics.go @@ -0,0 +1,115 @@ +package metrics + +import ( + "context" + "log" + "strings" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + api "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + + "github.com/kedacore/http-add-on/interceptor/config" + "github.com/kedacore/http-add-on/pkg/build" +) + +type OtelMetrics struct { + meter api.Meter + requestCounter api.Int64Counter + pendingRequestCounter api.Int64UpDownCounter +} + +func NewOtelMetrics(metricsConfig *config.Metrics, options ...metric.Option) *OtelMetrics { + ctx := context.Background() + var exporter *otlpmetrichttp.Exporter + var err error + endpoint := otlpmetrichttp.WithEndpoint(metricsConfig.OtelHTTPCollectorEndpoint) + headersFromEnvVar := getHeaders(metricsConfig.OtelHTTPHeaders) + headers := otlpmetrichttp.WithHeaders(headersFromEnvVar) + + if metricsConfig.OtelHTTPCollectorInsecure { + insecure := otlpmetrichttp.WithInsecure() + exporter, err = otlpmetrichttp.New(ctx, endpoint, headers, insecure) + } else { + exporter, err = otlpmetrichttp.New(ctx, endpoint, headers) + } + + if err != nil { + log.Fatalf("could not create otelmetrichttp exporter: %v", err) + } + + if options == nil { + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("interceptor-proxy"), + semconv.ServiceVersionKey.String(build.Version()), + ) + + options = []metric.Option{ + metric.WithReader(metric.NewPeriodicReader(exporter, metric.WithInterval(time.Duration(metricsConfig.OtelMetricExportInterval)*time.Second))), + metric.WithResource(res), + } + } + + provider := metric.NewMeterProvider(options...) + meter := provider.Meter(meterName) + + reqCounter, err := meter.Int64Counter("interceptor_request_count", api.WithDescription("a counter of requests processed by the interceptor proxy")) + if err != nil { + log.Fatalf("could not create new otelhttpmetric request counter: %v", err) + } + + pendingRequestCounter, err := meter.Int64UpDownCounter("interceptor_pending_request_count", api.WithDescription("a count of requests pending forwarding by the interceptor proxy")) + if err != nil { + log.Fatalf("could not create new otelhttpmetric pending request counter: %v", err) + } + + return &OtelMetrics{ + meter: meter, + requestCounter: reqCounter, + pendingRequestCounter: pendingRequestCounter, + } +} + +func (om *OtelMetrics) RecordRequestCount(method string, path string, responseCode int, host string) { + ctx := context.Background() + opt := api.WithAttributeSet( + attribute.NewSet( + attribute.Key("method").String(method), + attribute.Key("path").String(path), + attribute.Key("code").Int(responseCode), + attribute.Key("host").String(host), + ), + ) + om.requestCounter.Add(ctx, 1, opt) +} + +func (om *OtelMetrics) RecordPendingRequestCount(host string, value int64) { + ctx := context.Background() + opt := api.WithAttributeSet( + attribute.NewSet( + attribute.Key("host").String(host), + ), + ) + + om.pendingRequestCounter.Add(ctx, value, opt) +} + +func getHeaders(s string) map[string]string { + // Get the headers in key-value pair from the KEDA_HTTP_OTEL_HTTP_HEADERS environment variable + var m = map[string]string{} + + if s != "" { + h := strings.Split(s, ",") + for _, v := range h { + x := strings.Split(v, "=") + m[x[0]] = x[1] + } + } + + return m +} diff --git a/interceptor/metrics/otelmetrics_test.go b/interceptor/metrics/otelmetrics_test.go new file mode 100644 index 00000000..af5ac7c1 --- /dev/null +++ b/interceptor/metrics/otelmetrics_test.go @@ -0,0 +1,61 @@ +package metrics + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/kedacore/http-add-on/interceptor/config" +) + +var ( + testOtel *OtelMetrics + testReader metric.Reader +) + +func init() { + testReader = metric.NewManualReader() + options := metric.WithReader(testReader) + metricsCfg := config.Metrics{} + testOtel = NewOtelMetrics(&metricsCfg, options) +} + +func TestRequestCounter(t *testing.T) { + testOtel.RecordRequestCount("GET", "/test", 200, "test-host-1") + got := metricdata.ResourceMetrics{} + err := testReader.Collect(context.Background(), &got) + + assert.Nil(t, err) + scopeMetrics := got.ScopeMetrics[0] + assert.NotEqual(t, len(scopeMetrics.Metrics), 0) + + metricInfo := retrieveMetric(scopeMetrics.Metrics, "interceptor_request_count") + data := metricInfo.Data.(metricdata.Sum[int64]).DataPoints[0] + assert.Equal(t, data.Value, int64(1)) +} + +func TestPendingRequestCounter(t *testing.T) { + testOtel.RecordPendingRequestCount("test-host", 5) + got := metricdata.ResourceMetrics{} + err := testReader.Collect(context.Background(), &got) + + assert.Nil(t, err) + scopeMetrics := got.ScopeMetrics[0] + assert.NotEqual(t, len(scopeMetrics.Metrics), 0) + + metricInfo := retrieveMetric(scopeMetrics.Metrics, "interceptor_pending_request_count") + data := metricInfo.Data.(metricdata.Sum[int64]).DataPoints[0] + assert.Equal(t, data.Value, int64(5)) +} + +func retrieveMetric(metrics []metricdata.Metrics, metricname string) *metricdata.Metrics { + for _, m := range metrics { + if m.Name == metricname { + return &m + } + } + return nil +} diff --git a/interceptor/metrics/prommetrics.go b/interceptor/metrics/prommetrics.go new file mode 100644 index 00000000..0af57a77 --- /dev/null +++ b/interceptor/metrics/prommetrics.go @@ -0,0 +1,86 @@ +package metrics + +import ( + "context" + "log" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/prometheus" + api "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + + "github.com/kedacore/http-add-on/pkg/build" +) + +type PrometheusMetrics struct { + meter api.Meter + requestCounter api.Int64Counter + pendingRequestCounter api.Int64UpDownCounter +} + +func NewPrometheusMetrics(options ...prometheus.Option) *PrometheusMetrics { + var exporter *prometheus.Exporter + var err error + if options == nil { + exporter, err = prometheus.New() + } else { + exporter, err = prometheus.New(options...) + } + if err != nil { + log.Fatalf("could not create Prometheus exporter: %v", err) + } + + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("interceptor-proxy"), + semconv.ServiceVersionKey.String(build.Version()), + ) + + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + meter := provider.Meter(meterName) + + reqCounter, err := meter.Int64Counter("interceptor_request_count", api.WithDescription("a counter of requests processed by the interceptor proxy")) + if err != nil { + log.Fatalf("could not create new Prometheus request counter: %v", err) + } + + pendingRequestCounter, err := meter.Int64UpDownCounter("interceptor_pending_request_count", api.WithDescription("a count of requests pending forwarding by the interceptor proxy")) + if err != nil { + log.Fatalf("could not create new Prometheus pending request counter: %v", err) + } + + return &PrometheusMetrics{ + meter: meter, + requestCounter: reqCounter, + pendingRequestCounter: pendingRequestCounter, + } +} + +func (p *PrometheusMetrics) RecordRequestCount(method string, path string, responseCode int, host string) { + ctx := context.Background() + opt := api.WithAttributeSet( + attribute.NewSet( + attribute.Key("method").String(method), + attribute.Key("path").String(path), + attribute.Key("code").Int(responseCode), + attribute.Key("host").String(host), + ), + ) + p.requestCounter.Add(ctx, 1, opt) +} + +func (p *PrometheusMetrics) RecordPendingRequestCount(host string, value int64) { + ctx := context.Background() + opt := api.WithAttributeSet( + attribute.NewSet( + attribute.Key("host").String(host), + ), + ) + + p.pendingRequestCounter.Add(ctx, value, opt) +} diff --git a/interceptor/metrics/prommetrics_test.go b/interceptor/metrics/prommetrics_test.go new file mode 100644 index 00000000..c7f82838 --- /dev/null +++ b/interceptor/metrics/prommetrics_test.go @@ -0,0 +1,57 @@ +package metrics + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + promexporter "go.opentelemetry.io/otel/exporters/prometheus" +) + +func TestPromRequestCountMetric(t *testing.T) { + testRegistry := prometheus.NewRegistry() + options := []promexporter.Option{promexporter.WithRegisterer(testRegistry)} + testPrometheus := NewPrometheusMetrics(options...) + expectedOutput := ` + # HELP interceptor_request_count_total a counter of requests processed by the interceptor proxy + # TYPE interceptor_request_count_total counter + interceptor_request_count_total{code="500",host="test-host",method="post",otel_scope_name="keda-interceptor-proxy",otel_scope_version="",path="/test"} 1 + interceptor_request_count_total{code="200",host="test-host",method="post",otel_scope_name="keda-interceptor-proxy",otel_scope_version="",path="/test"} 1 + # HELP otel_scope_info Instrumentation Scope metadata + # TYPE otel_scope_info gauge + otel_scope_info{otel_scope_name="keda-interceptor-proxy",otel_scope_version=""} 1 + # HELP target_info Target metadata + # TYPE target_info gauge + target_info{service_name="interceptor-proxy",service_version="main"} 1 + + ` + expectedOutputReader := strings.NewReader(expectedOutput) + testPrometheus.RecordRequestCount("post", "/test", 500, "test-host") + testPrometheus.RecordRequestCount("post", "/test", 200, "test-host") + err := testutil.CollectAndCompare(testRegistry, expectedOutputReader) + assert.Nil(t, err) +} + +func TestPromPendingRequestCountMetric(t *testing.T) { + testRegistry := prometheus.NewRegistry() + options := []promexporter.Option{promexporter.WithRegisterer(testRegistry)} + testPrometheus := NewPrometheusMetrics(options...) + expectedOutput := ` + # HELP interceptor_pending_request_count a count of requests pending forwarding by the interceptor proxy + # TYPE interceptor_pending_request_count gauge + interceptor_pending_request_count{host="test-host",otel_scope_name="keda-interceptor-proxy",otel_scope_version=""} 10 + # HELP otel_scope_info Instrumentation Scope metadata + # TYPE otel_scope_info gauge + otel_scope_info{otel_scope_name="keda-interceptor-proxy",otel_scope_version=""} 1 + # HELP target_info Target metadata + # TYPE target_info gauge + target_info{service_name="interceptor-proxy",service_version="main"} 1 + + ` + expectedOutputReader := strings.NewReader(expectedOutput) + testPrometheus.RecordPendingRequestCount("test-host", 10) + err := testutil.CollectAndCompare(testRegistry, expectedOutputReader) + assert.Nil(t, err) +} diff --git a/interceptor/middleware/counting.go b/interceptor/middleware/counting.go index 7036c5ff..642a7ca5 100644 --- a/interceptor/middleware/counting.go +++ b/interceptor/middleware/counting.go @@ -6,6 +6,7 @@ import ( "github.com/go-logr/logr" + "github.com/kedacore/http-add-on/interceptor/metrics" "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/queue" "github.com/kedacore/http-add-on/pkg/util" @@ -68,6 +69,8 @@ func (cm *Counting) inc(logger logr.Logger, key string) bool { return false } + metrics.RecordPendingRequestCount(key, int64(1)) + return true } @@ -78,5 +81,7 @@ func (cm *Counting) dec(logger logr.Logger, key string) bool { return false } + metrics.RecordPendingRequestCount(key, int64(-1)) + return true } diff --git a/interceptor/middleware/logging.go b/interceptor/middleware/logging.go index fd9730a1..1421b2d4 100644 --- a/interceptor/middleware/logging.go +++ b/interceptor/middleware/logging.go @@ -31,7 +31,7 @@ var _ http.Handler = (*Logging)(nil) func (lm *Logging) ServeHTTP(w http.ResponseWriter, r *http.Request) { r = util.RequestWithLogger(r, lm.logger.WithName("LoggingMiddleware")) - w = newLoggingResponseWriter(w) + w = newResponseWriter(w) var sw util.Stopwatch defer lm.logAsync(w, r, &sw) @@ -50,9 +50,9 @@ func (lm *Logging) log(w http.ResponseWriter, r *http.Request, sw *util.Stopwatc ctx := r.Context() logger := util.LoggerFromContext(ctx) - lrw := w.(*loggingResponseWriter) + lrw := w.(*responseWriter) if lrw == nil { - lrw = newLoggingResponseWriter(w) + lrw = newResponseWriter(w) } timestamp := sw.StartTime().Format(CombinedLogTimeFormat) diff --git a/interceptor/middleware/loggingresponsewriter.go b/interceptor/middleware/loggingresponsewriter.go deleted file mode 100644 index 9c893f1c..00000000 --- a/interceptor/middleware/loggingresponsewriter.go +++ /dev/null @@ -1,48 +0,0 @@ -package middleware - -import ( - "net/http" -) - -type loggingResponseWriter struct { - downstreamResponseWriter http.ResponseWriter - bytesWritten int - statusCode int -} - -func newLoggingResponseWriter(downstreamResponseWriter http.ResponseWriter) *loggingResponseWriter { - return &loggingResponseWriter{ - downstreamResponseWriter: downstreamResponseWriter, - } -} - -func (lrw *loggingResponseWriter) BytesWritten() int { - return lrw.bytesWritten -} - -func (lrw *loggingResponseWriter) StatusCode() int { - return lrw.statusCode -} - -var _ http.ResponseWriter = (*loggingResponseWriter)(nil) - -func (lrw *loggingResponseWriter) Header() http.Header { - return lrw.downstreamResponseWriter.Header() -} - -func (lrw *loggingResponseWriter) Write(bytes []byte) (int, error) { - n, err := lrw.downstreamResponseWriter.Write(bytes) - if f, ok := lrw.downstreamResponseWriter.(http.Flusher); ok { - f.Flush() - } - - lrw.bytesWritten += n - - return n, err -} - -func (lrw *loggingResponseWriter) WriteHeader(statusCode int) { - lrw.downstreamResponseWriter.WriteHeader(statusCode) - - lrw.statusCode = statusCode -} diff --git a/interceptor/middleware/metrics.go b/interceptor/middleware/metrics.go new file mode 100644 index 00000000..2e0a165c --- /dev/null +++ b/interceptor/middleware/metrics.go @@ -0,0 +1,37 @@ +package middleware + +import ( + "net/http" + + "github.com/kedacore/http-add-on/interceptor/metrics" +) + +type Metrics struct { + upstreamHandler http.Handler +} + +func NewMetrics(upstreamHandler http.Handler) *Metrics { + return &Metrics{ + upstreamHandler: upstreamHandler, + } +} + +func (m *Metrics) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w = newResponseWriter(w) + + defer m.metrics(w, r) + + m.upstreamHandler.ServeHTTP(w, r) +} + +func (m *Metrics) metrics(w http.ResponseWriter, r *http.Request) { + mrw := w.(*responseWriter) + if mrw == nil { + mrw = newResponseWriter(w) + } + + // exclude readiness & liveness probes from the emitted metrics + if r.URL.Path != "/livez" && r.URL.Path != "/readyz" { + metrics.RecordRequestCount(r.Method, r.URL.Path, mrw.statusCode, r.Host) + } +} diff --git a/interceptor/middleware/responsewriter.go b/interceptor/middleware/responsewriter.go new file mode 100644 index 00000000..ed18051c --- /dev/null +++ b/interceptor/middleware/responsewriter.go @@ -0,0 +1,48 @@ +package middleware + +import ( + "net/http" +) + +type responseWriter struct { + downstreamResponseWriter http.ResponseWriter + bytesWritten int + statusCode int +} + +func newResponseWriter(downstreamResponseWriter http.ResponseWriter) *responseWriter { + return &responseWriter{ + downstreamResponseWriter: downstreamResponseWriter, + } +} + +func (rw *responseWriter) BytesWritten() int { + return rw.bytesWritten +} + +func (rw *responseWriter) StatusCode() int { + return rw.statusCode +} + +var _ http.ResponseWriter = (*responseWriter)(nil) + +func (rw *responseWriter) Header() http.Header { + return rw.downstreamResponseWriter.Header() +} + +func (rw *responseWriter) Write(bytes []byte) (int, error) { + n, err := rw.downstreamResponseWriter.Write(bytes) + if f, ok := rw.downstreamResponseWriter.(http.Flusher); ok { + f.Flush() + } + + rw.bytesWritten += n + + return n, err +} + +func (rw *responseWriter) WriteHeader(statusCode int) { + rw.downstreamResponseWriter.WriteHeader(statusCode) + + rw.statusCode = statusCode +} diff --git a/interceptor/middleware/loggingresponsewriter_test.go b/interceptor/middleware/responsewriter_test.go similarity index 71% rename from interceptor/middleware/loggingresponsewriter_test.go rename to interceptor/middleware/responsewriter_test.go index 2a920a2f..860b4265 100644 --- a/interceptor/middleware/loggingresponsewriter_test.go +++ b/interceptor/middleware/responsewriter_test.go @@ -8,18 +8,18 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("loggingResponseWriter", func() { +var _ = Describe("responseWriter", func() { Context("New", func() { It("returns new object with expected field values set", func() { var ( w = httptest.NewRecorder() ) - lrw := newLoggingResponseWriter(w) - Expect(lrw).NotTo(BeNil()) - Expect(lrw.downstreamResponseWriter).To(Equal(w)) - Expect(lrw.bytesWritten).To(Equal(0)) - Expect(lrw.statusCode).To(Equal(0)) + rw := newResponseWriter(w) + Expect(rw).NotTo(BeNil()) + Expect(rw.downstreamResponseWriter).To(Equal(w)) + Expect(rw.bytesWritten).To(Equal(0)) + Expect(rw.statusCode).To(Equal(0)) }) }) @@ -29,11 +29,11 @@ var _ = Describe("loggingResponseWriter", func() { bw = 128 ) - lrw := &loggingResponseWriter{ + rw := &responseWriter{ bytesWritten: bw, } - ret := lrw.BytesWritten() + ret := rw.BytesWritten() Expect(ret).To(Equal(bw)) }) }) @@ -44,11 +44,11 @@ var _ = Describe("loggingResponseWriter", func() { sc = http.StatusTeapot ) - lrw := &loggingResponseWriter{ + rw := &responseWriter{ statusCode: sc, } - ret := lrw.StatusCode() + ret := rw.StatusCode() Expect(ret).To(Equal(sc)) }) }) @@ -59,14 +59,14 @@ var _ = Describe("loggingResponseWriter", func() { w = httptest.NewRecorder() ) - lrw := &loggingResponseWriter{ + rw := &responseWriter{ downstreamResponseWriter: w, } h := w.Header() h.Set("Content-Type", "application/json") - ret := lrw.Header() + ret := rw.Header() Expect(ret).To(Equal(h)) }) }) @@ -83,16 +83,16 @@ var _ = Describe("loggingResponseWriter", func() { w = httptest.NewRecorder() ) - lrw := &loggingResponseWriter{ + rw := &responseWriter{ bytesWritten: initialBW, downstreamResponseWriter: w, } - n, err := lrw.Write([]byte(body)) + n, err := rw.Write([]byte(body)) Expect(err).To(BeNil()) Expect(n).To(Equal(bodyLen)) - Expect(lrw.bytesWritten).To(Equal(initialBW + bodyLen)) + Expect(rw.bytesWritten).To(Equal(initialBW + bodyLen)) Expect(w.Body.String()).To(Equal(body)) }) @@ -108,13 +108,13 @@ var _ = Describe("loggingResponseWriter", func() { w = httptest.NewRecorder() ) - lrw := &loggingResponseWriter{ + rw := &responseWriter{ statusCode: http.StatusOK, downstreamResponseWriter: w, } - lrw.WriteHeader(sc) + rw.WriteHeader(sc) - Expect(lrw.statusCode).To(Equal(sc)) + Expect(rw.statusCode).To(Equal(sc)) Expect(w.Code).To(Equal(sc)) }) diff --git a/tests/checks/interceptor_otel_metrics/interceptor_otel_metrics_test.go b/tests/checks/interceptor_otel_metrics/interceptor_otel_metrics_test.go new file mode 100644 index 00000000..bceb5411 --- /dev/null +++ b/tests/checks/interceptor_otel_metrics/interceptor_otel_metrics_test.go @@ -0,0 +1,241 @@ +//go:build e2e +// +build e2e + +package interceptor_otel_metrics_test + +import ( + "fmt" + "strings" + "testing" + "time" + + prommodel "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/http-add-on/tests/helper" +) + +const ( + testName = "interceptor-otel-metrics-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + serviceName = fmt.Sprintf("%s-service", testName) + clientName = fmt.Sprintf("%s-client", testName) + httpScaledObjectName = fmt.Sprintf("%s-http-so", testName) + host = testName + minReplicaCount = 0 + maxReplicaCount = 1 + otelCollectorPromURL = "http://opentelemetry-collector.open-telemetry-system:8889/metrics" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ServiceName string + ClientName string + HTTPScaledObjectName string + Host string + MinReplicas int + MaxReplicas int +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + ports: + - port: 8080 + targetPort: http + protocol: TCP + name: http + selector: + app: {{.DeploymentName}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}} + image: registry.k8s.io/e2e-test-images/agnhost:2.45 + args: + - netexec + ports: + - name: http + containerPort: 8080 + protocol: TCP + readinessProbe: + httpGet: + path: / + port: http +` + + loadJobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: generate-request + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: curl-client + image: curlimages/curl + imagePullPolicy: Always + command: ["curl", "-H", "Host: {{.Host}}", "keda-http-add-on-interceptor-proxy.keda:8080"] + restartPolicy: Never + activeDeadlineSeconds: 600 + backoffLimit: 5 +` + + httpScaledObjectTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + deployment: {{.DeploymentName}} + service: {{.ServiceName}} + port: 8080 + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} +` + + clientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.ClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.ClientName}} + image: curlimages/curl + command: + - sh + - -c + - "exec tail -f /dev/null"` +) + +func TestMetricGeneration(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", minReplicaCount) + + // Send a test request to the interceptor + sendLoad(t, kc, data) + + // Fetch metrics and validate them + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", otelCollectorPromURL)) + val, ok := family["interceptor_request_count_total"] + // If the metric is not found first time around then retry with a delay. + if !ok { + // Add a small sleep to allow metrics to be pushed from the exporter to the collector + time.Sleep(5 * time.Second) + // Fetch metrics and validate them + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", otelCollectorPromURL)) + val, ok = family["interceptor_request_count_total"] + } + assert.True(t, ok, "interceptor_request_count_total is available") + + requestCount := getMetricsValue(val) + assert.GreaterOrEqual(t, requestCount, float64(1)) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func sendLoad(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- sending load ---") + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) +} + +func fetchAndParsePrometheusMetrics(t *testing.T, cmd string) map[string]*prommodel.MetricFamily { + out, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, cmd) + assert.NoErrorf(t, err, "cannot execute command - %s", err) + + parser := expfmt.TextParser{} + // Ensure EOL + reader := strings.NewReader(strings.ReplaceAll(out, "\r\n", "\n")) + families, err := parser.TextToMetricFamilies(reader) + assert.NoErrorf(t, err, "cannot parse metrics - %s", err) + + return families +} + +func getMetricsValue(val *prommodel.MetricFamily) float64 { + if val.GetName() == "interceptor_request_count_total" { + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "host" && *label.Value == testName { + return metric.GetCounter().GetValue() + } + } + } + } + return 0 +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ServiceName: serviceName, + ClientName: clientName, + HTTPScaledObjectName: httpScaledObjectName, + Host: host, + MinReplicas: minReplicaCount, + MaxReplicas: maxReplicaCount, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "serviceNameTemplate", Config: serviceTemplate}, + {Name: "clientTemplate", Config: clientTemplate}, + {Name: "httpScaledObjectTemplate", Config: httpScaledObjectTemplate}, + } +} diff --git a/tests/checks/interceptor_prometheus_metrics/interceptor_prometheus_metrics_test.go b/tests/checks/interceptor_prometheus_metrics/interceptor_prometheus_metrics_test.go new file mode 100644 index 00000000..52981265 --- /dev/null +++ b/tests/checks/interceptor_prometheus_metrics/interceptor_prometheus_metrics_test.go @@ -0,0 +1,232 @@ +//go:build e2e +// +build e2e + +package interceptor_prometheus_metrics_test + +import ( + "fmt" + "strings" + "testing" + + prommodel "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/http-add-on/tests/helper" +) + +const ( + testName = "interceptor-prom-metrics-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + serviceName = fmt.Sprintf("%s-service", testName) + clientName = fmt.Sprintf("%s-client", testName) + httpScaledObjectName = fmt.Sprintf("%s-http-so", testName) + host = testName + minReplicaCount = 0 + maxReplicaCount = 1 + kedaInterceptorPrometheusURL = "http://keda-http-add-on-interceptor-metrics.keda:2223/metrics" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ServiceName string + ClientName string + HTTPScaledObjectName string + Host string + MinReplicas int + MaxReplicas int +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + ports: + - port: 8080 + targetPort: http + protocol: TCP + name: http + selector: + app: {{.DeploymentName}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}} + image: registry.k8s.io/e2e-test-images/agnhost:2.45 + args: + - netexec + ports: + - name: http + containerPort: 8080 + protocol: TCP + readinessProbe: + httpGet: + path: / + port: http +` + + loadJobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: generate-request + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: curl-client + image: curlimages/curl + imagePullPolicy: Always + command: ["curl", "-H", "Host: {{.Host}}", "keda-http-add-on-interceptor-proxy.keda:8080"] + restartPolicy: Never + activeDeadlineSeconds: 600 + backoffLimit: 5 +` + + httpScaledObjectTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + deployment: {{.DeploymentName}} + service: {{.ServiceName}} + port: 8080 + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} +` + + clientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.ClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.ClientName}} + image: curlimages/curl + command: + - sh + - -c + - "exec tail -f /dev/null"` +) + +func TestMetricGeneration(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", minReplicaCount) + + // Send a test request to the interceptor + sendLoad(t, kc, data) + + // Fetch metrics and validate them + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaInterceptorPrometheusURL)) + val, ok := family["interceptor_request_count_total"] + assert.True(t, ok, "interceptor_request_count_total is available") + + requestCount := getMetricsValue(val) + assert.GreaterOrEqual(t, requestCount, float64(1)) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func sendLoad(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- sending load ---") + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) +} + +func fetchAndParsePrometheusMetrics(t *testing.T, cmd string) map[string]*prommodel.MetricFamily { + out, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, cmd) + assert.NoErrorf(t, err, "cannot execute command - %s", err) + + parser := expfmt.TextParser{} + // Ensure EOL + reader := strings.NewReader(strings.ReplaceAll(out, "\r\n", "\n")) + families, err := parser.TextToMetricFamilies(reader) + assert.NoErrorf(t, err, "cannot parse metrics - %s", err) + + return families +} + +func getMetricsValue(val *prommodel.MetricFamily) float64 { + if val.GetName() == "interceptor_request_count_total" { + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "host" && *label.Value == testName { + return metric.GetCounter().GetValue() + } + } + } + } + return 0 +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ServiceName: serviceName, + ClientName: clientName, + HTTPScaledObjectName: httpScaledObjectName, + Host: host, + MinReplicas: minReplicaCount, + MaxReplicas: maxReplicaCount, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "serviceNameTemplate", Config: serviceTemplate}, + {Name: "clientTemplate", Config: clientTemplate}, + {Name: "httpScaledObjectTemplate", Config: httpScaledObjectTemplate}, + } +} diff --git a/tests/utils/cleanup_test.go b/tests/utils/cleanup_test.go index e644ab6a..f379dce8 100644 --- a/tests/utils/cleanup_test.go +++ b/tests/utils/cleanup_test.go @@ -40,3 +40,10 @@ func TestRemoveArgoRollouts(t *testing.T) { func TestRemoveKEDANamespace(t *testing.T) { DeleteNamespace(t, KEDANamespace) } + +func TestRemoveOpentelemetryComponents(t *testing.T) { + OpentelemetryNamespace := "open-telemetry-system" + _, err := ExecuteCommand(fmt.Sprintf("helm uninstall opentelemetry-collector --namespace %s", OpentelemetryNamespace)) + require.NoErrorf(t, err, "cannot uninstall opentelemetry-collector - %s", err) + DeleteNamespace(t, OpentelemetryNamespace) +} diff --git a/tests/utils/setup_test.go b/tests/utils/setup_test.go index 00056600..63182655 100644 --- a/tests/utils/setup_test.go +++ b/tests/utils/setup_test.go @@ -5,6 +5,7 @@ package utils import ( "fmt" + "os" "os/exec" "testing" @@ -14,6 +15,62 @@ import ( . "github.com/kedacore/http-add-on/tests/helper" ) +var ( + OtlpConfig = `mode: deployment +image: + repository: "otel/opentelemetry-collector-contrib" +config: + exporters: + logging: + loglevel: debug + prometheus: + endpoint: 0.0.0.0:8889 + receivers: + jaeger: null + prometheus: null + zipkin: null + service: + pipelines: + traces: null + metrics: + receivers: + - otlp + exporters: + - logging + - prometheus + logs: null +` + OtlpServicePatch = `apiVersion: v1 +kind: Service +metadata: + name: opentelemetry-collector +spec: + selector: + app.kubernetes.io/name: opentelemetry-collector + ports: + - protocol: TCP + port: 8889 + targetPort: 8889 + name: prometheus + type: ClusterIP + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: opentelemetry-collector +spec: + template: + spec: + containers: + - name: opentelemetry-collector + ports: + - containerPort: 8889 + name: prometheus + protocol: TCP +` +) + func TestVerifyCommands(t *testing.T) { commands := []string{"kubectl"} for _, cmd := range commands { @@ -122,3 +179,35 @@ func TestDeployKEDAHttpAddOn(t *testing.T) { t.Log(string(out)) t.Log("KEDA Http Add-on deployed successfully using 'make deploy' command") } + +func TestSetupOpentelemetryComponents(t *testing.T) { + OpentelemetryNamespace := "open-telemetry-system" + otlpTempFileName := "otlp.yml" + otlpServiceTempFileName := "otlpServicePatch.yml" + defer os.Remove(otlpTempFileName) + defer os.Remove(otlpServiceTempFileName) + err := os.WriteFile(otlpTempFileName, []byte(OtlpConfig), 0755) + assert.NoErrorf(t, err, "cannot create otlp config file - %s", err) + + err = os.WriteFile(otlpServiceTempFileName, []byte(OtlpServicePatch), 0755) + assert.NoErrorf(t, err, "cannot create otlp service patch file - %s", err) + + _, err = ExecuteCommand("helm version") + require.NoErrorf(t, err, "helm is not installed - %s", err) + + _, err = ExecuteCommand("helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts") + require.NoErrorf(t, err, "cannot add open-telemetry helm repo - %s", err) + + _, err = ExecuteCommand("helm repo update open-telemetry") + require.NoErrorf(t, err, "cannot update open-telemetry helm repo - %s", err) + + KubeClient = GetKubernetesClient(t) + CreateNamespace(t, KubeClient, OpentelemetryNamespace) + + _, err = ExecuteCommand(fmt.Sprintf("helm upgrade --install opentelemetry-collector open-telemetry/opentelemetry-collector -f %s --namespace %s", otlpTempFileName, OpentelemetryNamespace)) + + require.NoErrorf(t, err, "cannot install opentelemetry - %s", err) + + _, err = ExecuteCommand(fmt.Sprintf("kubectl apply -f %s -n %s", otlpServiceTempFileName, OpentelemetryNamespace)) + require.NoErrorf(t, err, "cannot update opentelemetry ports - %s", err) +}