Skip to content

Commit

Permalink
enable individual labels from aggregate (#269)
Browse files Browse the repository at this point in the history
* enable individual labels from aggregate

* added to unit test
  • Loading branch information
KalmanMeth authored Aug 1, 2022
1 parent bfbd89b commit 6c1dae1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
24 changes: 15 additions & 9 deletions pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Aggregate struct {

type GroupState struct {
normalizedValues NormalizedValues
labels Labels
recentRawValues []float64
recentOpValue float64
recentCount int
Expand Down Expand Up @@ -95,14 +96,14 @@ func (labels Labels) getNormalizedValues() NormalizedValues {
return NormalizedValues(normalizedAsString)
}

func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, NormalizedValues) {
func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, NormalizedValues, Labels) {
labels, allLabelsFound := aggregate.LabelsFromEntry(entry)
if !allLabelsFound {
return fmt.Errorf("missing keys in entry"), ""
return fmt.Errorf("missing keys in entry"), "", nil
}

normalizedValues := labels.getNormalizedValues()
return nil, normalizedValues
return nil, normalizedValues, labels
}

func getInitValue(operation string) float64 {
Expand All @@ -120,15 +121,15 @@ func getInitValue(operation string) float64 {
}
}

func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues, labels Labels) error {

aggregate.mutex.Lock()
defer aggregate.mutex.Unlock()

var groupState *GroupState
oldEntry, ok := aggregate.cache.GetCacheEntry(string(normalizedValues))
if !ok {
groupState = &GroupState{normalizedValues: normalizedValues}
groupState = &GroupState{normalizedValues: normalizedValues, labels: labels}
initVal := getInitValue(string(aggregate.Definition.Operation))
groupState.totalValue = initVal
groupState.recentOpValue = initVal
Expand Down Expand Up @@ -183,13 +184,13 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error {
for _, entry := range entries {
// filter entries matching labels with aggregates
err, normalizedValues := aggregate.FilterEntry(entry)
err, normalizedValues, labels := aggregate.FilterEntry(entry)
if err != nil {
continue
}

// update aggregate group by entry
err = aggregate.UpdateByEntry(entry, normalizedValues)
err = aggregate.UpdateByEntry(entry, normalizedValues, labels)
if err != nil {
log.Debugf("UpdateByEntry error %v", err)
continue
Expand All @@ -208,7 +209,7 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {
// iterate over the items in the cache
aggregate.cache.Iterate(func(key string, value interface{}) {
group := value.(*GroupState)
metrics = append(metrics, config.GenericMap{
newEntry := config.GenericMap{
"name": aggregate.Definition.Name,
"operation": aggregate.Definition.Operation,
"record_key": aggregate.Definition.RecordKey,
Expand All @@ -220,7 +221,12 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {
"recent_op_value": group.recentOpValue,
"recent_count": group.recentCount,
strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues),
})
}
// add the items in aggregate.Definition.By individually to the entry
for _, key := range aggregate.Definition.By {
newEntry[key] = group.labels[key]
}
metrics = append(metrics, newEntry)
// Once reported, we reset the recentXXX fields
if aggregate.Definition.Operation == OperationRawValues {
group.recentRawValues = make([]float64, 0)
Expand Down
9 changes: 7 additions & 2 deletions pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ func Test_FilterEntry(t *testing.T) {
aggregate := GetMockAggregate()
entry := test.GetIngestMockEntry(false)

err, _ := aggregate.FilterEntry(entry)
err, _, _ := aggregate.FilterEntry(entry)
require.Equal(t, err, nil)

err, normalizedLabels, labels := aggregate.FilterEntry(entry)
require.Equal(t, err, nil)
require.Equal(t, Labels{"srcIP": "10.0.0.1", "dstIP": "20.0.0.2"}, labels)
require.Equal(t, NormalizedValues("20.0.0.2,10.0.0.1"), normalizedLabels)

entry = test.GetIngestMockEntry(true)

err, _ = aggregate.FilterEntry(entry)
err, _, _ = aggregate.FilterEntry(entry)

require.EqualError(t, err, "missing keys in entry")
}
Expand Down

0 comments on commit 6c1dae1

Please sign in to comment.