Skip to content

Commit c8b7177

Browse files
authored
deduplicate subsequent component events (#242)
1 parent cbcf643 commit c8b7177

File tree

3 files changed

+114
-2
lines changed

3 files changed

+114
-2
lines changed

internal/events/recorder.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and component-operator-runtime contributors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package events
7+
8+
import (
9+
"fmt"
10+
"sync"
11+
"time"
12+
13+
"k8s.io/client-go/tools/record"
14+
"sigs.k8s.io/controller-runtime/pkg/client"
15+
)
16+
17+
type DeduplicatingRecorder struct {
18+
recorder record.EventRecorder
19+
mutex sync.Mutex
20+
events map[string]event
21+
}
22+
23+
type event struct {
24+
digest string
25+
timestamp time.Time
26+
}
27+
28+
func NewDeduplicatingRecorder(recorder record.EventRecorder) *DeduplicatingRecorder {
29+
return &DeduplicatingRecorder{
30+
recorder: recorder,
31+
events: make(map[string]event),
32+
}
33+
}
34+
35+
func (r *DeduplicatingRecorder) Event(object client.Object, eventType string, reason string, message string) {
36+
if r.isDuplicate(object, nil, eventType, reason, message) {
37+
return
38+
}
39+
r.recorder.Event(object, eventType, reason, message)
40+
}
41+
42+
func (r *DeduplicatingRecorder) Eventf(object client.Object, eventType string, reason string, messageFmt string, args ...any) {
43+
if r.isDuplicate(object, nil, eventType, reason, fmt.Sprintf(messageFmt, args...)) {
44+
return
45+
}
46+
r.recorder.Eventf(object, eventType, reason, messageFmt, args...)
47+
}
48+
49+
func (r *DeduplicatingRecorder) AnnotatedEventf(object client.Object, annotations map[string]string, eventType string, reason string, messageFmt string, args ...any) {
50+
if r.isDuplicate(object, annotations, eventType, reason, fmt.Sprintf(messageFmt, args...)) {
51+
return
52+
}
53+
r.recorder.AnnotatedEventf(object, annotations, eventType, reason, messageFmt, args...)
54+
}
55+
56+
func (r *DeduplicatingRecorder) isDuplicate(object client.Object, annotations map[string]string, eventType, reason, message string) bool {
57+
uid := string(object.GetUID())
58+
digest := calculateDigest(annotations, eventType, reason, message)
59+
now := time.Now()
60+
exp := time.Now().Add(-5 * time.Minute)
61+
62+
r.mutex.Lock()
63+
defer r.mutex.Unlock()
64+
for uid, event := range r.events {
65+
if event.timestamp.Before(exp) {
66+
delete(r.events, uid)
67+
}
68+
}
69+
if r.events[uid].digest == digest {
70+
return true
71+
} else {
72+
r.events[uid] = event{
73+
digest: digest,
74+
timestamp: now,
75+
}
76+
return false
77+
}
78+
}

internal/events/util.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and component-operator-runtime contributors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package events
7+
8+
import (
9+
"crypto/sha256"
10+
"encoding/hex"
11+
"encoding/json"
12+
)
13+
14+
// TODO: consolidate all the util files into an internal reuse package
15+
16+
func must[T any](x T, err error) T {
17+
if err != nil {
18+
panic(err)
19+
}
20+
return x
21+
}
22+
23+
func sha256hex(data []byte) string {
24+
sum := sha256.Sum256(data)
25+
return hex.EncodeToString(sum[:])
26+
}
27+
28+
func calculateDigest(values ...any) string {
29+
// note: this must() is ok because the input values are expected to be JSON values
30+
return sha256hex(must(json.Marshal(values)))
31+
}

pkg/component/reconciler.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141

4242
"github.com/sap/component-operator-runtime/internal/backoff"
4343
"github.com/sap/component-operator-runtime/internal/clientfactory"
44+
"github.com/sap/component-operator-runtime/internal/events"
4445
"github.com/sap/component-operator-runtime/internal/metrics"
4546
"github.com/sap/component-operator-runtime/pkg/cluster"
4647
"github.com/sap/component-operator-runtime/pkg/manifests"
@@ -134,6 +135,7 @@ type Reconciler[T Component] struct {
134135
groupVersionKind schema.GroupVersionKind
135136
controllerName string
136137
client cluster.Client
138+
eventRecorder events.DeduplicatingRecorder
137139
resourceGenerator manifests.Generator
138140
statusAnalyzer status.StatusAnalyzer
139141
options ReconcilerOptions
@@ -328,9 +330,9 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
328330
// such as the flux notfication recorder; should we therefore send the events asynchronously, or start synchronously and continue asynchronous
329331
// after a little while?
330332
if state == StateError {
331-
r.client.EventRecorder().AnnotatedEventf(component, eventAnnotations, corev1.EventTypeWarning, reason, message)
333+
r.eventRecorder.AnnotatedEventf(component, eventAnnotations, corev1.EventTypeWarning, reason, "%s", message)
332334
} else {
333-
r.client.EventRecorder().AnnotatedEventf(component, eventAnnotations, corev1.EventTypeNormal, reason, message)
335+
r.eventRecorder.AnnotatedEventf(component, eventAnnotations, corev1.EventTypeNormal, reason, "%s", message)
334336
}
335337

336338
if skipStatusUpdate {
@@ -617,6 +619,7 @@ func (r *Reconciler[T]) SetupWithManagerAndBuilder(mgr ctrl.Manager, blder *ctrl
617619
}
618620
r.client = clnt
619621
}
622+
r.eventRecorder = *events.NewDeduplicatingRecorder(r.client.EventRecorder())
620623

621624
component := newComponent[T]()
622625
r.groupVersionKind, err = apiutil.GVKForObject(component, r.client.Scheme())

0 commit comments

Comments
 (0)