Skip to content

Commit

Permalink
Merge pull request #589 from nokia/fix588
Browse files Browse the repository at this point in the history
fix event-value-tag processor
  • Loading branch information
karimra authored Jan 25, 2025
2 parents ebefd64 + 5bb1401 commit 601119f
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 36 deletions.
60 changes: 36 additions & 24 deletions pkg/formatters/event_value_tag/event_value_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (vt *valueTag) Init(cfg interface{}, opts ...formatters.Option) error {
return nil
}

type foo struct {
type tagVal struct {
tags map[string]string
value interface{}
}
Expand All @@ -68,24 +68,14 @@ func (vt *valueTag) Apply(evs ...*formatters.EventMsg) []*formatters.EventMsg {
if vt.TagName == "" {
vt.TagName = vt.ValueName
}
// Look for events with ValueName
toApply := make([]foo, 0)
for _, ev := range evs {
for k, v := range ev.Values {
if vt.ValueName == k {
toApply = append(toApply, foo{ev.Tags, v})
if vt.Consume {
delete(ev.Values, k)
}
}
}
}
for _, bar := range toApply {

cache := make(map[string]bool)
vts := vt.buildApplyRules(evs)
for _, tv := range vts {
for _, ev := range evs {
if checkKeys(bar.tags, ev.Tags) {
if _, ok := ev.Values[vt.ValueName]; !ok {
ev.Tags[vt.TagName] = fmt.Sprint(bar.value)
}
match := compareTags(tv.tags, ev.Tags, cache)
if match {
ev.Tags[vt.TagName] = fmt.Sprint(tv.value)
}
}
}
Expand All @@ -104,17 +94,39 @@ func (vt *valueTag) WithTargets(tcs map[string]*types.TargetConfig) {}

func (vt *valueTag) WithActions(act map[string]map[string]interface{}) {}

func checkKeys(a map[string]string, b map[string]string) bool {
// returns true if all keys match, false otherwise.
func compareTags(a map[string]string, b map[string]string, cache map[string]bool) bool {
cacheKey := fmt.Sprintf("%v-%v", a, b)
if cachedResult, exists := cache[cacheKey]; exists {
return cachedResult
}

if len(a) > len(b) {
cache[cacheKey] = false
return false
}
for k, v := range a {
if vv, ok := b[k]; ok {
if v != vv {
return false
}
} else {
if vv, ok := b[k]; !ok || v != vv {
cache[cacheKey] = false
return false
}
}

cache[cacheKey] = true
return true
}

func (vt *valueTag) WithProcessors(procs map[string]map[string]any) {}

func (vt *valueTag) buildApplyRules(evs []*formatters.EventMsg) []*tagVal {
toApply := make([]*tagVal, 0)
for _, ev := range evs {
if v, ok := ev.Values[vt.ValueName]; ok {
toApply = append(toApply, &tagVal{tags: ev.Tags, value: v})
if vt.Consume {
delete(ev.Values, vt.ValueName)
}
}
}
return toApply
}
131 changes: 119 additions & 12 deletions pkg/formatters/event_value_tag/event_value_tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package event_value_tag

import (
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -48,18 +49,18 @@ var testset = map[string]struct {
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
Values: map[string]interface{}{"foo": "new_value"},
},
},
output: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value", "foo": "value"},
Tags: map[string]string{"tag": "value", "foo": "new_value"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
Tags: map[string]string{"tag": "value", "foo": "new_value"},
Values: map[string]interface{}{"foo": "new_value"},
},
},
},
Expand Down Expand Up @@ -87,6 +88,32 @@ var testset = map[string]struct {
},
},
},
{
input: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"counter1": "1"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
},
},
output: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value", "foo": "value"},
Values: map[string]interface{}{"counter1": "1"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value", "foo": "value"},
Values: map[string]interface{}{"foo": "value"},
},
},
},
},
},
"rename-tag": {
Expand All @@ -113,18 +140,18 @@ var testset = map[string]struct {
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
Values: map[string]interface{}{"foo": "new_value"},
},
},
output: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value", "bar": "value"},
Tags: map[string]string{"tag": "value", "bar": "new_value"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
Tags: map[string]string{"tag": "value", "bar": "new_value"},
Values: map[string]interface{}{"foo": "new_value"},
},
},
},
Expand Down Expand Up @@ -152,6 +179,32 @@ var testset = map[string]struct {
},
},
},
{
input: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"counter1": "1"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
},
},
output: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value", "bar": "value"},
Values: map[string]interface{}{"counter1": "1"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value", "bar": "value"},
Values: map[string]interface{}{"foo": "value"},
},
},
},
},
},
"consume-value": {
Expand All @@ -178,17 +231,17 @@ var testset = map[string]struct {
{
Timestamp: 2,
Tags: map[string]string{"tag": "value"},
Values: map[string]interface{}{"foo": "value"},
Values: map[string]interface{}{"foo": "new_value"},
},
},
output: []*formatters.EventMsg{
{
Timestamp: 1,
Tags: map[string]string{"tag": "value", "foo": "value"},
Tags: map[string]string{"tag": "value", "foo": "new_value"},
},
{
Timestamp: 2,
Tags: map[string]string{"tag": "value", "foo": "value"},
Tags: map[string]string{"tag": "value", "foo": "new_value"},
Values: make(map[string]interface{}, 0),
},
},
Expand Down Expand Up @@ -238,7 +291,8 @@ func TestEventValueTag(t *testing.T) {
outs := p.Apply(item.input...)
for j := range outs {
if !reflect.DeepEqual(outs[j], item.output[j]) {
t.Errorf("failed at %s item %d, index %d, expected %+v, got: %+v", name, i, j, item.output[j], outs[j])
t.Errorf("failed at %s item %d, index %d, expected %+v", name, i, j, item.output[j])
t.Errorf("failed at %s item %d, index %d, got: %+v", name, i, j, outs[j])
}
}
})
Expand All @@ -248,3 +302,56 @@ func TestEventValueTag(t *testing.T) {
}
}
}

func generateEventMsgs(numEvents, numValues int, targetKey, targetValue string) []*formatters.EventMsg {
evs := make([]*formatters.EventMsg, numEvents)
for i := 0; i < numEvents; i++ {
values := make(map[string]any)
for j := 0; j < numValues; j++ {
values[fmt.Sprintf("key%d", j)] = fmt.Sprintf("value%d", j)
}
values[targetKey] = targetValue
evs[i] = &formatters.EventMsg{
Tags: map[string]string{"tag": "test"},
Values: values,
}
}
return evs
}

func BenchmarkBuildApplyRules(b *testing.B) {
evs := generateEventMsgs(100, 10, "targetKey", "targetValue")
vt := &valueTag{ValueName: "targetKey", Consume: true}

b.ResetTimer()
for i := 0; i < b.N; i++ {
vt.buildApplyRules(evs)
}
}

func BenchmarkBuildApplyRules2(b *testing.B) {
evs := generateEventMsgs(100, 10, "targetKey", "targetValue")
vt := &valueTag{ValueName: "targetKey", Consume: true}

b.ResetTimer()
for i := 0; i < b.N; i++ {
vt.buildApplyRules2(evs)
}
}

// as ref
func (vt *valueTag) buildApplyRules2(evs []*formatters.EventMsg) []*tagVal {
toApply := make([]*tagVal, 0)

for _, ev := range evs {
for k, v := range ev.Values {
if vt.ValueName == k {
toApply = append(toApply, &tagVal{ev.Tags, v})
if vt.Consume {
delete(ev.Values, vt.ValueName)
}
}
}
}
return toApply
}

0 comments on commit 601119f

Please sign in to comment.