From 86ab6e3ceee82af1eaba85536ba45a568b13b48e Mon Sep 17 00:00:00 2001 From: ankur22 Date: Tue, 24 Sep 2024 16:02:41 +0100 Subject: [PATCH] Add fix to prevent events from being dropped If we drop events due to the default case of the select, we can end up in a situation where either the event system waits indefinitely because done events were not handled, or the extension waits indefinitely because the event was dropped while being sent to it. --- event/system.go | 14 ++++++-------- event/system_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/event/system.go b/event/system.go index 416f8ac7742..a12f818dfc1 100644 --- a/event/system.go +++ b/event/system.go @@ -86,18 +86,16 @@ func (s *System) Emit(event *Event) (wait func(context.Context) error) { doneCh := make(chan struct{}, s.eventBuffer) doneFn := func() { origDoneFn() - select { - case doneCh <- struct{}{}: - default: - } + // The done signal must be read by the receiving side to prevent + // a goroutine that waits indefinitely. + doneCh <- struct{}{} } event.Done = doneFn for _, evtCh := range s.subscribers[event.Type] { - select { - case evtCh <- event: - default: - } + // The subscriber must read the event off the channel, otherwise we would + // be dropping events. + evtCh <- event } s.logger.WithFields(logrus.Fields{ diff --git a/event/system_test.go b/event/system_test.go index 33aed5de008..b094b3cb7a0 100644 --- a/event/system_test.go +++ b/event/system_test.go @@ -169,6 +169,46 @@ func TestEventSystem(t *testing.T) { wg.Wait() }) + // This ensures that the system still works even when the buffer size of + // the event system is smaller than numSubs. We had an issue where, when all + // the subs called done, some signals were dropped because the buffer was + // full, so the event never fully completed and the wait blocked indefinitely. 
+ t.Run("emit_and_wait/buffer", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + // Unbuffered: an event buffer size of 0 is smaller than numSubs. + es := NewEventSystem(0, logger) + + var ( + wg sync.WaitGroup + numSubs = 100 + ) + for i := 0; i < numSubs; i++ { + sid, evtCh := es.Subscribe(Exit) + wg.Add(1) + go func() { + defer wg.Done() + _, err := processEvents(ctx, es, sid, evtCh) + require.NoError(t, err) + }() + } + + var done uint32 + wait := es.Emit(&Event{Type: Exit, Done: func() { + atomic.AddUint32(&done, 1) + }}) + waitCtx, waitCancel := context.WithTimeout(ctx, time.Second) + defer waitCancel() + err := wait(waitCtx) + require.NoError(t, err) + assert.Equal(t, uint32(numSubs), done) + + wg.Wait() + }) + + t.Run("emit_and_wait/error", func(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)