Skip to content

Commit

Permalink
change reporting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pasha-codefresh committed Nov 7, 2023
1 parent 12dd9b2 commit 89f06b3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 15 deletions.
34 changes: 19 additions & 15 deletions server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,9 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
selector labels.Selector
err error
)

channelSelector := NewChannelPerApplicationChannelSelector()

q := application.ApplicationQuery{}
if err := yaml.Unmarshal(es.Config, &q); err != nil {
logCtx.WithError(err).Error("failed to unmarshal event-source config")
Expand Down Expand Up @@ -1121,22 +1124,23 @@ func (s *Server) StartEventSource(es *events.EventSource, stream events.Eventing
for {
select {
case event := <-eventsChannel:
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
continue
}
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, event.Application, event.Type, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
logCtx.Info("Closing event-source connection")
cancel()
return err
channelSelector.Subscribe(event.Application, event.Type, func(payload channelPayload) {
log.Infof("Processing callback for application %s", payload.Application.Name)
shouldProcess, ignoreResourceCache := s.applicationEventReporter.shouldSendApplicationEvent(event)
if !shouldProcess {
return
}
}
cancel()
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
err := sendIfPermitted(ctx, payload.Application, payload.Type, ts, ignoreResourceCache)
if err != nil {
logCtx.WithError(err).Error("failed to stream application events")
if strings.Contains(err.Error(), "context deadline exceeded") {
logCtx.Info("Closing event-source connection")
cancel()
}
}
})
case <-ticker.C:
var err error
ts := time.Now().Format("2006-01-02T15:04:05.000Z")
Expand Down
50 changes: 50 additions & 0 deletions server/application/application_event_sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package application

import (
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"
)

type channelPayload struct {
Application appv1.Application
Type watch.EventType
}

type ApplicationEventChannelSelector interface {
Subscribe(application appv1.Application, eventType watch.EventType, callback func(application channelPayload))
}

type channelPerApplicationChannelSelector struct {
channels map[string]chan channelPayload
}

func NewChannelPerApplicationChannelSelector() ApplicationEventChannelSelector {
return &channelPerApplicationChannelSelector{
channels: map[string]chan channelPayload{},
}
}

func (s *channelPerApplicationChannelSelector) Subscribe(application appv1.Application, eventType watch.EventType, callback func(application channelPayload)) {
log.Infof("Subscribing to application %s", application.Name)
if s.channels[application.Name] == nil {
s.channels[application.Name] = make(chan channelPayload, 1000)
go func(channel chan channelPayload) {
for {
select {
case app := <-channel:
log.Infof("Received event for application %s", app.Application.Name)
callback(app)
}
}
}(s.channels[application.Name])
}

go func() {
log.Infof("Publish to application %s", application.Name)
s.channels[application.Name] <- channelPayload{
Application: application,
Type: eventType,
}
}()
}

0 comments on commit 89f06b3

Please sign in to comment.