Skip to content

Commit

Permalink
Merge pull request #593 from nokia/event-group-by-opt
Browse files Browse the repository at this point in the history
optimize event-group-by processor
  • Loading branch information
karimra authored Jan 26, 2025
2 parents 1626e5a + 937fa34 commit 5293235
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 33 deletions.
135 changes: 107 additions & 28 deletions pkg/formatters/event_group_by/event_group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ package event_group_by

import (
"encoding/json"
"hash/fnv"
"io"
"log"
"os"
"sort"
"slices"
"strings"

"github.com/openconfig/gnmic/pkg/api/types"
Expand Down Expand Up @@ -79,13 +80,14 @@ func (p *groupBy) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg {
groups := make(map[string][]*formatters.EventMsg)
names := make([]string, 0)
for _, e := range es {
if _, ok := groups[e.Name]; !ok {
_, ok := groups[e.Name]
if !ok {
groups[e.Name] = make([]*formatters.EventMsg, 0)
names = append(names, e.Name)
}
groups[e.Name] = append(groups[e.Name], e)
}
sort.Strings(names)
slices.Sort(names)
for _, n := range names {
result = append(result, p.byTags(groups[n])...)
}
Expand All @@ -109,7 +111,7 @@ func (p *groupBy) WithActions(act map[string]map[string]interface{}) {}

func (p *groupBy) WithProcessors(procs map[string]map[string]any) {}

func (p *groupBy) byTags(es []*formatters.EventMsg) []*formatters.EventMsg {
func (p *groupBy) byTagsOld(es []*formatters.EventMsg) []*formatters.EventMsg {
if len(p.Tags) == 0 {
return es
}
Expand All @@ -124,41 +126,118 @@ func (p *groupBy) byTags(es []*formatters.EventMsg) []*formatters.EventMsg {
var key strings.Builder
for _, t := range p.Tags {
if v, ok := e.Tags[t]; ok {
key.WriteString(t)
key.Write(eqByte)
key.WriteString(v)
key.Write(pipeByte)
continue
}
exist = false
break
}
if exist {
skey := key.String()
if _, ok := groups[skey]; !ok {
keys = append(keys, skey)
groups[skey] = &formatters.EventMsg{
Name: e.Name,
Timestamp: e.Timestamp,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
}
for k, v := range e.Tags {
groups[skey].Tags[k] = v
}
for k, v := range e.Values {
groups[skey].Values[k] = v
}
if e.Deletes != nil {
groups[skey].Deletes = make([]string, 0)
groups[skey].Deletes = append(groups[skey].Deletes, e.Deletes...)
}

if !exist {
result = append(result, e)
continue
}
result = append(result, e)

skey := key.String()
group, ok := groups[skey]
if !ok {
keys = append(keys, skey)
group = &formatters.EventMsg{
Name: e.Name,
Timestamp: e.Timestamp,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
groups[skey] = group
}
for k, v := range e.Tags {
group.Tags[k] = v
}
for k, v := range e.Values {
group.Values[k] = v
}
if e.Deletes != nil {
group.Deletes = append(group.Deletes, e.Deletes...)
}
}
sort.Strings(keys)
slices.Sort(keys)
for _, k := range keys {
result = append(result, groups[k])
}
return result
}

// byTags merges the events in es that carry every tag listed in p.Tags into a
// single event per distinct combination of those tag values.
//
// Events that lack at least one of the configured tags (including events whose
// Tags map is nil) are passed through unchanged, in their input order. Nil
// entries in es are skipped. If no tags are configured, es is returned as is.
//
// The returned slice holds the pass-through events first, followed by the
// merged groups in the order in which each group was first encountered, so the
// output is deterministic for a given input.
//
// Each merged event takes its Name and Timestamp from the first event of its
// group; Tags and Values are merged with later events overwriting earlier ones
// on key conflicts, and Deletes are concatenated.
//
// Groups are keyed by the 64-bit hash returned by generateKeyAndCheck; two
// different tag combinations hashing to the same value would be merged.
func (p *groupBy) byTags(es []*formatters.EventMsg) []*formatters.EventMsg {
	if len(p.Tags) == 0 {
		return es
	}

	result := make([]*formatters.EventMsg, 0, len(es))
	groups := make(map[uint64]*formatters.EventMsg)
	// ordered keeps the groups in first-seen order; ranging over the map
	// instead would yield a different output order on every call.
	var ordered []*formatters.EventMsg

	for _, e := range es {
		if e == nil {
			continue
		}

		// Grouping key based on the values of the configured tags. A nil Tags
		// map is safe here: every lookup misses and the event is passed
		// through below instead of being dropped.
		skey, match := generateKeyAndCheck(e.Tags, p.Tags)
		if !match {
			result = append(result, e)
			continue
		}

		group, exists := groups[skey]
		if !exists {
			group = &formatters.EventMsg{
				Name:      e.Name,
				Timestamp: e.Timestamp,
				Tags:      make(map[string]string, len(e.Tags)),
				Values:    make(map[string]interface{}, len(e.Values)),
				Deletes:   make([]string, 0, len(e.Deletes)),
			}
			groups[skey] = group
			ordered = append(ordered, group)
		}

		// Merge tags, values and deletes into the group. Ranging over a nil
		// map and appending a nil slice are both no-ops, so events that only
		// carry deletes (nil Values) are merged rather than lost.
		for k, v := range e.Tags {
			group.Tags[k] = v
		}
		for k, v := range e.Values {
			group.Values[k] = v
		}
		group.Deletes = append(group.Deletes, e.Deletes...)
	}

	return append(result, ordered...)
}

// generateKeyAndCheck computes a 64-bit FNV-1a grouping key from the values
// that tags holds for each name in keys, visiting keys in the given order.
//
// It returns (0, false) as soon as one of the names is missing from tags.
// Otherwise it returns the hash and true.
//
// Every name and value is written to the hash preceded by its byte length.
// Plain "name=value|" concatenation is ambiguous when a value itself contains
// '=' or '|' (e.g. a="x|b=y", b="z" and a="x", b="y|b=z" serialize to the same
// bytes), which would give distinct tag combinations the same key.
func generateKeyAndCheck(tags map[string]string, keys []string) (uint64, bool) {
	h := fnv.New64a()
	var size [8]byte // scratch buffer for the little-endian length prefix

	for _, k := range keys {
		v, ok := tags[k]
		if !ok {
			return 0, false
		}
		for _, field := range [2]string{k, v} {
			n := uint64(len(field))
			for i := range size {
				size[i] = byte(n >> uint(8*i))
			}
			// hash.Hash.Write never returns an error.
			h.Write(size[:])
			h.Write([]byte(field))
		}
	}

	return h.Sum64(), true
}

// Separator bytes used when serializing tag name/value pairs into a grouping key.
var (
// eqByte is written between a tag name and its value.
eqByte = []byte("=")
// pipeByte is written after each tag name/value pair.
pipeByte = []byte("|")
)
100 changes: 95 additions & 5 deletions pkg/formatters/event_group_by/event_group_by_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package event_group_by

import (
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -418,11 +419,9 @@ func TestEventGroupBy(t *testing.T) {
t.Errorf(" got: %v", outs)
return
}
for j := range outs {
if !reflect.DeepEqual(outs[j], item.output[j]) {
t.Errorf("failed at %s item %d, index %d, expected: %+v", name, i, j, item.output[j])
t.Errorf("failed at %s item %d, index %d, got: %+v", name, i, j, outs[j])
}
if !slicesEqual(outs, item.output) {
t.Errorf("failed at %s, expected: %+v", name, item.output)
t.Errorf("failed at %s, got: %+v", name, outs)
}
})
}
Expand All @@ -431,3 +430,94 @@ func TestEventGroupBy(t *testing.T) {
}
}
}

// generateMockEvents returns numEvents synthetic events for benchmarking.
//
// Every event carries the same numTags tags ("tag<j>" -> "value<j>") and the
// same numTags values ("valueKey<j>" -> "value<j>"). Event names cycle through
// "event0".."event4" and the timestamp equals the event's index.
func generateMockEvents(numEvents, numTags int) []*formatters.EventMsg {
	events := make([]*formatters.EventMsg, 0, numEvents)
	for idx := 0; idx < numEvents; idx++ {
		msg := &formatters.EventMsg{
			Name:      fmt.Sprintf("event%d", idx%5), // only five distinct names, so events share names
			Timestamp: int64(idx),
			Tags:      make(map[string]string, numTags),
			Values:    make(map[string]interface{}, numTags),
		}
		for t := 0; t < numTags; t++ {
			val := fmt.Sprintf("value%d", t)
			msg.Tags[fmt.Sprintf("tag%d", t)] = val
			msg.Values[fmt.Sprintf("valueKey%d", t)] = val
		}
		events = append(events, msg)
	}
	return events
}

// BenchmarkByTags runs the previous (byTagsOld) and current (byTags) grouping
// implementations as sub-benchmarks over the same 100,000 mock events, grouped
// on "tag1" and "tag2".
func BenchmarkByTags(b *testing.B) {
	proc := &groupBy{Tags: []string{"tag1", "tag2"}}

	// Shared input, generated once outside the sub-benchmarks.
	events := generateMockEvents(100_000, 5)

	variants := []struct {
		name string
		run  func([]*formatters.EventMsg) []*formatters.EventMsg
	}{
		{name: "OldByTags", run: proc.byTagsOld},
		{name: "NewByTags", run: proc.byTags},
	}

	for _, variant := range variants {
		run := variant.run
		b.Run(variant.name, func(b *testing.B) {
			for n := 0; n < b.N; n++ {
				_ = run(events)
			}
		})
	}
}

// slicesEqual reports whether slice1 and slice2 contain the same events
// regardless of order. Events are compared with eventMsgEqual, and each
// element of slice2 can be paired with at most one element of slice1.
func slicesEqual(slice1, slice2 []*formatters.EventMsg) bool {
	if len(slice1) != len(slice2) {
		return false
	}

	// taken[i] is set once slice2[i] has been paired with an element of slice1.
	taken := make([]bool, len(slice2))

next:
	for _, candidate := range slice1 {
		for idx := range slice2 {
			if taken[idx] || !eventMsgEqual(candidate, slice2[idx]) {
				continue
			}
			taken[idx] = true
			continue next
		}
		// No unpaired element of slice2 equals candidate.
		return false
	}

	return true
}

// eventMsgEqual reports whether a and b describe the same event.
//
// Two nil pointers are equal; a nil and a non-nil pointer are not. Otherwise
// Name, Timestamp, Tags and Values must match. Deletes are compared with
// reflect.DeepEqual, except that a nil and an empty Deletes slice are treated
// as equal.
func eventMsgEqual(a, b *formatters.EventMsg) bool {
	switch {
	case a == nil || b == nil:
		return a == b
	case a.Name != b.Name, a.Timestamp != b.Timestamp:
		return false
	case !reflect.DeepEqual(a.Tags, b.Tags), !reflect.DeepEqual(a.Values, b.Values):
		return false
	case len(a.Deletes) == 0 && len(b.Deletes) == 0:
		// Covers nil vs. empty, which reflect.DeepEqual would report as different.
		return true
	}
	return reflect.DeepEqual(a.Deletes, b.Deletes)
}

0 comments on commit 5293235

Please sign in to comment.