From 5bb1401647141aadd87159ff45783cf90fb2b38f Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Fri, 24 Jan 2025 14:59:51 -0800 Subject: [PATCH] fix event-value-tag processor --- .../event_value_tag/event_value_tag.go | 60 ++++---- .../event_value_tag/event_value_tag_test.go | 131 ++++++++++++++++-- 2 files changed, 155 insertions(+), 36 deletions(-) diff --git a/pkg/formatters/event_value_tag/event_value_tag.go b/pkg/formatters/event_value_tag/event_value_tag.go index e890dab8..e5acab6d 100644 --- a/pkg/formatters/event_value_tag/event_value_tag.go +++ b/pkg/formatters/event_value_tag/event_value_tag.go @@ -59,7 +59,7 @@ func (vt *valueTag) Init(cfg interface{}, opts ...formatters.Option) error { return nil } -type foo struct { +type tagVal struct { tags map[string]string value interface{} } @@ -68,24 +68,14 @@ func (vt *valueTag) Apply(evs ...*formatters.EventMsg) []*formatters.EventMsg { if vt.TagName == "" { vt.TagName = vt.ValueName } - // Look for events with ValueName - toApply := make([]foo, 0) - for _, ev := range evs { - for k, v := range ev.Values { - if vt.ValueName == k { - toApply = append(toApply, foo{ev.Tags, v}) - if vt.Consume { - delete(ev.Values, k) - } - } - } - } - for _, bar := range toApply { + + cache := make(map[string]bool) + vts := vt.buildApplyRules(evs) + for _, tv := range vts { for _, ev := range evs { - if checkKeys(bar.tags, ev.Tags) { - if _, ok := ev.Values[vt.ValueName]; !ok { - ev.Tags[vt.TagName] = fmt.Sprint(bar.value) - } + match := compareTags(tv.tags, ev.Tags, cache) + if match { + ev.Tags[vt.TagName] = fmt.Sprint(tv.value) } } } @@ -104,17 +94,39 @@ func (vt *valueTag) WithTargets(tcs map[string]*types.TargetConfig) {} func (vt *valueTag) WithActions(act map[string]map[string]interface{}) {} -func checkKeys(a map[string]string, b map[string]string) bool { +// returns true if all keys match, false otherwise. +func compareTags(a map[string]string, b map[string]string, cache map[string]bool) bool { + cacheKey := fmt.Sprintf("%v-%v", a, b) + if cachedResult, exists := cache[cacheKey]; exists { + return cachedResult + } + + if len(a) > len(b) { + cache[cacheKey] = false + return false + } for k, v := range a { - if vv, ok := b[k]; ok { - if v != vv { - return false - } - } else { + if vv, ok := b[k]; !ok || v != vv { + cache[cacheKey] = false return false } } + + cache[cacheKey] = true return true } func (vt *valueTag) WithProcessors(procs map[string]map[string]any) {} + +func (vt *valueTag) buildApplyRules(evs []*formatters.EventMsg) []*tagVal { + toApply := make([]*tagVal, 0) + for _, ev := range evs { + if v, ok := ev.Values[vt.ValueName]; ok { + toApply = append(toApply, &tagVal{tags: ev.Tags, value: v}) + if vt.Consume { + delete(ev.Values, vt.ValueName) + } + } + } + return toApply +} diff --git a/pkg/formatters/event_value_tag/event_value_tag_test.go b/pkg/formatters/event_value_tag/event_value_tag_test.go index fc3898fb..84f2d89b 100644 --- a/pkg/formatters/event_value_tag/event_value_tag_test.go +++ b/pkg/formatters/event_value_tag/event_value_tag_test.go @@ -9,6 +9,7 @@ package event_value_tag import ( + "fmt" "reflect" "testing" @@ -48,18 +49,18 @@ var testset = map[string]struct { { Timestamp: 2, Tags: map[string]string{"tag": "value"}, - Values: map[string]interface{}{"foo": "value"}, + Values: map[string]interface{}{"foo": "new_value"}, }, }, output: []*formatters.EventMsg{ { Timestamp: 1, - Tags: map[string]string{"tag": "value", "foo": "value"}, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, }, { Timestamp: 2, - Tags: map[string]string{"tag": "value"}, - Values: map[string]interface{}{"foo": "value"}, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, + Values: map[string]interface{}{"foo": "new_value"}, }, }, }, @@ -87,6 +88,32 @@ var testset = map[string]struct { }, }, }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "foo": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "foo": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + }, }, }, "rename-tag": { @@ -113,18 +140,18 @@ var testset = map[string]struct { { Timestamp: 2, Tags: map[string]string{"tag": "value"}, - Values: map[string]interface{}{"foo": "value"}, + Values: map[string]interface{}{"foo": "new_value"}, }, }, output: []*formatters.EventMsg{ { Timestamp: 1, - Tags: map[string]string{"tag": "value", "bar": "value"}, + Tags: map[string]string{"tag": "value", "bar": "new_value"}, }, { Timestamp: 2, - Tags: map[string]string{"tag": "value"}, - Values: map[string]interface{}{"foo": "value"}, + Tags: map[string]string{"tag": "value", "bar": "new_value"}, + Values: map[string]interface{}{"foo": "new_value"}, }, }, }, @@ -152,6 +179,32 @@ var testset = map[string]struct { }, }, }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "bar": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "bar": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + }, }, }, "consume-value": { @@ -178,17 +231,17 @@ var testset = map[string]struct { { Timestamp: 2, Tags: map[string]string{"tag": "value"}, - Values: map[string]interface{}{"foo": "value"}, + Values: map[string]interface{}{"foo": "new_value"}, }, }, output: []*formatters.EventMsg{ { Timestamp: 1, - Tags: map[string]string{"tag": "value", "foo": "value"}, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, }, { Timestamp: 2, - Tags: map[string]string{"tag": "value", "foo": "value"}, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, Values: make(map[string]interface{}, 0), }, }, @@ -238,7 +291,8 @@ func TestEventValueTag(t *testing.T) { outs := p.Apply(item.input...) for j := range outs { if !reflect.DeepEqual(outs[j], item.output[j]) { - t.Errorf("failed at %s item %d, index %d, expected %+v, got: %+v", name, i, j, item.output[j], outs[j]) + t.Errorf("failed at %s item %d, index %d, expected %+v", name, i, j, item.output[j]) + t.Errorf("failed at %s item %d, index %d, got: %+v", name, i, j, outs[j]) } } }) @@ -248,3 +302,56 @@ func TestEventValueTag(t *testing.T) { } } } + +func generateEventMsgs(numEvents, numValues int, targetKey, targetValue string) []*formatters.EventMsg { + evs := make([]*formatters.EventMsg, numEvents) + for i := 0; i < numEvents; i++ { + values := make(map[string]any) + for j := 0; j < numValues; j++ { + values[fmt.Sprintf("key%d", j)] = fmt.Sprintf("value%d", j) + } + values[targetKey] = targetValue + evs[i] = &formatters.EventMsg{ + Tags: map[string]string{"tag": "test"}, + Values: values, + } + } + return evs +} + +func BenchmarkBuildApplyRules(b *testing.B) { + evs := generateEventMsgs(100, 10, "targetKey", "targetValue") + vt := &valueTag{ValueName: "targetKey", Consume: true} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + vt.buildApplyRules(evs) + } +} + +func BenchmarkBuildApplyRules2(b *testing.B) { + evs := generateEventMsgs(100, 10, "targetKey", "targetValue") + vt := &valueTag{ValueName: "targetKey", Consume: true} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + vt.buildApplyRules2(evs) + } +} + +// as ref +func (vt *valueTag) buildApplyRules2(evs []*formatters.EventMsg) []*tagVal { + toApply := make([]*tagVal, 0) + + for _, ev := range evs { + for k, v := range ev.Values { + if vt.ValueName == k { + toApply = append(toApply, &tagVal{ev.Tags, v}) + if vt.Consume { + delete(ev.Values, vt.ValueName) + } + } + } + } + return toApply +}