From 89f06b3296823907c811cc67b5e753419c9a4a0f Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Tue, 7 Nov 2023 16:00:45 +0200 Subject: [PATCH] change reporting logic --- server/application/application.go | 34 +++++++------ .../application/application_event_sharding.go | 50 +++++++++++++++++++ 2 files changed, 69 insertions(+), 15 deletions(-) create mode 100644 server/application/application_event_sharding.go diff --git a/server/application/application.go b/server/application/application.go index 011b09b9e6035..a0654c0a2ba69 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -1040,6 +1040,9 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing selector labels.Selector err error ) + + channelSelector := NewChannelPerApplicationChannelSelector() + q := application.ApplicationQuery{} if err := yaml.Unmarshal(es.Config, &q); err != nil { logCtx.WithError(err).Error("failed to unmarshal event-source config") @@ -1121,22 +1124,23 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing for { select { case event := <-eventsChannel: - shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) - if !shouldProcess { - continue - } - ts := time.Now().Format("2006-01-02T15:04:05.000Z") - ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute) - err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache) - if err != nil { - logCtx.WithError(err).Error("failed to stream application events") - if strings.Contains(err.Error(), "context deadline exceeded") { - logCtx.Info("Closing event-source connection") - cancel() - return err + channelSelector.Subscribe(event.Application, event.Type, func(payload channelPayload) { + log.Infof("Processing callback for application %s", payload.Application.Name) + shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event) + if !shouldProcess { + return } - } - cancel() + ts := time.Now().Format("2006-01-02T15:04:05.000Z") + ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute) + err := sendIfPermitted(ctx, payload.Application, payload.Type, ts, ignoreResourceCache) + if err != nil { + logCtx.WithError(err).Error("failed to stream application events") + if strings.Contains(err.Error(), "context deadline exceeded") { + logCtx.Info("Closing event-source connection") + cancel() + } + } + }) case <-ticker.C: var err error ts := time.Now().Format("2006-01-02T15:04:05.000Z") diff --git a/server/application/application_event_sharding.go b/server/application/application_event_sharding.go new file mode 100644 index 0000000000000..a9ea0429effb5 --- /dev/null +++ b/server/application/application_event_sharding.go @@ -0,0 +1,50 @@ +package application + +import ( + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/watch" +) + +type channelPayload struct { + Application appv1.Application + Type watch.EventType +} + +type ApplicationEventChannelSelector interface { + Subscribe(application appv1.Application, eventType watch.EventType, callback func(application channelPayload)) +} + +type channelPerApplicationChannelSelector struct { + channels map[string]chan channelPayload +} + +func NewChannelPerApplicationChannelSelector() ApplicationEventChannelSelector { + return &channelPerApplicationChannelSelector{ + channels: map[string]chan channelPayload{}, + } +} + +func (s *channelPerApplicationChannelSelector) Subscribe(application appv1.Application, eventType watch.EventType, callback func(application channelPayload)) { + log.Infof("Subscribing to application %s", application.Name) + if s.channels[application.Name] == nil { + s.channels[application.Name] = make(chan channelPayload, 1000) + go func(channel chan channelPayload) { + for { + select { + case app := <-channel: + log.Infof("Received event for application %s", app.Application.Name) + callback(app) + } + } + }(s.channels[application.Name]) + } + + go func() { + log.Infof("Publish to application %s", application.Name) + s.channels[application.Name] <- channelPayload{ + Application: application, + Type: eventType, + } + }() +}