Commit 3dd1e97
More tests, glueing things together, breaking import cycle
Signed-off-by: Jirka Kremser <[email protected]>
jkremser committed Oct 9, 2024
1 parent c2733c4 commit 3dd1e97
Showing 11 changed files with 297 additions and 174 deletions.
11 changes: 9 additions & 2 deletions examples/so.yaml
@@ -11,8 +11,15 @@ spec:
 - type: external
   metadata:
     scalerAddress: keda-otel-scaler:4318
+
+    # required
     metricQuery: "sum(http/server/request_count{app_id=nodeapp, method=GET, path=/v1.0/state/statestore})"
+
+    # optional - default no limit
     clampMin: 1
+
+    # optional - default no limit
     clampMax: 10
-    operationOverTime: last_one/rate/count/average/min/max
-    overTimePeriodSeconds: 60
+
+    # optional - default 'last_one', available values: last_one,rate,count,avg,min,max
+    operationOverTime: last_one
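
For illustration, the behavior the clampMin/clampMax comments describe could look like the sketch below. The clamp helper and its nil-means-no-limit convention are assumptions chosen to illustrate the config semantics, not code from this commit.

package main

import "fmt"

// clamp restricts a computed metric value to the optional [min, max] bounds;
// a nil bound means "no limit", matching the documented defaults. The helper
// is hypothetical and only mirrors what clampMin/clampMax promise above.
func clamp(value float64, min, max *float64) float64 {
	if min != nil && value < *min {
		return *min
	}
	if max != nil && value > *max {
		return *max
	}
	return value
}

func main() {
	lo, hi := 1.0, 10.0
	fmt.Println(clamp(42, &lo, &hi))  // 10: clampMax caps the reported metric
	fmt.Println(clamp(0.5, &lo, &hi)) // 1: clampMin raises it
}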
5 changes: 3 additions & 2 deletions main.go
@@ -29,6 +29,7 @@ import (
"github.com/kedify/otel-add-on/metric"
"github.com/kedify/otel-add-on/receiver"
"github.com/kedify/otel-add-on/scaler"
"github.com/kedify/otel-add-on/types"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
@@ -187,8 +188,8 @@ func startGrpcServer(
 	ctx context.Context,
 	cfg *scaler.Config,
 	lggr logr.Logger,
-	ms metric.MemStore,
-	mp metric.Parser,
+	ms types.MemStore,
+	mp types.Parser,
 	port int,
 	targetPendingRequests int64,
 ) error {
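
The switch from metric.MemStore and metric.Parser to types.MemStore and types.Parser is what breaks the import cycle named in the commit title: once the interfaces live in a leaf package that imports nothing else from the module, metric, scaler, and main can all depend on types without depending on each other. A minimal sketch of the pattern, with a deliberately simplified interface and a hypothetical module path:

// types/store.go: the leaf package; it imports nothing from this module.
package types

// Store stands in for the real MemStore interface, simplified for illustration.
type Store interface {
	Put(name string, value float64)
	Get(name string) (float64, bool)
}

// metric/store.go: the implementation; it imports only the leaf package.
package metric

import "example.com/otel-add-on/types" // hypothetical module path

type memStore map[string]float64

func NewStore() types.Store { return memStore{} }

func (m memStore) Put(name string, value float64)  { m[name] = value }
func (m memStore) Get(name string) (float64, bool) { v, ok := m[name]; return v, ok }

With this shape, scaler and main consume types.Store and only the single construction site needs to import metric, so no cycle can form.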
80 changes: 36 additions & 44 deletions metric/mem_store.go
@@ -9,21 +9,22 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/kedify/otel-add-on/types"
"github.com/kedify/otel-add-on/util"
)

type ms struct {
store map[MetricName]StoredMetrics
store map[types.MetricName]types.StoredMetrics
stalePeriodSeconds int
}

func (m ms) Get(name MetricName, searchLabels Labels, timeOp OperationOverTime, defaultAggregation AggregationOverVectors) (float64, Found, error) {
func (m ms) Get(name types.MetricName, searchLabels types.Labels, timeOp types.OperationOverTime, defaultAggregation types.AggregationOverVectors) (float64, types.Found, error) {
now := time.Now().Unix()
if _, found := m.store[name]; !found {
// not found
return -1., false, nil
}
if err := checkTimeOp(timeOp); err != nil {
if err := util.CheckTimeOp(timeOp); err != nil {
return -1., false, err
}
if err := checkDefaultAggregation(defaultAggregation); err != nil {
@@ -66,27 +67,18 @@ func (m ms) Get(name MetricName, searchLabels Labels, timeOp OperationOverTime,
 	return accumulator, true, nil
 }
 
-func checkDefaultAggregation(aggregation AggregationOverVectors) error {
+func checkDefaultAggregation(aggregation types.AggregationOverVectors) error {
 	switch aggregation {
-	case VecSum, VecAvg, VecMin, VecMax:
+	case types.VecSum, types.VecAvg, types.VecMin, types.VecMax:
 		return nil
 	default:
 		return fmt.Errorf("unknown AggregationOverVectors:%s", aggregation)
 	}
 }
 
-func checkTimeOp(op OperationOverTime) error {
-	switch op {
-	case OpLastOne, OpRate, OpCount, OpAvg, OpMin, OpMax:
-		return nil
-	default:
-		return fmt.Errorf("unknown OperationOverTime:%s", op)
-	}
-}
-
-func (m ms) Put(entry NewMetricEntry) {
+func (m ms) Put(entry types.NewMetricEntry) {
 	if _, found := m.store[entry.Name]; !found {
-		m.store[entry.Name] = make(map[LabelsHash]MetricData)
+		m.store[entry.Name] = make(map[types.LabelsHash]types.MetricData)
 	}
 	now := time.Now().Unix()
 	labelsH := hashOfMap(entry.Labels)
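
The checkTimeOp helper deleted in the hunk above is invoked earlier in Get as util.CheckTimeOp, so it has presumably moved to the util package with its constants re-qualified through types; a straight port would look like this (the exact file and placement in util are assumptions, since that side of the move is not shown in this diff):

package util

import (
	"fmt"

	"github.com/kedify/otel-add-on/types"
)

// CheckTimeOp validates an operationOverTime value against the known operations.
func CheckTimeOp(op types.OperationOverTime) error {
	switch op {
	case types.OpLastOne, types.OpRate, types.OpCount, types.OpAvg, types.OpMin, types.OpMax:
		return nil
	default:
		return fmt.Errorf("unknown OperationOverTime:%s", op)
	}
}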
@@ -96,11 +88,11 @@ func (m ms) Put(entry NewMetricEntry) {
 	} else {
 		// found
 		md := m.store[entry.Name][labelsH]
-		notStale := util.Filter(md.Data, func(val ObservedValue) bool {
+		notStale := util.Filter(md.Data, func(val types.ObservedValue) bool {
 			return !m.isStale(val.Time, now)
 		})
 		fmt.Sprintf("not stale: %v", notStale)
-		md.Data = append(notStale, ObservedValue{
+		md.Data = append(notStale, types.ObservedValue{
 			Time:  entry.Time,
 			Value: entry.Value,
 		})
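
util.Filter, used above to drop stale datapoints before appending the new one, is not shown in this diff; judging from the call site it is a generic slice filter along these lines (an assumption, not the repository's actual code):

package util

// Filter returns the elements of s for which pred returns true,
// preserving their original order.
func Filter[T any](s []T, pred func(T) bool) []T {
	out := make([]T, 0, len(s))
	for _, v := range s {
		if pred(v) {
			out = append(out, v)
		}
	}
	return out
}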
@@ -110,14 +102,14 @@ func (m ms) Put(entry NewMetricEntry) {
 	}
 }
 
-func NewMetricStore(stalePeriodSeconds int) MemStore {
+func NewMetricStore(stalePeriodSeconds int) types.MemStore {
 	return ms{
-		store:              make(map[MetricName]StoredMetrics),
+		store:              make(map[types.MetricName]types.StoredMetrics),
 		stalePeriodSeconds: stalePeriodSeconds,
 	}
 }
 
-func hashOfMap(m Labels) LabelsHash {
+func hashOfMap(m types.Labels) types.LabelsHash {
 	h := sha256.New()
 	keys := make([]string, len(m))
 	i := 0
@@ -133,66 +125,66 @@ func hashOfMap(m Labels) LabelsHash {
 		b = sha256.Sum256([]byte(fmt.Sprintf("%v", v)))
 		h.Write(b[:])
 	}
-	return LabelsHash(fmt.Sprintf("%x", h.Sum(nil)))
+	return types.LabelsHash(fmt.Sprintf("%x", h.Sum(nil)))
 }
 
 func (m ms) isStale(datapoint pcommon.Timestamp, now int64) bool {
 	return now-int64(m.stalePeriodSeconds) > int64(datapoint)
 }
 
-func (m ms) calculateAggregate(value float64, counter int, accumulator float64, aggregation AggregationOverVectors) float64 {
+func (m ms) calculateAggregate(value float64, counter int, accumulator float64, aggregation types.AggregationOverVectors) float64 {
 	if counter == 1 {
 		return value
 	}
 	switch aggregation {
-	case VecSum:
+	case types.VecSum:
 		return accumulator + value
-	case VecAvg:
+	case types.VecAvg:
 		// calculate the avg on the fly to avoid potential overflows,
 		// idea: each number adds 1/count of itself to the final result
 		c := float64(counter)
 		cMinusOne := float64(counter - 1)
 		return ((accumulator / c) * cMinusOne) + (value / c)
-	case VecMin:
+	case types.VecMin:
 		return math.Min(accumulator, value)
-	case VecMax:
+	case types.VecMax:
 		return math.Max(accumulator, value)
 	default:
 		panic("unknown aggregation function: " + aggregation)
 	}
 }
 
-func newMetricDatapoint(entry NewMetricEntry) MetricData {
-	return MetricData{
+func newMetricDatapoint(entry types.NewMetricEntry) types.MetricData {
+	return types.MetricData{
 		Labels:     entry.Labels,
 		LastUpdate: entry.Time,
-		Data: []ObservedValue{
+		Data: []types.ObservedValue{
 			{
 				Time:  entry.Time,
 				Value: entry.Value,
 			},
 		},
-		AggregatesOverTime: map[OperationOverTime]float64{
-			OpMin:     entry.Value,
-			OpMax:     entry.Value,
-			OpAvg:     entry.Value,
-			OpLastOne: entry.Value,
-			OpCount:   1,
-			OpRate:    0,
+		AggregatesOverTime: map[types.OperationOverTime]float64{
+			types.OpMin:     entry.Value,
+			types.OpMax:     entry.Value,
+			types.OpAvg:     entry.Value,
+			types.OpLastOne: entry.Value,
+			types.OpCount:   1,
+			types.OpRate:    0,
 		},
 	}
 }
 
-func (m ms) updateAggregatesOverTime(md MetricData) {
+func (m ms) updateAggregatesOverTime(md types.MetricData) {
 	for i := 0; i < len(md.Data); i++ {
-		for _, op := range []OperationOverTime{OpMin, OpMax, OpAvg} {
-			md.AggregatesOverTime[op] = m.calculateAggregate(md.Data[i].Value, i+1, md.AggregatesOverTime[op], AggregationOverVectors(op))
+		for _, op := range []types.OperationOverTime{types.OpMin, types.OpMax, types.OpAvg} {
+			md.AggregatesOverTime[op] = m.calculateAggregate(md.Data[i].Value, i+1, md.AggregatesOverTime[op], types.AggregationOverVectors(op))
 		}
 	}
-	md.AggregatesOverTime[OpRate] = (md.Data[len(md.Data)-1].Value - md.Data[0].Value) / float64(md.Data[len(md.Data)-1].Time-md.Data[0].Time)
-	md.AggregatesOverTime[OpCount] = float64(len(md.Data))
-	md.AggregatesOverTime[OpLastOne] = md.Data[len(md.Data)-1].Value
+	md.AggregatesOverTime[types.OpRate] = (md.Data[len(md.Data)-1].Value - md.Data[0].Value) / float64(md.Data[len(md.Data)-1].Time-md.Data[0].Time)
+	md.AggregatesOverTime[types.OpCount] = float64(len(md.Data))
+	md.AggregatesOverTime[types.OpLastOne] = md.Data[len(md.Data)-1].Value
 }
 
 // enforce iface impl
-var _ MemStore = new(ms)
+var _ types.MemStore = new(ms)
(Diff for the remaining 8 changed files not shown.)
