diff --git a/event/system.go b/event/system.go index 416f8ac7742..a12f818dfc1 100644 --- a/event/system.go +++ b/event/system.go @@ -86,18 +86,16 @@ func (s *System) Emit(event *Event) (wait func(context.Context) error) { doneCh := make(chan struct{}, s.eventBuffer) doneFn := func() { origDoneFn() - select { - case doneCh <- struct{}{}: - default: - } + // The done must be read by the reading side to prevent + // a goroutine that waits indefinitely. + doneCh <- struct{}{} } event.Done = doneFn for _, evtCh := range s.subscribers[event.Type] { - select { - case evtCh <- event: - default: - } + // The event channel must read off the channel otherwise we would + // be dropping events. + evtCh <- event } s.logger.WithFields(logrus.Fields{ diff --git a/event/system_test.go b/event/system_test.go index 33aed5de008..b094b3cb7a0 100644 --- a/event/system_test.go +++ b/event/system_test.go @@ -169,6 +169,46 @@ func TestEventSystem(t *testing.T) { wg.Wait() }) + // This ensures that the system still works even when the buffer size of + // the event system is smaller than the numSubs. We had an issue where + // when all the sub were trying to call done it would fail since the buffer + // was full and the event would never fully complete and wait indefinitely. + t.Run("emit_and_wait/buffer", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + // Not buffered + es := NewEventSystem(0, logger) + + var ( + wg sync.WaitGroup + numSubs = 100 + ) + for i := 0; i < numSubs; i++ { + sid, evtCh := es.Subscribe(Exit) + wg.Add(1) + go func() { + defer wg.Done() + _, err := processEvents(ctx, es, sid, evtCh) + require.NoError(t, err) + }() + } + + var done uint32 + wait := es.Emit(&Event{Type: Exit, Done: func() { + atomic.AddUint32(&done, 1) + }}) + waitCtx, waitCancel := context.WithTimeout(ctx, time.Second) + defer waitCancel() + err := wait(waitCtx) + require.NoError(t, err) + assert.Equal(t, uint32(numSubs), done) + + wg.Wait() + }) + t.Run("emit_and_wait/error", func(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)