From ee4ab1b7190dfed93774fd79051bad7c7261d867 Mon Sep 17 00:00:00 2001
From: Jirka Kremser <535866+jkremser@users.noreply.github.com>
Date: Tue, 16 Apr 2024 23:03:31 +0200
Subject: [PATCH 1/6] No need to list all secrets in the namespace to find
 just one (#5669)

Signed-off-by: Jirka Kremser
---
 CHANGELOG.md                            |  1 +
 pkg/certificates/certificate_manager.go | 27 ++++++++++++-------------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc7219db7d4..524e8e33c05 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -108,6 +108,7 @@ New deprecation(s):
 - **General**: Introduce ENABLE_OPENTELEMETRY in deploying/testing process ([#5375](https://github.com/kedacore/keda/issues/5375)|[#5578](https://github.com/kedacore/keda/issues/5578))
 - **General**: Migrate away from unmaintained golang/mock and use uber/gomock ([#5440](https://github.com/kedacore/keda/issues/5440))
 - **General**: Minor refactor to reduce copy/paste code in ScaledObject webhook ([#5397](https://github.com/kedacore/keda/issues/5397))
+- **General**: No need to list all secrets in the namespace to find just one ([#5669](https://github.com/kedacore/keda/pull/5669))
 - **Kafka**: Expose GSSAPI service name ([#5474](https://github.com/kedacore/keda/issues/5474))
 
 ## v2.13.1

diff --git a/pkg/certificates/certificate_manager.go b/pkg/certificates/certificate_manager.go
index 5f390e1b205..abd4e5e806d 100644
--- a/pkg/certificates/certificate_manager.go
+++ b/pkg/certificates/certificate_manager.go
@@ -24,6 +24,7 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/open-policy-agent/cert-controller/pkg/rotator"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -109,26 +110,24 @@ func getDNSNames(service, k8sClusterDomain string) []string {
 
 // ensureSecret ensures that the secret used for storing TLS certificates exists
 func (cm CertManager) ensureSecret(ctx context.Context, mgr manager.Manager, secretName string) error {
-	secrets := &corev1.SecretList{}
+	secret := &corev1.Secret{}
 	kedaNamespace := kedautil.GetPodNamespace()
-	opt := &client.ListOptions{
+	objKey := client.ObjectKey{
 		Namespace: kedaNamespace,
+		Name:      secretName,
 	}
-
-	err := mgr.GetAPIReader().List(ctx, secrets, opt)
+	create := false
+	err := mgr.GetAPIReader().Get(ctx, objKey, secret)
 	if err != nil {
-		cm.Logger.Error(err, "unable to check secrets")
-		return err
-	}
-
-	exists := false
-	for _, secret := range secrets.Items {
-		if secret.Name == secretName {
-			exists = true
-			break
+		if errors.IsNotFound(err) {
+			create = true
+		} else {
+			cm.Logger.Error(err, "unable to check secret")
+			return err
 		}
 	}
-	if !exists {
+
+	if create {
 		secret := &corev1.Secret{
 			ObjectMeta: v1.ObjectMeta{
 				Name:      secretName,
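
To illustrate the pattern this patch adopts: a targeted Get against the manager's API reader fetches exactly the one Secret of interest, instead of listing every Secret in the namespace and scanning for a match. The sketch below is not the KEDA source; the helper name secretExists and its signature are illustrative.

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// secretExists reports whether namespace/name exists. The reader would
// typically be mgr.GetAPIReader(); a Get by key transfers a single object
// and only needs "get" RBAC on that Secret, whereas listing pulls every
// Secret in the namespace across the wire.
func secretExists(ctx context.Context, reader client.Reader, namespace, name string) (bool, error) {
	secret := &corev1.Secret{}
	err := reader.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, secret)
	if apierrors.IsNotFound(err) {
		return false, nil // a missing Secret is an expected outcome, not a failure
	}
	if err != nil {
		return false, fmt.Errorf("unable to check secret: %w", err)
	}
	return true, nil
}
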
From 37222697da84bcf53d078232bcb2e64de0077bb1 Mon Sep 17 00:00:00 2001
From: Jan Wozniak
Date: Fri, 19 Apr 2024 19:08:59 +0200
Subject: [PATCH 2/6] Reworks the Prometheus metrics to adhere to best
 practices (#5687)

Signed-off-by: Bernard Grymonpon
Signed-off-by: Jan Wozniak
Co-authored-by: Bernard Grymonpon
---
 CHANGELOG.md                               |   3 +-
 config/grafana/keda-dashboard.json         |   6 +-
 pkg/metricscollector/metricscollectors.go  |  10 +-
 pkg/metricscollector/opentelemetry.go      | 107 ++++++++---
 pkg/metricscollector/opentelemetry_test.go |  35 +++-
 pkg/metricscollector/prommetrics.go        | 170 ++++++++++++++----
 .../webhook/webhook_prommetrics.go         |  22 +++
 pkg/scaling/cache/scalers_cache.go         |   6 +-
 pkg/scaling/scale_handler.go               |  12 +-
 9 files changed, 285 insertions(+), 86 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 524e8e33c05..c86efce55e8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features:
 - **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
 - **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
 - **General**: Add `validations.keda.sh/hpa-ownership` annotation to HPA to disable ownership validation ([#5516](https://github.com/kedacore/keda/issues/5516))
+- **General**: Improve Prometheus metrics to align with best practices ([#4854](https://github.com/kedacore/keda/issues/4854))
 - **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
 - **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
 - **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
@@ -95,7 +96,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked
 
 New deprecation(s):
 
-- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
+- Various Prometheus metrics have been renamed to follow the preferred naming conventions. The old ones are still available, but will be removed in the future ([#4854](https://github.com/kedacore/keda/issues/4854)).
 
 ### Breaking Changes

diff --git a/config/grafana/keda-dashboard.json b/config/grafana/keda-dashboard.json
index b717dd424c0..afcb3d4f0bb 100644
--- a/config/grafana/keda-dashboard.json
+++ b/config/grafana/keda-dashboard.json
@@ -173,7 +173,7 @@
             "uid": "${datasource}"
           },
           "editorMode": "code",
-          "expr": "sum by(job) (rate(keda_scaler_errors{}[5m]))",
+          "expr": "sum by(job) (rate(keda_scaler_detail_errors_total{}[5m]))",
           "legendFormat": "{{ job }}",
           "range": true,
           "refId": "A"
@@ -313,7 +313,7 @@
             "uid": "${datasource}"
          },
           "editorMode": "code",
-          "expr": "sum by(scaler) (rate(keda_scaler_errors{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\", scaler=~\"$scaler\"}[5m]))",
+          "expr": "sum by(scaler) (rate(keda_scaler_detail_errors_total{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\", scaler=~\"$scaler\"}[5m]))",
           "legendFormat": "{{ scaler }}",
           "range": true,
           "refId": "A"
@@ -423,7 +423,7 @@
             "uid": "${datasource}"
           },
           "editorMode": "code",
-          "expr": "sum by(scaledObject) (rate(keda_scaled_object_errors{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\"}[5m]))",
+          "expr": "sum by(scaledObject) (rate(keda_scaled_object_errors_total{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\"}[5m]))",
           "legendFormat": "{{ scaledObject }}",
           "range": true,
           "refId": "A"

diff --git a/pkg/metricscollector/metricscollectors.go b/pkg/metricscollector/metricscollectors.go
index ef6458b76b7..d1eec2634e5 100644
--- a/pkg/metricscollector/metricscollectors.go
+++ b/pkg/metricscollector/metricscollectors.go
@@ -17,6 +17,8 @@ limitations under the License.
 package metricscollector
 
 import (
+	"time"
+
 	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
 )
 
@@ -39,10 +41,10 @@ type MetricsCollector interface {
 	RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)
 
 	// RecordScalerLatency creates a measurement of the latency to external metric
-	RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)
+	RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration)
 
 	// RecordScalableObjectLatency creates a measurement of the latency executing scalable object loop
-	RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64)
+	RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration)
 
 	// RecordScalerActive creates a measurement of the activity of the scaler
 	RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool)
@@ -101,14 +103,14 @@ func RecordScalerMetric(namespace string, scaledObject string, scaler string, tr
 }
 
 // RecordScalerLatency creates a measurement of the latency to external metric
-func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
+func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
 	for _, element := range collectors {
 		element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value)
 	}
 }
 
 // RecordScalableObjectLatency creates a measurement of the latency executing scalable object loop
-func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
+func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
 	for _, element := range collectors {
 		element.RecordScalableObjectLatency(namespace, name, isScaledObject, value)
 	}

diff --git a/pkg/metricscollector/opentelemetry.go b/pkg/metricscollector/opentelemetry.go
index f66a0d55038..ee1d3190185 100644
--- a/pkg/metricscollector/opentelemetry.go
+++ b/pkg/metricscollector/opentelemetry.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"runtime"
 	"strconv"
+	"time"
 
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
@@ -22,18 +23,22 @@ const meterName = "keda-open-telemetry-metrics"
 const defaultNamespace = "default"
 
 var (
-	meterProvider               *metric.MeterProvider
-	meter                       api.Meter
-	otScalerErrorsCounter       api.Int64Counter
-	otScaledObjectErrorsCounter api.Int64Counter
-	otScaledJobErrorsCounter    api.Int64Counter
-	otTriggerTotalsCounter      api.Int64UpDownCounter
-	otCrdTotalsCounter          api.Int64UpDownCounter
-
-	otelScalerMetricVal         OtelMetricFloat64Val
-	otelScalerMetricsLatencyVal OtelMetricFloat64Val
-	otelInternalLoopLatencyVal  OtelMetricFloat64Val
-	otelBuildInfoVal            OtelMetricInt64Val
+	meterProvider                    *metric.MeterProvider
+	meter                            api.Meter
+	otScalerErrorsCounter            api.Int64Counter
+	otScaledObjectErrorsCounter      api.Int64Counter
+	otScaledJobErrorsCounter         api.Int64Counter
+	otTriggerTotalsCounterDeprecated api.Int64UpDownCounter
+	otCrdTotalsCounterDeprecated     api.Int64UpDownCounter
+	otTriggerRegisteredTotalsCounter api.Int64UpDownCounter
+	otCrdRegisteredTotalsCounter     api.Int64UpDownCounter
+
+	otelScalerMetricVal                   OtelMetricFloat64Val
+	otelScalerMetricsLatencyVal           OtelMetricFloat64Val
+	otelScalerMetricsLatencyValDeprecated OtelMetricFloat64Val
+	otelInternalLoopLatencyVal            OtelMetricFloat64Val
+	otelInternalLoopLatencyValDeprecated  OtelMetricFloat64Val
+	otelBuildInfoVal                      OtelMetricInt64Val
 
 	otCloudEventEmittedCounter api.Int64Counter
 	otCloudEventQueueStatusVal OtelMetricFloat64Val
@@ -95,19 +100,29 @@ func initMeters() {
 		otLog.Error(err, msg)
 	}
 
-	otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers"))
+	otTriggerTotalsCounterDeprecated, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("DEPRECATED - will be removed in 2.16 - use 'keda.trigger.registered.count' instead"))
 	if err != nil {
 		otLog.Error(err, msg)
 	}
 
-	otCrdTotalsCounter, err = meter.Int64UpDownCounter("keda.resource.totals", api.WithDescription("Total resources"))
+	otTriggerRegisteredTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.registered.count", api.WithDescription("Total number of triggers per trigger type registered"))
+	if err != nil {
+		otLog.Error(err, msg)
+	}
+
+	otCrdTotalsCounterDeprecated, err = meter.Int64UpDownCounter("keda.resource.totals", api.WithDescription("DEPRECATED - will be removed in 2.16 - use 'keda.resource.registered.count' instead"))
+	if err != nil {
+		otLog.Error(err, msg)
+	}
+
+	otCrdRegisteredTotalsCounter, err = meter.Int64UpDownCounter("keda.resource.registered.count", api.WithDescription("Total number of KEDA custom resources per namespace for each custom resource type (CRD) registered"))
 	if err != nil {
 		otLog.Error(err, msg)
 	}
 
 	_, err = meter.Float64ObservableGauge(
 		"keda.scaler.metrics.value",
-		api.WithDescription("Metric Value used for HPA"),
+		api.WithDescription("The current value for each scaler's metric that would be used by the HPA in computing the target average"),
 		api.WithFloat64Callback(ScalerMetricValueCallback),
 	)
 	if err != nil {
@@ -116,7 +131,16 @@ func initMeters() {
 
 	_, err = meter.Float64ObservableGauge(
 		"keda.scaler.metrics.latency",
-		api.WithDescription("Scaler Metrics Latency"),
+		api.WithDescription("DEPRECATED - use `keda_scaler_metrics_latency_seconds` instead"),
+		api.WithFloat64Callback(ScalerMetricsLatencyCallbackDeprecated),
+	)
+	if err != nil {
+		otLog.Error(err, msg)
+	}
+	_, err = meter.Float64ObservableGauge(
+		"keda.scaler.metrics.latency.seconds",
+		api.WithDescription("The latency of retrieving current metric from each scaler"),
+		api.WithUnit("s"),
 		api.WithFloat64Callback(ScalerMetricsLatencyCallback),
 	)
 	if err != nil {
@@ -125,7 +149,16 @@ func initMeters() {
 
 	_, err = meter.Float64ObservableGauge(
 		"keda.internal.scale.loop.latency",
+		api.WithDescription("DEPRECATED - use `keda_internal_scale_loop_latency_seconds` instead"),
+		api.WithFloat64Callback(ScalableObjectLatencyCallbackDeprecated),
+	)
+	if err != nil {
+		otLog.Error(err, msg)
+	}
+	_, err = meter.Float64ObservableGauge(
+		"keda.internal.scale.loop.latency.seconds",
 		api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
+		api.WithUnit("s"),
 		api.WithFloat64Callback(ScalableObjectLatencyCallback),
 	)
 	if err != nil {
@@ -134,7 +167,7 @@ func initMeters() {
 
 	_, err = meter.Float64ObservableGauge(
 		"keda.scaler.active",
-		api.WithDescription("Activity of a Scaler Metric"),
+		api.WithDescription("Indicates whether a scaler is active (1), or not (0)"),
 		api.WithFloat64Callback(ScalerActiveCallback),
 	)
 	if err != nil {
@@ -207,10 +240,20 @@ func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer)
 	return nil
 }
 
+func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
+	if otelScalerMetricsLatencyValDeprecated.measurementOption != nil {
+		obsrv.Observe(otelScalerMetricsLatencyValDeprecated.val, otelScalerMetricsLatencyValDeprecated.measurementOption)
+	}
+	otelScalerMetricsLatencyValDeprecated = OtelMetricFloat64Val{}
+	return nil
+}
+
 // RecordScalerLatency creates a measurement of the latency to external metric
-func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
-	otelScalerMetricsLatencyVal.val = value
+func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
+	otelScalerMetricsLatencyVal.val = value.Seconds()
 	otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
+	otelScalerMetricsLatencyValDeprecated.val = float64(value.Milliseconds())
+	otelScalerMetricsLatencyValDeprecated.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
 }
 
 func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
@@ -221,8 +264,16 @@ func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer)
 	return nil
 }
 
+func ScalableObjectLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
+	if otelInternalLoopLatencyValDeprecated.measurementOption != nil {
+		obsrv.Observe(otelInternalLoopLatencyValDeprecated.val, otelInternalLoopLatencyValDeprecated.measurementOption)
+	}
+	otelInternalLoopLatencyValDeprecated = OtelMetricFloat64Val{}
+	return nil
+}
+
 // RecordScalableObjectLatency creates a measurement of the latency executing scalable object loop
-func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
+func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
 	resourceType := "scaledjob"
 	if isScaledObject {
 		resourceType = "scaledobject"
@@ -233,8 +284,10 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
 		attribute.Key("type").String(resourceType),
 		attribute.Key("name").String(name))
 
-	otelInternalLoopLatencyVal.val = value
+	otelInternalLoopLatencyVal.val = value.Seconds()
 	otelInternalLoopLatencyVal.measurementOption = opt
+	otelInternalLoopLatencyValDeprecated.val = float64(value.Milliseconds())
+	otelInternalLoopLatencyValDeprecated.measurementOption = opt
 }
 
 func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
@@ -315,13 +368,15 @@ func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, e
 
 func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
 	if triggerType != "" {
-		otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
+		otTriggerTotalsCounterDeprecated.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
+		otTriggerRegisteredTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
 	}
 }
 
 func (o *OtelMetrics) DecrementTriggerTotal(triggerType string) {
 	if triggerType != "" {
-		otTriggerTotalsCounter.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
+		otTriggerTotalsCounterDeprecated.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
+		otTriggerRegisteredTotalsCounter.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
 	}
 }
 
@@ -334,7 +389,8 @@ func (o *OtelMetrics) IncrementCRDTotal(crdType, namespace string) {
 		attribute.Key("type").String(crdType),
 	)
 
-	otCrdTotalsCounter.Add(context.Background(), 1, opt)
+	otCrdTotalsCounterDeprecated.Add(context.Background(), 1, opt)
+	otCrdRegisteredTotalsCounter.Add(context.Background(), 1, opt)
 }
 
 func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
@@ -346,7 +402,8 @@ func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
 		attribute.Key("namespace").String(namespace),
 		attribute.Key("type").String(crdType),
 	)
-	otCrdTotalsCounter.Add(context.Background(), -1, opt)
+	otCrdTotalsCounterDeprecated.Add(context.Background(), -1, opt)
+	otCrdRegisteredTotalsCounter.Add(context.Background(), -1, opt)
 }
 
 func getScalerMeasurementOption(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool) api.MeasurementOption {

diff --git a/pkg/metricscollector/opentelemetry_test.go b/pkg/metricscollector/opentelemetry_test.go
index 3b763d2017e..82f3071be49 100644
--- a/pkg/metricscollector/opentelemetry_test.go
+++ b/pkg/metricscollector/opentelemetry_test.go
@@ -3,6 +3,7 @@ package metricscollector
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/otel/sdk/metric"
@@ -59,11 +60,11 @@ func TestIncrementTriggerTotal(t *testing.T) {
 	assert.Nil(t, err)
 	scopeMetrics := got.ScopeMetrics[0]
 	assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
-	buildInfo := retrieveMetric(scopeMetrics.Metrics, "keda.trigger.totals")
+	triggercount := retrieveMetric(scopeMetrics.Metrics, "keda.trigger.registered.count")
 
-	assert.NotNil(t, buildInfo)
+	assert.NotNil(t, triggercount)
 
-	data := buildInfo.Data.(metricdata.Sum[int64]).DataPoints[0]
+	data := triggercount.Data.(metricdata.Sum[int64]).DataPoints[0]
 	assert.Equal(t, data.Value, int64(1))
 
 	testOtel.DecrementTriggerTotal("testtrigger")
@@ -72,10 +73,32 @@ func TestIncrementTriggerTotal(t *testing.T) {
 	assert.Nil(t, err)
 	scopeMetrics = got.ScopeMetrics[0]
 	assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
-	buildInfo = retrieveMetric(scopeMetrics.Metrics, "keda.trigger.totals")
+	triggercount = retrieveMetric(scopeMetrics.Metrics, "keda.trigger.registered.count")
 
-	assert.NotNil(t, buildInfo)
+	assert.NotNil(t, triggercount)
 
-	data = buildInfo.Data.(metricdata.Sum[int64]).DataPoints[0]
+	data = triggercount.Data.(metricdata.Sum[int64]).DataPoints[0]
 	assert.Equal(t, data.Value, int64(0))
 }
+
+func TestLoopLatency(t *testing.T) {
+	testOtel.RecordScalableObjectLatency("namespace", "name", true, 500*time.Millisecond)
+	got := metricdata.ResourceMetrics{}
+	err := testReader.Collect(context.Background(), &got)
+
+	assert.Nil(t, err)
+	scopeMetrics := got.ScopeMetrics[0]
+	assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
+
+	latency := retrieveMetric(scopeMetrics.Metrics, "keda.internal.scale.loop.latency")
+	assert.NotNil(t, latency)
+	assert.Equal(t, latency.Unit, "")
+	data := latency.Data.(metricdata.Gauge[float64]).DataPoints[0]
+	assert.Equal(t, data.Value, float64(500))
+
+	latencySeconds := retrieveMetric(scopeMetrics.Metrics, "keda.internal.scale.loop.latency.seconds")
+	assert.NotNil(t, latencySeconds)
+	assert.Equal(t, latencySeconds.Unit, "s")
+	data = latencySeconds.Data.(metricdata.Gauge[float64]).DataPoints[0]
+	assert.Equal(t, data.Value, float64(0.5))
+}

diff --git a/pkg/metricscollector/prommetrics.go b/pkg/metricscollector/prommetrics.go
index 2f77bf06f30..2cf51f58e14 100644
--- a/pkg/metricscollector/prommetrics.go
+++ b/pkg/metricscollector/prommetrics.go
@@ -17,8 +17,10 @@ limitations under the License.
 package metricscollector
 
 import (
+	"fmt"
 	"runtime"
 	"strconv"
+	"time"
 
 	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
@@ -28,6 +30,10 @@ import (
 	"github.com/kedacore/keda/v2/version"
 )
 
+// bestPracticeDeprecatedMsg is a constant string that is used to indicate that a metric is deprecated as
+// part of best practice refactoring - https://github.com/kedacore/keda/pull/5174
+const bestPracticeDeprecatedMsg = "DEPRECATED - will be removed in 2.16:"
+
 var log = logf.Log.WithName("prometheus_server")
 
 var (
@@ -36,25 +42,25 @@
 		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Name:      "build_info",
-			Help:      "A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built.",
+			Help:      "Info metric, with static information about KEDA build like: version, git commit and Golang runtime info.",
 		},
 		[]string{"version", "git_commit", "goversion", "goos", "goarch"},
 	)
-	scalerErrorsTotal = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
+	scalerMetricsValue = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaler",
-			Name:      "errors_total",
-			Help:      "Total number of errors for all scalers",
+			Name:      "metrics_value",
+			Help:      "The current value for each scaler's metric that would be used by the HPA in computing the target average.",
 		},
-		[]string{},
+		metricLabels,
 	)
-	scalerMetricsValue = prometheus.NewGaugeVec(
+	scalerMetricsLatencyDeprecated = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaler",
-			Name:      "metrics_value",
-			Help:      "Metric Value used for HPA",
+			Name:      "metrics_latency",
+			Help:      fmt.Sprintf("%v use 'keda_scaler_metrics_latency_seconds' instead.", bestPracticeDeprecatedMsg),
 		},
 		metricLabels,
 	)
@@ -62,8 +68,8 @@
 		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaler",
-			Name:      "metrics_latency",
-			Help:      "Scaler Metrics Latency",
+			Name:      "metrics_latency_seconds",
+			Help:      "The latency of retrieving current metric from each scaler, in seconds.",
 		},
 		metricLabels,
 	)
@@ -72,7 +78,7 @@
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaler",
 			Name:      "active",
-			Help:      "Activity of a Scaler Metric",
+			Help:      "Indicates whether a scaler is active (1), or not (0).",
 		},
 		metricLabels,
 	)
@@ -81,62 +87,125 @@
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaled_object",
 			Name:      "paused",
-			Help:      "Indicates whether a ScaledObject is paused",
+			Help:      "Indicates whether a ScaledObject is paused (1), or not (0).",
 		},
 		[]string{"namespace", "scaledObject"},
 	)
-	scalerErrors = prometheus.NewCounterVec(
+	scalerErrorsDeprecated = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaler",
 			Name:      "errors",
-			Help:      "Number of scaler errors",
+			Help:      fmt.Sprintf("%v use 'keda_scaler_detail_errors_total' instead.", bestPracticeDeprecatedMsg),
 		},
 		metricLabels,
 	)
-	scaledObjectErrors = prometheus.NewCounterVec(
+	scalerErrorsTotalDeprecated = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "scaler",
+			Name:      "errors_total",
+			Help:      fmt.Sprintf("%v use a `sum(keda_scaler_detail_errors_total{scaler!=\"\"})` over all scalers.", bestPracticeDeprecatedMsg),
+		},
+		[]string{},
+	)
+	scalerErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "scaler",
+			Name:      "detail_errors_total",
+			Help:      "The total number of errors encountered for each scaler.",
+		},
+		metricLabels,
+	)
+	scaledObjectErrorsDeprecated = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaled_object",
 			Name:      "errors",
-			Help:      "Number of scaled object errors",
+			Help:      fmt.Sprintf("%v use 'keda_scaled_object_errors_total' instead.", bestPracticeDeprecatedMsg),
 		},
 		[]string{"namespace", "scaledObject"},
 	)
-	scaledJobErrors = prometheus.NewCounterVec(
+	scaledObjectErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "scaled_object",
+			Name:      "errors_total",
+			Help:      "The number of errors that have occurred for each ScaledObject.",
+		},
+		[]string{"namespace", "scaledObject"},
+	)
+	scaledJobErrorsDeprecated = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaled_job",
 			Name:      "errors",
+			Help:      fmt.Sprintf("%v use 'keda_scaled_job_errors_total' instead.", bestPracticeDeprecatedMsg),
+		},
+		[]string{"namespace", "scaledJob"},
+	)
+	scaledJobErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "scaled_job",
+			Name:      "errors_total",
 			Help:      "Number of scaled job errors",
 		},
 		[]string{"namespace", "scaledJob"},
 	)
-	triggerTotalsGaugeVec = prometheus.NewGaugeVec(
+	triggerTotalsGaugeVecDeprecated = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "trigger",
 			Name:      "totals",
+			Help:      fmt.Sprintf("%v use 'keda_trigger_registered_total' instead.", bestPracticeDeprecatedMsg),
 		},
 		[]string{"type"},
 	)
-
-	crdTotalsGaugeVec = prometheus.NewGaugeVec(
+	triggerRegistered = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "trigger",
+			Name:      "registered_total",
+			Help:      "Total number of triggers per trigger type registered.",
+		},
+		[]string{"type"},
+	)
+	crdTotalsGaugeVecDeprecated = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "resource",
 			Name:      "totals",
+			Help:      fmt.Sprintf("%v use 'keda_resource_registered_total' instead.", bestPracticeDeprecatedMsg),
 		},
 		[]string{"type", "namespace"},
 	)
-
-	internalLoopLatency = prometheus.NewGaugeVec(
+	crdRegistered = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "resource",
+			Name:      "registered_total",
+			Help:      "Total number of KEDA custom resources per namespace for each custom resource type (CRD) registered.",
+		},
+		[]string{"type", "namespace"},
+	)
+	internalLoopLatencyDeprecated = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "internal_scale_loop",
 			Name:      "latency",
-			Help:      "Internal latency of ScaledObject/ScaledJob loop execution",
+			Help:      fmt.Sprintf("%v use 'keda_internal_scale_loop_latency_seconds' instead.", bestPracticeDeprecatedMsg),
+		},
+		[]string{"namespace", "type", "resource"},
+	)
+	internalLoopLatency = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "internal_scale_loop",
+			Name:      "latency_seconds",
+			Help:      "Total deviation (in seconds) between the expected execution time and the actual execution time for the scaling loop.",
 		},
 		[]string{"namespace", "type", "resource"},
 	)
@@ -167,18 +236,25 @@ type PromMetrics struct {
 }
 
 func NewPromMetrics() *PromMetrics {
-	metrics.Registry.MustRegister(scalerErrorsTotal)
+	metrics.Registry.MustRegister(scalerErrorsTotalDeprecated)
 	metrics.Registry.MustRegister(scalerMetricsValue)
+	metrics.Registry.MustRegister(scalerMetricsLatencyDeprecated)
 	metrics.Registry.MustRegister(scalerMetricsLatency)
+	metrics.Registry.MustRegister(internalLoopLatencyDeprecated)
 	metrics.Registry.MustRegister(internalLoopLatency)
 	metrics.Registry.MustRegister(scalerActive)
+	metrics.Registry.MustRegister(scalerErrorsDeprecated)
 	metrics.Registry.MustRegister(scalerErrors)
+	metrics.Registry.MustRegister(scaledObjectErrorsDeprecated)
 	metrics.Registry.MustRegister(scaledObjectErrors)
 	metrics.Registry.MustRegister(scaledObjectPaused)
+	metrics.Registry.MustRegister(triggerRegistered)
+	metrics.Registry.MustRegister(crdRegistered)
+	metrics.Registry.MustRegister(scaledJobErrorsDeprecated)
 	metrics.Registry.MustRegister(scaledJobErrors)
-	metrics.Registry.MustRegister(triggerTotalsGaugeVec)
-	metrics.Registry.MustRegister(crdTotalsGaugeVec)
+	metrics.Registry.MustRegister(triggerTotalsGaugeVecDeprecated)
+	metrics.Registry.MustRegister(crdTotalsGaugeVecDeprecated)
 	metrics.Registry.MustRegister(buildInfo)
 
 	metrics.Registry.MustRegister(cloudeventEmitted)
@@ -199,13 +275,15 @@ func (p *PromMetrics) RecordScalerMetric(namespace string, scaledResource string
 }
 
 // RecordScalerLatency creates a measurement of the latency to external metric
-func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
-	scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value)
+func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
+	scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value.Seconds())
+	scalerMetricsLatencyDeprecated.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(float64(value.Milliseconds()))
 }
 
 // RecordScalableObjectLatency creates a measurement of the latency executing scalable object loop
-func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
-	internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value)
+func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
+	internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value.Seconds())
+	internalLoopLatencyDeprecated.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(float64(value.Milliseconds()))
 }
 
 // RecordScalerActive creates a measurement of the activity of the scaler
@@ -234,14 +312,19 @@ func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
 
 func (p *PromMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
 	if err != nil {
 		scalerErrors.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Inc()
+		scalerErrorsDeprecated.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Inc()
 		p.RecordScaledObjectError(namespace, scaledResource, err)
-		scalerErrorsTotal.With(prometheus.Labels{}).Inc()
+		scalerErrorsTotalDeprecated.With(prometheus.Labels{}).Inc()
 		return
 	}
 	// initialize metric with 0 if not already set
 	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
 	if errscaler != nil {
-		log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
+		log.Error(errscaler, "Unable to record metrics")
+	}
+	_, errscalerdep := scalerErrorsDeprecated.GetMetricWith(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
+	if errscalerdep != nil {
+		log.Error(errscalerdep, "Unable to record (deprecated) metrics")
 	}
 }
 
@@ -250,12 +333,18 @@ func (p *PromMetrics) RecordScaledObjectError(namespace string, scaledObject str
 	labels := prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}
 	if err != nil {
 		scaledObjectErrors.With(labels).Inc()
+		scaledObjectErrorsDeprecated.With(labels).Inc()
 		return
 	}
 	// initialize metric with 0 if not already set
 	_, errscaledobject := scaledObjectErrors.GetMetricWith(labels)
 	if errscaledobject != nil {
-		log.Error(errscaledobject, "Unable to write to metrics to Prometheus Server: %v")
+		log.Error(errscaledobject, "Unable to record metrics")
+		return
+	}
+	_, errscaledobjectdep := scaledObjectErrorsDeprecated.GetMetricWith(labels)
+	if errscaledobjectdep != nil {
+		log.Error(errscaledobjectdep, "Unable to record metrics")
 		return
 	}
 }
@@ -264,6 +353,7 @@ func (p *PromMetrics) RecordScaledObjectError(namespace string, scaledObject str
 func (p *PromMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) {
 	labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
 	if err != nil {
+		scaledJobErrorsDeprecated.With(labels).Inc()
 		scaledJobErrors.With(labels).Inc()
 		return
 	}
@@ -288,13 +378,15 @@ func getResourceType(isScaledObject bool) string {
 
 func (p *PromMetrics) IncrementTriggerTotal(triggerType string) {
 	if triggerType != "" {
-		triggerTotalsGaugeVec.WithLabelValues(triggerType).Inc()
+		triggerRegistered.WithLabelValues(triggerType).Inc()
+		triggerTotalsGaugeVecDeprecated.WithLabelValues(triggerType).Inc()
 	}
 }
 
func (p *PromMetrics) DecrementTriggerTotal(triggerType string) {
 	if triggerType != "" {
-		triggerTotalsGaugeVec.WithLabelValues(triggerType).Dec()
+		triggerRegistered.WithLabelValues(triggerType).Dec()
+		triggerTotalsGaugeVecDeprecated.WithLabelValues(triggerType).Dec()
 	}
 }
 
@@ -303,7 +395,8 @@ func (p *PromMetrics) IncrementCRDTotal(crdType, namespace string) {
 		namespace = defaultNamespace
 	}
 
-	crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Inc()
+	crdRegistered.WithLabelValues(crdType, namespace).Inc()
+	crdTotalsGaugeVecDeprecated.WithLabelValues(crdType, namespace).Inc()
 }
 
 func (p *PromMetrics) DecrementCRDTotal(crdType, namespace string) {
@@ -311,7 +404,8 @@ func (p *PromMetrics) DecrementCRDTotal(crdType, namespace string) {
 		namespace = defaultNamespace
 	}
 
-	crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec()
+	crdRegistered.WithLabelValues(crdType, namespace).Dec()
+	crdTotalsGaugeVecDeprecated.WithLabelValues(crdType, namespace).Dec()
 }
 
 // RecordCloudEventEmitted counts the number of cloudevents emitted to the user's sink

diff --git a/pkg/metricscollector/webhook/webhook_prommetrics.go b/pkg/metricscollector/webhook/webhook_prommetrics.go
index c7ba4df49c0..329d3e4b4e3 100644
--- a/pkg/metricscollector/webhook/webhook_prommetrics.go
+++ b/pkg/metricscollector/webhook/webhook_prommetrics.go
@@ -31,6 +31,15 @@ var (
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "webhook",
 			Name:      "scaled_object_validation_total",
+			Help:      "DEPRECATED - will be removed in 2.16 - Use `scaled_object_validations_total` instead.",
+		},
+		[]string{"namespace", "action"},
+	)
+	scaledObjectValidationsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "webhook",
+			Name:      "scaled_object_validations_total",
 			Help:      "Total number of scaled object validations",
 		},
 		[]string{"namespace", "action"},
@@ -40,6 +49,15 @@ var (
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "webhook",
 			Name:      "scaled_object_validation_errors",
+			Help:      "DEPRECATED - will be removed in 2.16 - Use `scaled_object_validation_errors_total` instead.",
+		},
+		[]string{"namespace", "action", "reason"},
+	)
+	scaledObjectValidationErrorsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "webhook",
+			Name:      "scaled_object_validation_errors_total",
 			Help:      "Total number of scaled object validating errors",
 		},
 		[]string{"namespace", "action", "reason"},
@@ -48,17 +66,21 @@ var (
 
 func init() {
 	metrics.Registry.MustRegister(scaledObjectValidatingTotal)
+	metrics.Registry.MustRegister(scaledObjectValidationsTotal)
 	metrics.Registry.MustRegister(scaledObjectValidatingErrors)
+	metrics.Registry.MustRegister(scaledObjectValidationErrorsTotal)
 }
 
 // RecordScaledObjectValidatingTotal counts the number of ScaledObject validations
 func RecordScaledObjectValidatingTotal(namespace, action string) {
 	labels := prometheus.Labels{"namespace": namespace, "action": action}
 	scaledObjectValidatingTotal.With(labels).Inc()
+	scaledObjectValidationsTotal.With(labels).Inc()
 }
 
 // RecordScaledObjectValidatingErrors counts the number of ScaledObject validating errors
 func RecordScaledObjectValidatingErrors(namespace, action, reason string) {
 	labels := prometheus.Labels{"namespace": namespace, "action": action, "reason": reason}
 	scaledObjectValidatingErrors.With(labels).Inc()
+	scaledObjectValidationErrorsTotal.With(labels).Inc()
 }

diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go
index 49042f86f0d..acd253804c4 100644
--- a/pkg/scaling/cache/scalers_cache.go
+++ b/pkg/scaling/cache/scalers_cache.go
@@ -122,14 +122,14 @@ func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, ind
 
 // GetMetricsAndActivityForScaler returns metric value, activity and latency for a scaler identified by the metric name
 // and by the input index (from the list of scalers in this ScaledObject)
-func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, int64, error) {
+func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, time.Duration, error) {
 	if index < 0 || index >= len(c.Scalers) {
 		return nil, false, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
 	}
 	startTime := time.Now()
 	metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
 	if err == nil {
-		return metric, activity, time.Since(startTime).Milliseconds(), nil
+		return metric, activity, time.Since(startTime), nil
 	}
 
 	ns, err := c.refreshScaler(ctx, index)
@@ -138,7 +138,7 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index
 	}
 	startTime = time.Now()
 	metric, activity, err = ns.GetMetricsAndActivity(ctx, metricName)
-	return metric, activity, time.Since(startTime).Milliseconds(), err
+	return metric, activity, time.Since(startTime), err
 }
 
 func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {

diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index a62263c7166..7d6a25bf740 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -174,7 +174,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
 		// we calculate the next execution time based on the pollingInterval and record the difference
 		// between the expected execution time and the real execution time
 		delay := time.Since(next)
-		metricscollector.RecordScalableObjectLatency(withTriggers.Namespace, withTriggers.Name, isScaledObject, float64(delay.Milliseconds()))
+		metricscollector.RecordScalableObjectLatency(withTriggers.Namespace, withTriggers.Name, isScaledObject, delay)
 
 		tmr := time.NewTimer(pollingInterval)
 		next = time.Now().Add(pollingInterval)
@@ -522,10 +522,10 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
 	}
 
 	if !metricsFoundInCache {
-		var latency int64
+		var latency time.Duration
 		metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
 		if latency != -1 {
-			metricscollector.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, float64(latency))
+			metricscollector.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, latency)
 		}
 		logger.V(1).Info("Getting metrics from trigger", "trigger", triggerName, "metricName", metricName, "metrics", metrics, "scalerError", err)
 	}
@@ -750,11 +750,11 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,
 
 	metricName := spec.External.Metric.Name
 
-	var latency int64
+	var latency time.Duration
 	metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
 	metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err)
 	if latency != -1 {
-		metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, float64(latency))
+		metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency)
 	}
 
 	result.Metrics = append(result.Metrics, metrics...)
 	logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)
@@ -842,7 +842,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
 		metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
 		metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
 		if latency != -1 {
-			metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, float64(latency))
+			metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency)
 		}
 		if err != nil {
 			scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
From 38d261a8ed52e1d88386792efb0aff7de6a6cc22 Mon Sep 17 00:00:00 2001
From: Tom Kerkhove
Date: Fri, 19 Apr 2024 19:11:54 +0200
Subject: [PATCH 3/6] chore: Delay v2.14 release to next week (#5703)

Signed-off-by: Tom Kerkhove
---
 ROADMAP.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ROADMAP.md b/ROADMAP.md
index 57154e49494..39db4df05ad 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -14,7 +14,7 @@ Here is an overview of our current release estimations:
 
 | Version | Estimated Release Date                                |
 |:--------|:------------------------------------------------------|
-| v2.14   | April 12th, 2024                                      |
+| v2.14   | April 25th, 2024                                      |
 | v2.15   | July 9th, 2024                                        |
 | v2.16   | Oct 3rd, 2024                                         |
From 31d848c1b65210801c4d04f6d829bde16dbccbff Mon Sep 17 00:00:00 2001
From: Jeremy Tymes
Date: Fri, 19 Apr 2024 13:12:49 -0400
Subject: [PATCH 4/6] Resolve Pub/Sub resources from scale target's
 environment (#5701)

Signed-off-by: Jeremy Tymes
---
 CHANGELOG.md                          |  1 +
 pkg/scalers/gcp_pubsub_scaler.go      | 97 +++++++++++++++++++++------
 pkg/scalers/gcp_pubsub_scaler_test.go | 18 ++++-
 3 files changed, 93 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c86efce55e8..1b4ba259902 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -76,6 +76,7 @@ Here is an overview of all new **experimental** features:
 - **General**: Improve Prometheus metrics to align with best practices ([#4854](https://github.com/kedacore/keda/issues/4854))
 - **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
 - **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
+- **GCP Pub/Sub Scaler**: Add support for resolving resource names from the scale target's environment ([#5693](https://github.com/kedacore/keda/issues/5693))
 - **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
 - **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
 - **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544))

diff --git a/pkg/scalers/gcp_pubsub_scaler.go b/pkg/scalers/gcp_pubsub_scaler.go
index 031fad1a614..226b569f2b2 100644
--- a/pkg/scalers/gcp_pubsub_scaler.go
+++ b/pkg/scalers/gcp_pubsub_scaler.go
@@ -71,6 +71,80 @@ func NewPubSubScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
 	}, nil
 }
 
+func parsePubSubResourceConfig(config *scalersconfig.ScalerConfig, meta *pubsubMetadata) error {
+	sub, subPresent := config.TriggerMetadata["subscriptionName"]
+	subFromEnv, subFromEnvPresent := config.TriggerMetadata["subscriptionNameFromEnv"]
+	if subPresent && subFromEnvPresent {
+		return fmt.Errorf("exactly one of subscriptionName or subscriptionNameFromEnv is allowed")
+	}
+	hasSub := subPresent || subFromEnvPresent
+
+	topic, topicPresent := config.TriggerMetadata["topicName"]
+	topicFromEnv, topicFromEnvPresent := config.TriggerMetadata["topicNameFromEnv"]
+	if topicPresent && topicFromEnvPresent {
+		return fmt.Errorf("exactly one of topicName or topicNameFromEnv is allowed")
+	}
+	hasTopic := topicPresent || topicFromEnvPresent
+
+	if (!hasSub && !hasTopic) || (hasSub && hasTopic) {
+		return fmt.Errorf("exactly one of subscription or topic name must be given")
+	}
+
+	if hasSub {
+		if subPresent {
+			if sub == "" {
+				return fmt.Errorf("no subscription name given")
+			}
+
+			meta.resourceName = sub
+		} else {
+			if subFromEnv == "" {
+				return fmt.Errorf("no environment variable name given for resolving subscription name")
+			}
+
+			resolvedSub, ok := config.ResolvedEnv[subFromEnv]
+			if !ok {
+				return fmt.Errorf("resolved environment doesn't contain name '%s'", subFromEnv)
+			}
+
+			if resolvedSub == "" {
+				return fmt.Errorf("resolved environment subscription name is empty")
+			}
+
+			meta.resourceName = config.ResolvedEnv[subFromEnv]
+		}
+
+		meta.resourceType = resourceTypePubSubSubscription
+	} else {
+		if topicPresent {
+			if topic == "" {
+				return fmt.Errorf("no topic name given")
+			}
+
+			meta.resourceName = topic
+		} else {
+			if topicFromEnv == "" {
+				return fmt.Errorf("no environment variable name given for resolving topic name")
+			}
+
+			resolvedTopic, ok := config.ResolvedEnv[topicFromEnv]
+			if !ok {
+				return fmt.Errorf("resolved environment doesn't contain name '%s'", topicFromEnv)
+			}
+
+			if resolvedTopic == "" {
+				return fmt.Errorf("resolved environment topic name is empty")
+			}
+
+			meta.resourceName = config.ResolvedEnv[topicFromEnv]
+		}
+
+		meta.resourceType = resourceTypePubSubTopic
+	}
+
+	return nil
+}
+
 func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*pubsubMetadata, error) {
 	meta := pubsubMetadata{mode: pubSubModeSubscriptionSize, value: pubSubDefaultValue}
@@ -106,26 +180,9 @@ func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger)
 
 	meta.aggregation = config.TriggerMetadata["aggregation"]
 
-	sub, subPresent := config.TriggerMetadata["subscriptionName"]
-	topic, topicPresent := config.TriggerMetadata["topicName"]
-	if (!subPresent && !topicPresent) || (subPresent && topicPresent) {
-		return nil, fmt.Errorf("exactly one of subscription or topic name must be given")
-	}
-
-	if subPresent {
-		if sub == "" {
-			return nil, fmt.Errorf("no subscription name given")
-		}
-
-		meta.resourceName = sub
-		meta.resourceType = resourceTypePubSubSubscription
-	} else {
-		if topic == "" {
-			return nil, fmt.Errorf("no topic name given")
-		}
-
-		meta.resourceName = topic
-		meta.resourceType = resourceTypePubSubTopic
+	err := parsePubSubResourceConfig(config, &meta)
+	if err != nil {
+		return nil, err
 	}
 
 	meta.activationValue = 0

diff --git a/pkg/scalers/gcp_pubsub_scaler_test.go b/pkg/scalers/gcp_pubsub_scaler_test.go
index bba4516414a..a0433f376fb 100644
--- a/pkg/scalers/gcp_pubsub_scaler_test.go
+++ b/pkg/scalers/gcp_pubsub_scaler_test.go
@@ -10,7 +10,9 @@ import (
 )
 
 var testPubSubResolvedEnv = map[string]string{
-	"SAMPLE_CREDS": "{}",
+	"SAMPLE_CREDS":        "{}",
+	"MY_ENV_SUBSCRIPTION": "myEnvSubscription",
+	"MY_ENV_TOPIC":        "myEnvTopic",
 }
 
 type parsePubSubMetadataTestData struct {
@@ -76,6 +78,14 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
 	{nil, map[string]string{"value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
 	// both subscriptionSize and topicName present
 	{nil, map[string]string{"subscriptionSize": "7", "topicName": "mytopic", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
+	// both subscriptionName and subscriptionNameFromEnv present
+	{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
+	// both topicName and topicNameFromEnv present
+	{nil, map[string]string{"topicName": "mytopic", "topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
+	// subscriptionNameFromEnv present
+	{nil, map[string]string{"subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
+	// topicNameFromEnv present
+	{nil, map[string]string{"topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
 }
 
 var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
@@ -90,6 +100,8 @@ var gcpResourceNameTests = []gcpPubSubSubscription{
 	{&testPubSubMetadata[12], 1, "projects/myproject/mysubscription", ""},
 	{&testPubSubMetadata[17], 1, "mytopic", "myproject"},
 	{&testPubSubMetadata[18], 1, "projects/myproject/mytopic", ""},
+	{&testPubSubMetadata[24], 1, "myEnvSubscription", ""},
+	{&testPubSubMetadata[25], 1, "myEnvTopic", ""},
 }
 
 var gcpSubscriptionDefaults = []gcpPubSubSubscription{
@@ -140,7 +152,7 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) {
 	}
 }
 
-func TestGcpPubSubSubscriptionName(t *testing.T) {
+func TestGcpPubSubResourceName(t *testing.T) {
 	for _, testData := range gcpResourceNameTests {
 		meta, err := parsePubSubMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, TriggerIndex: testData.triggerIndex}, logr.Discard())
 		if err != nil {
@@ -150,7 +162,7 @@ func TestGcpPubSubResourceName(t *testing.T) {
 		resourceID, projectID := getResourceData(&mockGcpPubSubScaler)
 
 		if resourceID != testData.name || projectID != testData.projectID {
-			t.Error("Wrong Subscription parsing:", resourceID, projectID)
+			t.Error("Wrong Resource parsing:", resourceID, projectID)
 		}
 	}
 }
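
Patch 4 generalizes the scaler's resource lookup into a "<param>FromEnv" indirection: trigger metadata may carry the Pub/Sub resource name directly, or name an environment variable on the scale target that holds it, but never both. A condensed sketch of that mutual-exclusion rule follows; resolveFromEnvOrLiteral is an illustrative helper, not part of KEDA.

package main

import "fmt"

// resolveFromEnvOrLiteral mirrors the rule in parsePubSubResourceConfig:
// exactly one of the literal key or its "<key>FromEnv" variant may be set,
// and an env-var reference must resolve to a non-empty value.
func resolveFromEnvOrLiteral(meta, env map[string]string, key string) (string, error) {
	literal, hasLiteral := meta[key]
	envName, hasEnvRef := meta[key+"FromEnv"]
	switch {
	case hasLiteral && hasEnvRef:
		return "", fmt.Errorf("exactly one of %s or %sFromEnv is allowed", key, key)
	case hasLiteral:
		return literal, nil
	case hasEnvRef:
		v, ok := env[envName]
		if !ok || v == "" {
			return "", fmt.Errorf("environment variable '%s' not resolvable", envName)
		}
		return v, nil
	default:
		return "", fmt.Errorf("%s not configured", key)
	}
}

func main() {
	meta := map[string]string{"subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION"}
	env := map[string]string{"MY_ENV_SUBSCRIPTION": "myEnvSubscription"}
	name, err := resolveFromEnvOrLiteral(meta, env, "subscriptionName")
	fmt.Println(name, err) // myEnvSubscription <nil>
}
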
"MY_ENV_SUBSCRIPTION": "myEnvSubscription", + "MY_ENV_TOPIC": "myEnvTopic", } type parsePubSubMetadataTestData struct { @@ -76,6 +78,14 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{ {nil, map[string]string{"value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, // both subscriptionSize and topicName present {nil, map[string]string{"subscriptionSize": "7", "topicName": "mytopic", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // both subscriptionName and subscriptionNameFromEnv present + {nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // both topicName and topicNameFromEnv present + {nil, map[string]string{"topicName": "mytopic", "topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // subscriptionNameFromEnv present + {nil, map[string]string{"subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // topicNameFromEnv present + {nil, map[string]string{"topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, } var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{ @@ -90,6 +100,8 @@ var gcpResourceNameTests = []gcpPubSubSubscription{ {&testPubSubMetadata[12], 1, "projects/myproject/mysubscription", ""}, {&testPubSubMetadata[17], 1, "mytopic", "myproject"}, {&testPubSubMetadata[18], 1, "projects/myproject/mytopic", ""}, + {&testPubSubMetadata[24], 1, "myEnvSubscription", ""}, + {&testPubSubMetadata[25], 1, "myEnvTopic", ""}, } var gcpSubscriptionDefaults = []gcpPubSubSubscription{ @@ -140,7 +152,7 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) { } } -func TestGcpPubSubSubscriptionName(t *testing.T) { +func TestGcpPubSubResourceName(t *testing.T) { for _, testData := range gcpResourceNameTests { meta, err := parsePubSubMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, TriggerIndex: testData.triggerIndex}, logr.Discard()) if err != nil { @@ -150,7 +162,7 @@ func TestGcpPubSubSubscriptionName(t *testing.T) { resourceID, projectID := getResourceData(&mockGcpPubSubScaler) if resourceID != testData.name || projectID != testData.projectID { - t.Error("Wrong Subscription parsing:", resourceID, projectID) + t.Error("Wrong Resource parsing:", resourceID, projectID) } } } From ef0149d5d0817f93cb7f3d46a292803b5615153e Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Fri, 19 Apr 2024 17:23:23 +0000 Subject: [PATCH 5/6] chore(deps): update actions/upload-artifact digest to 1746f4a (#5709) Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/pr-e2e.yml | 2 +- .github/workflows/template-main-e2e-test.yml | 2 +- .github/workflows/template-smoke-tests.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pr-e2e.yml b/.github/workflows/pr-e2e.yml index 7f479cc7dad..10d53787e57 100644 --- a/.github/workflows/pr-e2e.yml +++ b/.github/workflows/pr-e2e.yml @@ -219,7 +219,7 @@ jobs: details_url: https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} - name: Upload test logs - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4 + uses: 
From 4ba13656b982184f7dd002c5cbce2fbd7e937e6b Mon Sep 17 00:00:00 2001
From: Jorge Turrado Ferrero
Date: Mon, 22 Apr 2024 09:31:57 +0200
Subject: [PATCH 6/6] fix: Use correct otel-collector distro (#5716)

Signed-off-by: Jorge Turrado Ferrero
---
 tests/utils/helper/helper.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/utils/helper/helper.go b/tests/utils/helper/helper.go
index f9a66661f1b..e1a3649bef0 100644
--- a/tests/utils/helper/helper.go
+++ b/tests/utils/helper/helper.go
@@ -47,6 +47,8 @@ data:
         target_label: kubernetes_pod_name
 `
 	OtlpConfig = `mode: deployment
+image:
+  repository: "otel/opentelemetry-collector-contrib"
 config:
   exporters:
     logging: