Skip to content

Commit

Permalink
Merge branch 'main' into spiritzhou/azureeventgrid
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkerkhove authored Apr 22, 2024
2 parents f48ae1a + 4ba1365 commit 3007bdf
Show file tree
Hide file tree
Showing 17 changed files with 398 additions and 127 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ jobs:
details_url: https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}

- name: Upload test logs
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4
uses: actions/upload-artifact@1746f4ab65b179e0ea60a494b83293b640dd5bba # v4
with:
name: e2e-test-logs
path: "${{ github.workspace }}/**/*.log"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/template-main-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
NODE_POOL_SIZE: 1

- name: Upload test logs
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4
uses: actions/upload-artifact@1746f4ab65b179e0ea60a494b83293b640dd5bba # v4
if: ${{ always() }}
with:
name: e2e-test-logs
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/template-smoke-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
run: make smoke-test

- name: Upload test logs
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4
uses: actions/upload-artifact@1746f4ab65b179e0ea60a494b83293b640dd5bba # v4
if: ${{ always() }}
with:
name: smoke-test-logs ${{ inputs.runs-on }}-${{ inputs.kubernetesVersion }}
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ Here is an overview of all new **experimental** features:
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **General**: Add `validations.keda.sh/hpa-ownership` annotation to HPA to disable ownership validation ([#5516](https://github.com/kedacore/keda/issues/5516))
- **General**: Improve Prometheus metrics to align with best practices ([#4854](https://github.com/kedacore/keda/issues/4854))
- **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
- **GCP Pub/Sub Scaler**: Add support for resolving resource names from the scale target's environment ([#5693](https://github.com/kedacore/keda/issues/5693))
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
- **MongoDB Scaler**: Add scheme field to support SRV records ([#5544](https://github.com/kedacore/keda/issues/5544))
Expand All @@ -95,7 +97,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- Various Prometheus metrics have been renamed to follow the preferred naming conventions. The old ones are still available, but will be removed in the future ([#4854](https://github.com/kedacore/keda/issues/4854)).

### Breaking Changes

Expand All @@ -108,6 +110,7 @@ New deprecation(s):
- **General**: Introduce ENABLE_OPENTELEMETRY in deploying/testing process ([#5375](https://github.com/kedacore/keda/issues/5375)|[#5578](https://github.com/kedacore/keda/issues/5578))
- **General**: Migrate away from unmaintained golang/mock and use uber/gomock ([#5440](https://github.com/kedacore/keda/issues/5440))
- **General**: Minor refactor to reduce copy/paste code in ScaledObject webhook ([#5397](https://github.com/kedacore/keda/issues/5397))
- **General**: No need to list all secrets in the namespace to find just one ([#5669](https://github.com/kedacore/keda/pull/5669))
- **Kafka**: Expose GSSAPI service name ([#5474](https://github.com/kedacore/keda/issues/5474))

## v2.13.1
Expand Down
2 changes: 1 addition & 1 deletion ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Here is an overview of our current release estimations:

| Version | Estimated Release Date |
|:--------|:-----------------------------------------------------|
| v2.14 | April 12th, 2024 |
| v2.14 | April 25th, 2024 |
| v2.15 | July 9th, 2024 |
| v2.16 | Oct 3rd, 2024 |

Expand Down
6 changes: 3 additions & 3 deletions config/grafana/keda-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum by(job) (rate(keda_scaler_errors{}[5m]))",
"expr": "sum by(job) (rate(keda_scaler_detail_errors_total{}[5m]))",
"legendFormat": "{{ job }}",
"range": true,
"refId": "A"
Expand Down Expand Up @@ -313,7 +313,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum by(scaler) (rate(keda_scaler_errors{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\", scaler=~\"$scaler\"}[5m]))",
"expr": "sum by(scaler) (rate(keda_scaler_detail_errors_total{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\", scaler=~\"$scaler\"}[5m]))",
"legendFormat": "{{ scaler }}",
"range": true,
"refId": "A"
Expand Down Expand Up @@ -423,7 +423,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum by(scaledObject) (rate(keda_scaled_object_errors{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\"}[5m]))",
"expr": "sum by(scaledObject) (rate(keda_scaled_object_errors_total{exported_namespace=~\"$namespace\", scaledObject=~\"$scaledObject\"}[5m]))",
"legendFormat": "{{ scaledObject }}",
"range": true,
"refId": "A"
Expand Down
27 changes: 13 additions & 14 deletions pkg/certificates/certificate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-logr/logr"
"github.com/open-policy-agent/cert-controller/pkg/rotator"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -109,26 +110,24 @@ func getDNSNames(service, k8sClusterDomain string) []string {

// ensureSecret ensures that the secret used for storing TLS certificates exists
func (cm CertManager) ensureSecret(ctx context.Context, mgr manager.Manager, secretName string) error {
secrets := &corev1.SecretList{}
secret := &corev1.Secret{}
kedaNamespace := kedautil.GetPodNamespace()
opt := &client.ListOptions{
objKey := client.ObjectKey{
Namespace: kedaNamespace,
Name: secretName,
}

err := mgr.GetAPIReader().List(ctx, secrets, opt)
create := false
err := mgr.GetAPIReader().Get(ctx, objKey, secret)
if err != nil {
cm.Logger.Error(err, "unable to check secrets")
return err
}

exists := false
for _, secret := range secrets.Items {
if secret.Name == secretName {
exists = true
break
if errors.IsNotFound(err) {
create = true
} else {
cm.Logger.Error(err, "unable to check secret")
return err
}
}
if !exists {

if create {
secret := &corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: secretName,
Expand Down
10 changes: 6 additions & 4 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package metricscollector

import (
"time"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
)

Expand All @@ -39,10 +41,10 @@ type MetricsCollector interface {
RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)

// RecordScalerLatency create a measurement of the latency to external metric
RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)
RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration)

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64)
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration)

// RecordScalerActive create a measurement of the activity of the scaler
RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool)
Expand Down Expand Up @@ -101,14 +103,14 @@ func RecordScalerMetric(namespace string, scaledObject string, scaler string, tr
}

// RecordScalerLatency create a measurement of the latency to external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
for _, element := range collectors {
element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value)
}
}

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
for _, element := range collectors {
element.RecordScalableObjectLatency(namespace, name, isScaledObject, value)
}
Expand Down
107 changes: 82 additions & 25 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"runtime"
"strconv"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -22,18 +23,22 @@ const meterName = "keda-open-telemetry-metrics"
const defaultNamespace = "default"

var (
meterProvider *metric.MeterProvider
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val
meterProvider *metric.MeterProvider
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounterDeprecated api.Int64UpDownCounter
otCrdTotalsCounterDeprecated api.Int64UpDownCounter
otTriggerRegisteredTotalsCounter api.Int64UpDownCounter
otCrdRegisteredTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVal OtelMetricFloat64Val
Expand Down Expand Up @@ -95,19 +100,29 @@ func initMeters() {
otLog.Error(err, msg)
}

otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers"))
otTriggerTotalsCounterDeprecated, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("DEPRECATED - will be removed in 2.16 - use 'keda.trigger.registered.count' instead"))
if err != nil {
otLog.Error(err, msg)
}

otCrdTotalsCounter, err = meter.Int64UpDownCounter("keda.resource.totals", api.WithDescription("Total resources"))
otTriggerRegisteredTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.registered.count", api.WithDescription("Total number of triggers per trigger type registered"))
if err != nil {
otLog.Error(err, msg)
}

otCrdTotalsCounterDeprecated, err = meter.Int64UpDownCounter("keda.resource.totals", api.WithDescription("DEPRECATED - will be removed in 2.16 - use 'keda.resource.registered.count' instead"))
if err != nil {
otLog.Error(err, msg)
}

otCrdRegisteredTotalsCounter, err = meter.Int64UpDownCounter("keda.resource.registered.count", api.WithDescription("Total number of KEDA custom resources per namespace for each custom resource type (CRD) registered"))
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.value",
api.WithDescription("Metric Value used for HPA"),
api.WithDescription("The current value for each scaler's metric that would be used by the HPA in computing the target average"),
api.WithFloat64Callback(ScalerMetricValueCallback),
)
if err != nil {
Expand All @@ -116,7 +131,16 @@ func initMeters() {

_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency",
api.WithDescription("Scaler Metrics Latency"),
api.WithDescription("DEPRECATED - use `keda_scaler_metrics_latency_seconds` instead"),
api.WithFloat64Callback(ScalerMetricsLatencyCallbackDeprecated),
)
if err != nil {
otLog.Error(err, msg)
}
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency.seconds",
api.WithDescription("The latency of retrieving current metric from each scaler"),
api.WithUnit("s"),
api.WithFloat64Callback(ScalerMetricsLatencyCallback),
)
if err != nil {
Expand All @@ -125,7 +149,16 @@ func initMeters() {

_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("DEPRECATED - use `keda_internal_scale_loop_latency_seconds` instead"),
api.WithFloat64Callback(ScalableObjectLatencyCallbackDeprecated),
)
if err != nil {
otLog.Error(err, msg)
}
_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency.seconds",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithUnit("s"),
api.WithFloat64Callback(ScalableObjectLatencyCallback),
)
if err != nil {
Expand All @@ -134,7 +167,7 @@ func initMeters() {

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithDescription("Indicates whether a scaler is active (1), or not (0)"),
api.WithFloat64Callback(ScalerActiveCallback),
)
if err != nil {
Expand Down Expand Up @@ -207,10 +240,20 @@ func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer)
return nil
}

// ScalerMetricsLatencyCallbackDeprecated is the observable-gauge callback for
// the deprecated scaler latency value. It reports the pending value, if one
// has been recorded, and then resets the stored value to its zero state so
// the same measurement is not observed again on the next collection.
func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
	pending := &otelScalerMetricsLatencyValDeprecated
	// A nil measurement option means nothing was recorded since the last reset.
	if opt := pending.measurementOption; opt != nil {
		obsrv.Observe(pending.val, opt)
	}
	*pending = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricsLatencyVal.val = value
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
otelScalerMetricsLatencyVal.val = value.Seconds()
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyValDeprecated.val = float64(value.Milliseconds())
otelScalerMetricsLatencyValDeprecated.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand All @@ -221,8 +264,16 @@ func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer)
return nil
}

// ScalableObjectLatencyCallbackDeprecated is the observable-gauge callback for
// the deprecated internal scale-loop latency value. It reports the pending
// value, if one has been recorded, and then resets the stored value to its
// zero state so the same measurement is not observed again.
func ScalableObjectLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
	pending := &otelInternalLoopLatencyValDeprecated
	// A nil measurement option means nothing was recorded since the last reset.
	if opt := pending.measurementOption; opt != nil {
		obsrv.Observe(pending.val, opt)
	}
	*pending = OtelMetricFloat64Val{}
	return nil
}

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
resourceType := "scaledjob"
if isScaledObject {
resourceType = "scaledobject"
Expand All @@ -233,8 +284,10 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

otelInternalLoopLatencyVal.val = value
otelInternalLoopLatencyVal.val = value.Seconds()
otelInternalLoopLatencyVal.measurementOption = opt
otelInternalLoopLatencyValDeprecated.val = float64(value.Milliseconds())
otelInternalLoopLatencyValDeprecated.measurementOption = opt
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand Down Expand Up @@ -315,13 +368,15 @@ func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, e

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerTotalsCounterDeprecated.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerRegisteredTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
}
}

func (o *OtelMetrics) DecrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerTotalsCounterDeprecated.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
otTriggerRegisteredTotalsCounter.Add(context.Background(), -1, api.WithAttributes(attribute.Key("type").String(triggerType)))
}
}

Expand All @@ -334,7 +389,8 @@ func (o *OtelMetrics) IncrementCRDTotal(crdType, namespace string) {
attribute.Key("type").String(crdType),
)

otCrdTotalsCounter.Add(context.Background(), 1, opt)
otCrdTotalsCounterDeprecated.Add(context.Background(), 1, opt)
otCrdRegisteredTotalsCounter.Add(context.Background(), 1, opt)
}

func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
Expand All @@ -346,7 +402,8 @@ func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
attribute.Key("namespace").String(namespace),
attribute.Key("type").String(crdType),
)
otCrdTotalsCounter.Add(context.Background(), -1, opt)
otCrdTotalsCounterDeprecated.Add(context.Background(), -1, opt)
otCrdRegisteredTotalsCounter.Add(context.Background(), -1, opt)
}

func getScalerMeasurementOption(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool) api.MeasurementOption {
Expand Down
Loading

0 comments on commit 3007bdf

Please sign in to comment.