From 3dd1e970188a808ec609cdf1fde49f48f45c6036 Mon Sep 17 00:00:00 2001 From: Jirka Kremser Date: Wed, 9 Oct 2024 19:31:54 +0200 Subject: [PATCH] More tests, glueing things together, breaking import cycle Signed-off-by: Jirka Kremser --- examples/so.yaml | 11 +- main.go | 5 +- metric/mem_store.go | 80 ++++++-------- metric/mem_store_test.go | 166 ++++++++++++++-------------- metric/simple_parser.go | 18 +-- metric/simple_parser_test.go | 12 +- receiver/receiver.go | 16 +-- scaler/handlers.go | 35 +++--- metric/types.go => types/metrics.go | 4 +- util/handler_specific.go | 75 +++++++++++++ util/helpers.go | 49 ++++++++ 11 files changed, 297 insertions(+), 174 deletions(-) rename metric/types.go => types/metrics.go (97%) create mode 100644 util/handler_specific.go diff --git a/examples/so.yaml b/examples/so.yaml index f21c7ae..8a87ee7 100644 --- a/examples/so.yaml +++ b/examples/so.yaml @@ -11,8 +11,15 @@ spec: - type: external metadata: scalerAddress: keda-otel-scaler:4318 + + # required metricQuery: "sum(http/server/request_count{app_id=nodeapp, method=GET, path=/v1.0/state/statestore})" + + # optional - default no limit clampMin: 1 + + # optional - default no limit clampMax: 10 - operationOverTime: last_one/rate/count/average/min/max - overTimePeriodSeconds: 60 + + # optional - default 'last_one', available values: last_one,rate,count,avg,min,max + operationOverTime: last_one diff --git a/main.go b/main.go index d7a8e09..4d883b8 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,7 @@ import ( "github.com/kedify/otel-add-on/metric" "github.com/kedify/otel-add-on/receiver" "github.com/kedify/otel-add-on/scaler" + "github.com/kedify/otel-add-on/types" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -187,8 +188,8 @@ func startGrpcServer( ctx context.Context, cfg *scaler.Config, lggr logr.Logger, - ms metric.MemStore, - mp metric.Parser, + ms types.MemStore, + mp types.Parser, port int, targetPendingRequests int64, ) error { diff --git a/metric/mem_store.go b/metric/mem_store.go index 4a77e05..9daa740 100644 --- a/metric/mem_store.go +++ b/metric/mem_store.go @@ -9,21 +9,22 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" + "github.com/kedify/otel-add-on/types" "github.com/kedify/otel-add-on/util" ) type ms struct { - store map[MetricName]StoredMetrics + store map[types.MetricName]types.StoredMetrics stalePeriodSeconds int } -func (m ms) Get(name MetricName, searchLabels Labels, timeOp OperationOverTime, defaultAggregation AggregationOverVectors) (float64, Found, error) { +func (m ms) Get(name types.MetricName, searchLabels types.Labels, timeOp types.OperationOverTime, defaultAggregation types.AggregationOverVectors) (float64, types.Found, error) { now := time.Now().Unix() if _, found := m.store[name]; !found { // not found return -1., false, nil } - if err := checkTimeOp(timeOp); err != nil { + if err := util.CheckTimeOp(timeOp); err != nil { return -1., false, err } if err := checkDefaultAggregation(defaultAggregation); err != nil { @@ -66,27 +67,18 @@ func (m ms) Get(name MetricName, searchLabels Labels, timeOp OperationOverTime, return accumulator, true, nil } -func checkDefaultAggregation(aggregation AggregationOverVectors) error { +func checkDefaultAggregation(aggregation types.AggregationOverVectors) error { switch aggregation { - case VecSum, VecAvg, VecMin, VecMax: + case types.VecSum, types.VecAvg, types.VecMin, types.VecMax: return nil default: return fmt.Errorf("unknown AggregationOverVectors:%s", aggregation) } } -func checkTimeOp(op OperationOverTime) error { - 
switch op { - case OpLastOne, OpRate, OpCount, OpAvg, OpMin, OpMax: - return nil - default: - return fmt.Errorf("unknown OperationOverTime:%s", op) - } -} - -func (m ms) Put(entry NewMetricEntry) { +func (m ms) Put(entry types.NewMetricEntry) { if _, found := m.store[entry.Name]; !found { - m.store[entry.Name] = make(map[LabelsHash]MetricData) + m.store[entry.Name] = make(map[types.LabelsHash]types.MetricData) } now := time.Now().Unix() labelsH := hashOfMap(entry.Labels) @@ -96,11 +88,11 @@ func (m ms) Put(entry NewMetricEntry) { } else { // found md := m.store[entry.Name][labelsH] - notStale := util.Filter(md.Data, func(val ObservedValue) bool { + notStale := util.Filter(md.Data, func(val types.ObservedValue) bool { return !m.isStale(val.Time, now) }) fmt.Sprintf("not stale: %v", notStale) - md.Data = append(notStale, ObservedValue{ + md.Data = append(notStale, types.ObservedValue{ Time: entry.Time, Value: entry.Value, }) @@ -110,14 +102,14 @@ func (m ms) Put(entry NewMetricEntry) { } } -func NewMetricStore(stalePeriodSeconds int) MemStore { +func NewMetricStore(stalePeriodSeconds int) types.MemStore { return ms{ - store: make(map[MetricName]StoredMetrics), + store: make(map[types.MetricName]types.StoredMetrics), stalePeriodSeconds: stalePeriodSeconds, } } -func hashOfMap(m Labels) LabelsHash { +func hashOfMap(m types.Labels) types.LabelsHash { h := sha256.New() keys := make([]string, len(m)) i := 0 @@ -133,66 +125,66 @@ func hashOfMap(m Labels) LabelsHash { b = sha256.Sum256([]byte(fmt.Sprintf("%v", v))) h.Write(b[:]) } - return LabelsHash(fmt.Sprintf("%x", h.Sum(nil))) + return types.LabelsHash(fmt.Sprintf("%x", h.Sum(nil))) } func (m ms) isStale(datapoint pcommon.Timestamp, now int64) bool { return now-int64(m.stalePeriodSeconds) > int64(datapoint) } -func (m ms) calculateAggregate(value float64, counter int, accumulator float64, aggregation AggregationOverVectors) float64 { +func (m ms) calculateAggregate(value float64, counter int, accumulator float64, aggregation types.AggregationOverVectors) float64 { if counter == 1 { return value } switch aggregation { - case VecSum: + case types.VecSum: return accumulator + value - case VecAvg: + case types.VecAvg: // calculate the avg on the fly to avoid potential overflows, // idea: each number adds 1/count of itself to the final result c := float64(counter) cMinusOne := float64(counter - 1) return ((accumulator / c) * cMinusOne) + (value / c) - case VecMin: + case types.VecMin: return math.Min(accumulator, value) - case VecMax: + case types.VecMax: return math.Max(accumulator, value) default: panic("unknown aggregation function: " + aggregation) } } -func newMetricDatapoint(entry NewMetricEntry) MetricData { - return MetricData{ +func newMetricDatapoint(entry types.NewMetricEntry) types.MetricData { + return types.MetricData{ Labels: entry.Labels, LastUpdate: entry.Time, - Data: []ObservedValue{ + Data: []types.ObservedValue{ { Time: entry.Time, Value: entry.Value, }, }, - AggregatesOverTime: map[OperationOverTime]float64{ - OpMin: entry.Value, - OpMax: entry.Value, - OpAvg: entry.Value, - OpLastOne: entry.Value, - OpCount: 1, - OpRate: 0, + AggregatesOverTime: map[types.OperationOverTime]float64{ + types.OpMin: entry.Value, + types.OpMax: entry.Value, + types.OpAvg: entry.Value, + types.OpLastOne: entry.Value, + types.OpCount: 1, + types.OpRate: 0, }, } } -func (m ms) updateAggregatesOverTime(md MetricData) { +func (m ms) updateAggregatesOverTime(md types.MetricData) { for i := 0; i < len(md.Data); i++ { - for _, op := range 
[]OperationOverTime{OpMin, OpMax, OpAvg} { - md.AggregatesOverTime[op] = m.calculateAggregate(md.Data[i].Value, i+1, md.AggregatesOverTime[op], AggregationOverVectors(op)) + for _, op := range []types.OperationOverTime{types.OpMin, types.OpMax, types.OpAvg} { + md.AggregatesOverTime[op] = m.calculateAggregate(md.Data[i].Value, i+1, md.AggregatesOverTime[op], types.AggregationOverVectors(op)) } } - md.AggregatesOverTime[OpRate] = (md.Data[len(md.Data)-1].Value - md.Data[0].Value) / float64(md.Data[len(md.Data)-1].Time-md.Data[0].Time) - md.AggregatesOverTime[OpCount] = float64(len(md.Data)) - md.AggregatesOverTime[OpLastOne] = md.Data[len(md.Data)-1].Value + md.AggregatesOverTime[types.OpRate] = (md.Data[len(md.Data)-1].Value - md.Data[0].Value) / float64(md.Data[len(md.Data)-1].Time-md.Data[0].Time) + md.AggregatesOverTime[types.OpCount] = float64(len(md.Data)) + md.AggregatesOverTime[types.OpLastOne] = md.Data[len(md.Data)-1].Value } // enforce iface impl -var _ MemStore = new(ms) +var _ types.MemStore = new(ms) diff --git a/metric/mem_store_test.go b/metric/mem_store_test.go index 04652c6..a5045c0 100644 --- a/metric/mem_store_test.go +++ b/metric/mem_store_test.go @@ -6,6 +6,8 @@ import ( "time" "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/kedify/otel-add-on/types" ) const ( @@ -16,9 +18,9 @@ const ( func TestMemStorePutOneAndGetOne(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 42., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -29,16 +31,16 @@ func TestMemStorePutOneAndGetOne(t *testing.T) { }) // check - val, found, err := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, OpLastOne, VecSum) + val, found, err := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val, found, err, 42.) 
} func TestMemStoreErr(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 42., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -49,18 +51,18 @@ func TestMemStoreErr(t *testing.T) { }) // check - _, _, err1 := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, OpLastOne+"_typo", VecSum) + _, _, err1 := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, types.OpLastOne+"_typo", types.VecSum) assertMetricErr(t, err1) - _, _, err2 := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, OpLastOne, "typo_"+VecSum) + _, _, err2 := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, types.OpLastOne, "typo_"+types.VecSum) assertMetricErr(t, err2) } func TestMemStoreGetNotFound(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 42., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -71,28 +73,28 @@ func TestMemStoreGetNotFound(t *testing.T) { }) // check - val1, found1, err1 := ms.Get("metric-404", map[string]any{"b": "2", "a": "1"}, OpLastOne, VecSum) + val1, found1, err1 := ms.Get("metric-404", map[string]any{"b": "2", "a": "1"}, types.OpLastOne, types.VecSum) assertMetricNotFound(t, val1, found1, err1) if found1 { t.Errorf("expected: [false], got: [%v]", bool(found1)) } - val2, found2, err2 := ms.Get("metric-1", map[string]any{"bb": "2", "a": "1"}, OpLastOne, VecSum) + val2, found2, err2 := ms.Get("metric-1", map[string]any{"bb": "2", "a": "1"}, types.OpLastOne, types.VecSum) assertMetricNotFound(t, val2, found2, err2) - val3, found3, err3 := ms.Get("metric-1", map[string]any{"bb": "2", "a": "1", "c": "3"}, OpLastOne, VecSum) + val3, found3, err3 := ms.Get("metric-1", map[string]any{"bb": "2", "a": "1", "c": "3"}, types.OpLastOne, types.VecSum) assertMetricNotFound(t, val3, found3, err3) - val4, found4, err4 := ms.Get("metric-1", map[string]any{}, OpLastOne, VecSum) + val4, found4, err4 := ms.Get("metric-1", map[string]any{}, types.OpLastOne, types.VecSum) assertMetricNotFound(t, val4, found4, err4) } func TestMemStoreOperationLastOne(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 42., Time: pcommon.Timestamp(time.Now().Unix() - 1), }, @@ -101,9 +103,9 @@ func TestMemStoreOperationLastOne(t *testing.T) { "b": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 45., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -114,16 +116,16 @@ func TestMemStoreOperationLastOne(t *testing.T) { }) // check - val, found, err := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, OpLastOne, VecSum) + val, found, err := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val, found, err, 45.) 
} func TestMemStorePutTwoAndGetTwo(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 42., Time: pcommon.Timestamp(time.Now().Unix() - 1), }, @@ -132,9 +134,9 @@ func TestMemStorePutTwoAndGetTwo(t *testing.T) { "b": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric2", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 45., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -145,19 +147,19 @@ func TestMemStorePutTwoAndGetTwo(t *testing.T) { }) // check - val1, found1, err1 := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, OpLastOne, VecSum) + val1, found1, err1 := ms.Get("metric1", map[string]any{"b": "2", "a": "1"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val1, found1, err1, 42.) - val2, found2, err2 := ms.Get("metric2", map[string]any{"bb": "20", "aa": "10"}, OpLastOne, VecSum) + val2, found2, err2 := ms.Get("metric2", map[string]any{"bb": "20", "aa": "10"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val2, found2, err2, 45.) } func TestMemStoreSumAcrossDifferentMetrics(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 1., Time: pcommon.Timestamp(time.Now().Unix() - 1), }, @@ -167,9 +169,9 @@ func TestMemStoreSumAcrossDifferentMetrics(t *testing.T) { "c": "1", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 2., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -179,9 +181,9 @@ func TestMemStoreSumAcrossDifferentMetrics(t *testing.T) { "c": "1", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 3., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -191,9 +193,9 @@ func TestMemStoreSumAcrossDifferentMetrics(t *testing.T) { "c": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 4., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -203,9 +205,9 @@ func TestMemStoreSumAcrossDifferentMetrics(t *testing.T) { "c": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric2", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 5., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -217,22 +219,22 @@ func TestMemStoreSumAcrossDifferentMetrics(t *testing.T) { }) // check - val1, found1, err1 := ms.Get("metric1", map[string]any{"a": "1", "c": "1"}, OpLastOne, VecSum) + val1, found1, err1 := ms.Get("metric1", map[string]any{"a": "1", "c": "1"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val1, found1, err1, 3.) - val2, found2, err2 := ms.Get("metric1", map[string]any{"a": "1", "c": "2"}, OpLastOne, VecSum) + val2, found2, err2 := ms.Get("metric1", map[string]any{"a": "1", "c": "2"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val2, found2, err2, 7.) - val3, found3, err3 := ms.Get("metric1", map[string]any{"a": "1"}, OpLastOne, VecSum) + val3, found3, err3 := ms.Get("metric1", map[string]any{"a": "1"}, types.OpLastOne, types.VecSum) assertMetricFound(t, val3, found3, err3, 10.) 
} func TestMemStoreAvg(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 1., Time: pcommon.Timestamp(time.Now().Unix() - 1), }, @@ -242,9 +244,9 @@ func TestMemStoreAvg(t *testing.T) { "c": "1", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 2., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -254,9 +256,9 @@ func TestMemStoreAvg(t *testing.T) { "c": "1", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 3., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -266,9 +268,9 @@ func TestMemStoreAvg(t *testing.T) { "c": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 4., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -278,9 +280,9 @@ func TestMemStoreAvg(t *testing.T) { "c": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric2", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 5., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -292,22 +294,22 @@ func TestMemStoreAvg(t *testing.T) { }) // check - val1, found1, err1 := ms.Get("metric1", map[string]any{"a": "1", "c": "1"}, OpLastOne, VecAvg) + val1, found1, err1 := ms.Get("metric1", map[string]any{"a": "1", "c": "1"}, types.OpLastOne, types.VecAvg) assertMetricFound(t, val1, found1, err1, 1.5) - val2, found2, err2 := ms.Get("metric1", map[string]any{"a": "1", "c": "2"}, OpLastOne, VecAvg) + val2, found2, err2 := ms.Get("metric1", map[string]any{"a": "1", "c": "2"}, types.OpLastOne, types.VecAvg) assertMetricFound(t, val2, found2, err2, 3.5) - val3, found3, err3 := ms.Get("metric1", map[string]any{"a": "1"}, OpLastOne, VecAvg) + val3, found3, err3 := ms.Get("metric1", map[string]any{"a": "1"}, types.OpLastOne, types.VecAvg) assertMetricFound(t, val3, found3, err3, 2.5) } func TestMemStoreMinMax(t *testing.T) { // setup ms := NewMetricStore(5) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 1., Time: pcommon.Timestamp(time.Now().Unix() - 1), }, @@ -317,9 +319,9 @@ func TestMemStoreMinMax(t *testing.T) { "c": "1", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 2., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -329,9 +331,9 @@ func TestMemStoreMinMax(t *testing.T) { "c": "1", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 3., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -341,9 +343,9 @@ func TestMemStoreMinMax(t *testing.T) { "c": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric1", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 4., Time: pcommon.Timestamp(time.Now().Unix()), }, @@ -353,9 +355,9 @@ func TestMemStoreMinMax(t *testing.T) { "c": "2", }, }) - ms.Put(NewMetricEntry{ + ms.Put(types.NewMetricEntry{ Name: "metric2", - ObservedValue: ObservedValue{ + ObservedValue: types.ObservedValue{ Value: 5., Time: 
pcommon.Timestamp(time.Now().Unix()), }, @@ -367,16 +369,16 @@ func TestMemStoreMinMax(t *testing.T) { }) // check - val1, found1, err1 := ms.Get("metric1", map[string]any{"a": "1", "c": "1"}, OpLastOne, VecMin) + val1, found1, err1 := ms.Get("metric1", map[string]any{"a": "1", "c": "1"}, types.OpLastOne, types.VecMin) assertMetricFound(t, val1, found1, err1, 1.) - val2, found2, err2 := ms.Get("metric1", map[string]any{"a": "1", "c": "2"}, OpLastOne, VecMin) + val2, found2, err2 := ms.Get("metric1", map[string]any{"a": "1", "c": "2"}, types.OpLastOne, types.VecMin) assertMetricFound(t, val2, found2, err2, 3.) - val3, found3, err3 := ms.Get("metric1", map[string]any{"a": "1"}, OpLastOne, VecMin) + val3, found3, err3 := ms.Get("metric1", map[string]any{"a": "1"}, types.OpLastOne, types.VecMin) assertMetricFound(t, val3, found3, err3, 1.) - val4, found4, err4 := ms.Get("metric1", map[string]any{"a": "1"}, OpLastOne, VecMax) + val4, found4, err4 := ms.Get("metric1", map[string]any{"a": "1"}, types.OpLastOne, types.VecMax) assertMetricFound(t, val4, found4, err4, 4.) } @@ -388,7 +390,7 @@ func TestMemStoreAvgOverTime(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 10, labels, 1., 2., 3., 4., 5.) - val, found, err := ms.Get(MetricName(name), labels, OpAvg, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpAvg, types.VecSum) assertMetricFound(t, val, found, err, 3.) } @@ -400,7 +402,7 @@ func TestMemStoreAvgOverTimeStale(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 10, labels, 1., 2., 3., 4., 5.) - val, found, err := ms.Get(MetricName(name), labels, OpAvg, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpAvg, types.VecSum) assertMetricFound(t, val, found, err, 4.5) } @@ -412,7 +414,7 @@ func TestMemStoreMinOverTime(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 10, labels, 99., 2., 1., 4., 5.) - val, found, err := ms.Get(MetricName(name), labels, OpMin, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpMin, types.VecSum) assertMetricFound(t, val, found, err, 1.) } @@ -424,7 +426,7 @@ func TestMemStoreLastOneOverTime(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 10, labels, 99., 2., 1., 4., 5.) - val, found, err := ms.Get(MetricName(name), labels, OpLastOne, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpLastOne, types.VecSum) assertMetricFound(t, val, found, err, 5.) } @@ -436,7 +438,7 @@ func TestMemStoreMinOverTimeStale(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 10, labels, 1., 2., 3., 4., 5.) - val, found, err := ms.Get(MetricName(name), labels, OpMin, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpMin, types.VecSum) assertMetricFound(t, val, found, err, 3.) } @@ -448,7 +450,7 @@ func TestMemStoreCountsOverTime(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 10, labels, 1., 2., 3., 4., 5., 6.) - val, found, err := ms.Get(MetricName(name), labels, OpCount, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpCount, types.VecSum) assertMetricFound(t, val, found, err, 6.) } @@ -460,7 +462,7 @@ func TestMemStoreRateOverTime1(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 1, labels, 1., 2., 3., 4., 5., 6.) - val, found, err := ms.Get(MetricName(name), labels, OpRate, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpRate, types.VecSum) assertMetricFound(t, val, found, err, 1.) 
} @@ -472,7 +474,7 @@ func TestMemStoreRateOverTime2(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 2, labels, 1., 2., 3., 4., 5., 6.) - val, found, err := ms.Get(MetricName(name), labels, OpRate, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpRate, types.VecSum) assertMetricFound(t, val, found, err, .5) } @@ -484,7 +486,7 @@ func TestMemStoreRateOverTime3(t *testing.T) { } name := "m3t/r1c" setupMetrics(ms, name, 1, labels, 1., 3., 5., 7., 9., 11.) - val, found, err := ms.Get(MetricName(name), labels, OpRate, VecSum) + val, found, err := ms.Get(types.MetricName(name), labels, types.OpRate, types.VecSum) assertMetricFound(t, val, found, err, 2.) } @@ -503,18 +505,18 @@ func TestMemStoreSumOverAverages(t *testing.T) { setupMetrics(ms, name1, 1, labels1, 1., 2., 3., 4., 5., 6.) // avg is 3.5 setupMetrics(ms, name1, 1, labels2, 2., 2., 2., 4., 2., 2.) // avg is 2.333 setupMetrics(ms, "noise", 1, labels2, 1., 2., 3., 4., 5.) // this shouldn't be included - val, found, err := ms.Get(MetricName(name1), map[string]any{ + val, found, err := ms.Get(types.MetricName(name1), map[string]any{ "a": "1", - }, OpAvg, VecSum) + }, types.OpAvg, types.VecSum) assertMetricFound(t, val, found, err, 3.5+2.333) } -func setupMetrics(store MemStore, name string, secondsStep int64, labels map[string]any, vals ...float64) { +func setupMetrics(store types.MemStore, name string, secondsStep int64, labels map[string]any, vals ...float64) { now := time.Now().Unix() for i, v := range vals { - store.Put(NewMetricEntry{ - Name: MetricName(name), - ObservedValue: ObservedValue{ + store.Put(types.NewMetricEntry{ + Name: types.MetricName(name), + ObservedValue: types.ObservedValue{ Value: v, Time: pcommon.Timestamp(now - int64(len(vals))*secondsStep + int64(i)*secondsStep), }, @@ -523,17 +525,17 @@ func setupMetrics(store MemStore, name string, secondsStep int64, labels map[str } } -func assertMetric(t *testing.T, val float64, found Found, err error, expectedVal float64, expectedFound bool, expectedErr error) { +func assertMetric(t *testing.T, val float64, found types.Found, err error, expectedVal float64, expectedFound bool, expectedErr error) { if err != expectedErr || bool(found) != expectedFound || !equalsFloat(val, expectedVal) { t.Errorf("expected: [%f, %v, %v], got: [%f, %v, %v]", expectedVal, expectedFound, expectedErr, val, found, err) } } -func assertMetricNotFound(t *testing.T, val float64, found Found, err error) { +func assertMetricNotFound(t *testing.T, val float64, found types.Found, err error) { assertMetric(t, val, found, err, NotFoundVal, false, nil) } -func assertMetricFound(t *testing.T, val float64, found Found, err error, expectedVal float64) { +func assertMetricFound(t *testing.T, val float64, found types.Found, err error, expectedVal float64) { assertMetric(t, val, found, err, expectedVal, true, nil) } diff --git a/metric/simple_parser.go b/metric/simple_parser.go index 6d57f87..bdc23a7 100644 --- a/metric/simple_parser.go +++ b/metric/simple_parser.go @@ -3,25 +3,27 @@ package metric import ( "fmt" "strings" + + "github.com/kedify/otel-add-on/types" ) type p struct { } // enforce iface impl -var _ Parser = new(p) +var _ types.Parser = new(p) -func NewParser() Parser { +func NewParser() types.Parser { return p{} } -func (p p) Parse(metricQuery string) (MetricName, Labels, AggregationOverVectors, error) { +func (p p) Parse(metricQuery string) (types.MetricName, types.Labels, types.AggregationOverVectors, error) { if metricQuery == "" { return "", nil, "", 
fmt.Errorf("unable to parse metric query: %s", metricQuery) } mq := strings.TrimSpace(metricQuery) - aggregateFunction := VecSum // default - for _, aggFn := range []AggregationOverVectors{VecSum, VecAvg, VecMin, VecMax} { + aggregateFunction := types.VecSum // default + for _, aggFn := range []types.AggregationOverVectors{types.VecSum, types.VecAvg, types.VecMin, types.VecMax} { if strings.HasPrefix(mq, string(aggFn)+"(") && strings.HasSuffix(mq, ")") { aggregateFunction = aggFn mq = strings.TrimPrefix(mq, string(aggFn)+"(") @@ -34,9 +36,9 @@ func (p p) Parse(metricQuery string) (MetricName, Labels, AggregationOverVectors return "", nil, "", fmt.Errorf("unable to parse metric query: %s", metricQuery) } if first == -1 && last == -1 { // no labels specified - return MetricName(mq), map[string]any{}, aggregateFunction, nil + return types.MetricName(mq), map[string]any{}, aggregateFunction, nil } - metricName := MetricName(mq[:first]) + metricName := types.MetricName(mq[:first]) labels, err := p.ParseLabels(mq[first+1 : last]) if err != nil { return "", nil, "", err @@ -44,7 +46,7 @@ func (p p) Parse(metricQuery string) (MetricName, Labels, AggregationOverVectors return metricName, labels, aggregateFunction, nil } -func (p p) ParseLabels(labelsQuery string) (Labels, error) { +func (p p) ParseLabels(labelsQuery string) (types.Labels, error) { lq := strings.TrimSpace(labelsQuery) if lq == "" { return nil, fmt.Errorf("unable to parse labels: %s", lq) diff --git a/metric/simple_parser_test.go b/metric/simple_parser_test.go index 7b95632..00ba7f4 100644 --- a/metric/simple_parser_test.go +++ b/metric/simple_parser_test.go @@ -3,6 +3,8 @@ package metric import ( "fmt" "testing" + + "github.com/kedify/otel-add-on/types" ) func TestSimpleParserOk(t *testing.T) { @@ -11,7 +13,7 @@ func TestSimpleParserOk(t *testing.T) { // check name, labels, agg, err := p.Parse("avg(metric_foo_bar{a=1, b=2})") - if name != "metric_foo_bar" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"b": "2", "a": "1"}) || agg != VecAvg || err != nil { + if name != "metric_foo_bar" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"b": "2", "a": "1"}) || agg != types.VecAvg || err != nil { t.Errorf("expected: [metric_foo_bar, map[a:1 b:2], avg, ], got: [%s, %v, %v, %v]", name, labels, agg, err) } } @@ -56,7 +58,7 @@ func TestSimpleParserDefaultAgg(t *testing.T) { // check name, labels, agg, err := p.Parse("metric_foo{a=1, b=2, c=5}") - if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"b": "2", "a": "1", "c": "5"}) || agg != VecSum || err != nil { + if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"b": "2", "a": "1", "c": "5"}) || agg != types.VecSum || err != nil { t.Errorf("expected: [metric_foo, map[a:1 b:2 c:5], avg, ], got: [%s, %v, %v, %v]", name, labels, agg, err) } } @@ -67,7 +69,7 @@ func TestSimpleParserMin(t *testing.T) { // check name, labels, agg, err := p.Parse("min(metric_foo{ahoj=cau})") - if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"ahoj": "cau"}) || agg != VecMin || err != nil { + if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"ahoj": "cau"}) || agg != types.VecMin || err != nil { t.Errorf("expected: [metric_foo, map[ahoj:cau], min, ], got: [%s, %v, %v, %v]", name, labels, agg, err) } } @@ -78,7 +80,7 @@ func TestSimpleParserNoLabels(t *testing.T) { // check name, labels, agg, err := p.Parse("max(metric_foo)") - if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{}) || agg != 
VecMax || err != nil { + if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{}) || agg != types.VecMax || err != nil { t.Errorf("expected: [metric_foo, map[], max, ], got: [%s, %v, %v, %v]", name, labels, agg, err) } } @@ -89,7 +91,7 @@ func TestSimpleParserNoLabelsNoAgg(t *testing.T) { // check name, labels, agg, err := p.Parse("hello") - if name != "hello" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{}) || agg != VecSum || err != nil { + if name != "hello" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{}) || agg != types.VecSum || err != nil { t.Errorf("expected: [hello, map[], sum, ], got: [%s, %v, %v, %v]", name, labels, agg, err) } } diff --git a/receiver/receiver.go b/receiver/receiver.go index cf788a7..3f8f4bd 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -15,7 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" - "github.com/kedify/otel-add-on/metric" + "github.com/kedify/otel-add-on/types" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -41,13 +41,13 @@ type otlpReceiver struct { obsrepGRPC *receiverhelper.ObsReport settings *receiver.Settings - metricMemStore metric.MemStore + metricMemStore types.MemStore } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func NewOtlpReceiver(cfg *otlpreceiver.Config, set *receiver.Settings, memStore metric.MemStore) (*otlpReceiver, error) { +func NewOtlpReceiver(cfg *otlpreceiver.Config, set *receiver.Settings, memStore types.MemStore) (*otlpReceiver, error) { r := &otlpReceiver{ cfg: cfg, nextMetrics: nil, @@ -133,11 +133,11 @@ type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics obsreport *receiverhelper.ObsReport - metricMemStore metric.MemStore + metricMemStore types.MemStore } // New creates a new Receiver reference. 
-func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport, memStore metric.MemStore) *Receiver { +func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport, memStore types.MemStore) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsreport: obsreport, @@ -179,9 +179,9 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p fmt.Printf(" tags: %+v\n", datapoint.Attributes().AsRaw()) value := math.Max(datapoint.DoubleValue(), float64(datapoint.IntValue())) fmt.Printf(" value: %+v\n", value) - r.metricMemStore.Put(metric.NewMetricEntry{ - Name: metric.MetricName(metrics.At(k).Name()), - ObservedValue: metric.ObservedValue{ + r.metricMemStore.Put(types.NewMetricEntry{ + Name: types.MetricName(metrics.At(k).Name()), + ObservedValue: types.ObservedValue{ Value: value, Time: datapoint.Timestamp(), }, diff --git a/scaler/handlers.go b/scaler/handlers.go index 45e9070..8dab556 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -7,18 +7,17 @@ package scaler import ( "context" "errors" - "fmt" + "math" "strconv" "time" "github.com/go-logr/logr" "github.com/kedacore/keda/v2/pkg/scalers/externalscaler" - "github.com/kedify/otel-add-on/metric" + "github.com/kedify/otel-add-on/types" + "github.com/kedify/otel-add-on/util" "google.golang.org/protobuf/types/known/emptypb" - - "github.com/kedacore/http-add-on/pkg/util" ) const ( @@ -38,8 +37,8 @@ func init() { type impl struct { lggr logr.Logger - metricStore metric.MemStore - metricParser metric.Parser + metricStore types.MemStore + metricParser types.Parser //soInformer informersv1alpha1.ScaledObjectInformer targetMetric int64 externalscaler.UnimplementedExternalScalerServer @@ -47,8 +46,8 @@ type impl struct { func New( lggr logr.Logger, - metricStore metric.MemStore, - metricParser metric.Parser, + metricStore types.MemStore, + metricParser types.Parser, //soInformer informersv1alpha1.ScaledObjectInformer, defaultTargetMetric int64, ) *impl { @@ -219,27 +218,21 @@ func (e *impl) GetMetrics( //metricName := namespacedName.Name scalerMetadata := sor.GetScalerMetadata() - metricQuery, ok := scalerMetadata["metricQuery"] - if !ok { - err := fmt.Errorf("unable to get metric query from scaled object's metadata") - lggr.Error(err, "GetMetrics") - return nil, err - } - name, labels, agg, err := e.metricParser.Parse(metricQuery) + metricName, labels, agg, err := util.GetMetricQuery(lggr, scalerMetadata, e.metricParser) if err != nil { - lggr.Error(err, "GetMetrics") return nil, err } - // todo: time op - value, stale, found := e.metricStore.Get(name, labels, metric.OpLastOne, agg) - lggr.V(1).Info("got metric value: ", "value", value, "stale", stale, "found", found) + opOverTime := util.GetOperationOvertTime(lggr, scalerMetadata) + value, found, err := e.metricStore.Get(metricName, labels, opOverTime, agg) + lggr.V(1).Info("got metric value: ", "value", value, "found", found, "error", err) + value = util.ClampValue(lggr, value, scalerMetadata) res := &externalscaler.GetMetricsResponse{ MetricValues: []*externalscaler.MetricValue{ { - MetricName: metricQuery, - MetricValue: int64(value), + MetricName: string(metricName), + MetricValue: int64(math.Round(value)), }, }, } diff --git a/metric/types.go b/types/metrics.go similarity index 97% rename from metric/types.go rename to types/metrics.go index 65b5a5f..a7f864d 100644 --- a/metric/types.go +++ b/types/metrics.go @@ -1,4 +1,4 @@ -package metric +package types import "go.opentelemetry.io/collector/pdata/pcommon" @@ -70,6 +70,6 @@ 
type MemStore interface {
 }
 
 type Parser interface {
-	// Parse parses the metric queyr provided as a string
+	// Parse parses the metric query provided as a string
 	Parse(string) (MetricName, Labels, AggregationOverVectors, error)
 }
diff --git a/util/handler_specific.go b/util/handler_specific.go
new file mode 100644
index 0000000..f592431
--- /dev/null
+++ b/util/handler_specific.go
@@ -0,0 +1,69 @@
+package util
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+
+	"github.com/go-logr/logr"
+
+	"github.com/kedify/otel-add-on/types"
+)
+
+const (
+	MetadataClampMin          = "clampMin"
+	MetadataClampMax          = "clampMax"
+	MetadataMetricQuery       = "metricQuery"
+	MetadataOperationOverTime = "operationOverTime"
+
+	MetadataOperationOverTimeDefaultValue = types.OpLastOne
+)
+
+func ClampValue(lggr logr.Logger, value float64, metadata map[string]string) float64 {
+	clampMin, clampMinFound := metadata[MetadataClampMin]
+	clampMax, clampMaxFound := metadata[MetadataClampMax]
+	if clampMinFound {
+		mi, e := strconv.Atoi(clampMin)
+		if e != nil {
+			lggr.Info("warning: cannot convert clampMin value", "value", clampMin, "error", e)
+		} else {
+			value = math.Max(value, float64(mi))
+		}
+	}
+	if clampMaxFound {
+		ma, e := strconv.Atoi(clampMax)
+		if e != nil {
+			lggr.Info("warning: cannot convert clampMax value", "value", clampMax, "error", e)
+		} else {
+			value = math.Min(value, float64(ma))
+		}
+	}
+	return value
+}
+
+func GetOperationOvertTime(lggr logr.Logger, metadata map[string]string) types.OperationOverTime {
+	operationOverTime, operationOverTimeFound := metadata[MetadataOperationOverTime]
+	if !operationOverTimeFound {
+		return MetadataOperationOverTimeDefaultValue
+	}
+	if err := CheckTimeOp(types.OperationOverTime(operationOverTime)); err != nil {
+		lggr.Info("warning: unrecognized operationOverTime, falling back to default", "operationOverTime", operationOverTime)
+		return MetadataOperationOverTimeDefaultValue
+	}
+	return types.OperationOverTime(operationOverTime)
+}
+
+func GetMetricQuery(lggr logr.Logger, metadata map[string]string, mp types.Parser) (types.MetricName, types.Labels, types.AggregationOverVectors, error) {
+	metricQuery, ok := metadata[MetadataMetricQuery]
+	if !ok {
+		err := fmt.Errorf("unable to get metric query from scaled object's metadata")
+		lggr.Error(err, "GetMetrics")
+		return "", nil, "", err
+	}
+	name, labels, agg, err := mp.Parse(metricQuery)
+	if err != nil {
+		lggr.Error(err, "GetMetrics")
+		return "", nil, "", err
+	}
+	return name, labels, agg, nil
+}
diff --git a/util/helpers.go b/util/helpers.go
index 9141a85..d2b79da 100644
--- a/util/helpers.go
+++ b/util/helpers.go
@@ -1,5 +1,14 @@
 package util
 
+import (
+	"fmt"
+	"os"
+	"strconv"
+	"time"
+
+	"github.com/kedify/otel-add-on/types"
+)
+
 func Map[I, R any](input []I, f func(I) R) []R {
 	result := make([]R, len(input))
 	for i := range input {
@@ -35,3 +44,43 @@ func Filter2[I any](input []I, f func(I) bool) []I {
 		}
 	})
 }
+
+func ResolveOsEnvBool(envName string, defaultValue bool) (bool, error) {
+	valueStr, found := os.LookupEnv(envName)
+
+	if found && valueStr != "" {
+		return strconv.ParseBool(valueStr)
+	}
+
+	return defaultValue, nil
+}
+
+func ResolveOsEnvInt(envName string, defaultValue int) (int, error) {
+	valueStr, found := os.LookupEnv(envName)
+
+	if found && valueStr != "" {
+		return strconv.Atoi(valueStr)
+	}
+
+	return defaultValue, nil
+}
+
+func ResolveOsEnvDuration(envName string) (*time.Duration, error) {
+	valueStr, found := os.LookupEnv(envName)
+
+	if found && valueStr != "" {
+		value, err := time.ParseDuration(valueStr)
+		return &value, err
+	}
+
+	return nil, nil
+}
+
+func CheckTimeOp(op types.OperationOverTime) error {
+	switch op {
+	case types.OpLastOne, types.OpRate, types.OpCount, types.OpAvg, types.OpMin, types.OpMax:
+		return nil
+	default:
+		return fmt.Errorf("unknown OperationOverTime:%s", op)
+	}
+}
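
Editor's note, not part of the patch: a minimal sketch of how the pieces introduced above are meant to compose (query parser, in-memory metric store, and the new scaler metadata helpers), roughly mirroring what the receiver and impl.GetMetrics() now do. The metric name, label values, metadata entries, and the logr.Discard() logger are illustrative assumptions, not code from this change.

package main

import (
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"go.opentelemetry.io/collector/pdata/pcommon"

	"github.com/kedify/otel-add-on/metric"
	"github.com/kedify/otel-add-on/types"
	"github.com/kedify/otel-add-on/util"
)

func main() {
	lggr := logr.Discard()

	// the same metadata a ScaledObject trigger would carry (compare examples/so.yaml)
	metadata := map[string]string{
		"metricQuery":       "sum(http_server_request_count{method=GET})",
		"clampMax":          "10",
		"operationOverTime": "rate",
	}

	parser := metric.NewParser()
	store := metric.NewMetricStore(300) // stale period in seconds

	// feed two observations one second apart, as the OTLP receiver's Export() would
	now := time.Now().Unix()
	for i, v := range []float64{40., 42.} {
		store.Put(types.NewMetricEntry{
			Name: "http_server_request_count",
			ObservedValue: types.ObservedValue{
				Value: v,
				Time:  pcommon.Timestamp(now - 1 + int64(i)),
			},
			Labels: map[string]any{"method": "GET"},
		})
	}

	// this is the handler-side flow: parse the query, pick the over-time operation,
	// read the aggregated value, then clamp it according to the metadata
	name, labels, agg, err := util.GetMetricQuery(lggr, metadata, parser)
	if err != nil {
		panic(err)
	}
	opOverTime := util.GetOperationOvertTime(lggr, metadata)
	value, found, err := store.Get(name, labels, opOverTime, agg)
	value = util.ClampValue(lggr, value, metadata)
	fmt.Println(value, found, err) // rate over the two samples, clamped to at most 10
}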