Skip to content

Commit

Permalink
Add fix to prevent events from being dropped
Browse files Browse the repository at this point in the history
If we drop events due to the default case then we can end up in a
situation where either the event system indefinitely waits since done
events were not handled or the extension waits indefinitely if the
event was dropped when sending it the event.
  • Loading branch information
ankur22 committed Sep 25, 2024
1 parent 48fdc05 commit 86ab6e3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
14 changes: 6 additions & 8 deletions event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,16 @@ func (s *System) Emit(event *Event) (wait func(context.Context) error) {
doneCh := make(chan struct{}, s.eventBuffer)
doneFn := func() {
origDoneFn()
select {
case doneCh <- struct{}{}:
default:
}
// The done must be read by the reading side to prevent
// a goroutine that waits indefinitely.
doneCh <- struct{}{}
}
event.Done = doneFn

for _, evtCh := range s.subscribers[event.Type] {
select {
case evtCh <- event:
default:
}
// The event channel must read off the channel otherwise we would
// be dropping events.
evtCh <- event
}

s.logger.WithFields(logrus.Fields{
Expand Down
40 changes: 40 additions & 0 deletions event/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,46 @@ func TestEventSystem(t *testing.T) {
wg.Wait()
})

// This ensures that the system still works even when the buffer size of
// the event system is smaller than the numSubs. We had an issue where
// when all the sub were trying to call done it would fail since the buffer
// was full and the event would never fully complete and wait indefinitely.
t.Run("emit_and_wait/buffer", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
logger := logrus.New()
logger.SetOutput(io.Discard)
// Not buffered
es := NewEventSystem(0, logger)

var (
wg sync.WaitGroup
numSubs = 100
)
for i := 0; i < numSubs; i++ {
sid, evtCh := es.Subscribe(Exit)
wg.Add(1)
go func() {
defer wg.Done()
_, err := processEvents(ctx, es, sid, evtCh)
require.NoError(t, err)
}()
}

var done uint32
wait := es.Emit(&Event{Type: Exit, Done: func() {
atomic.AddUint32(&done, 1)
}})
waitCtx, waitCancel := context.WithTimeout(ctx, time.Second)
defer waitCancel()
err := wait(waitCtx)
require.NoError(t, err)
assert.Equal(t, uint32(numSubs), done)

wg.Wait()
})

t.Run("emit_and_wait/error", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down

0 comments on commit 86ab6e3

Please sign in to comment.