diff --git a/parallel/runner.go b/parallel/runner.go index 5920ef6..d051412 100644 --- a/parallel/runner.go +++ b/parallel/runner.go @@ -22,6 +22,7 @@ type Runner interface { SetMaxParallel(int) GetFinishedNotification() chan bool SetFinishedNotification(bool) + ResetFinishNotificationIfActive() } type TaskFunc func(int) error @@ -209,6 +210,24 @@ func (r *runner) SetFinishedNotification(toEnable bool) { r.finishedNotificationEnabled = toEnable } +// Recreates the finish notification channel. +// This method helps manage a scenario involving two runners: "1" assigns tasks to "2". +// Runner "2" might occasionally encounter periods without assigned tasks. +// As a result, the finish notification for "2" might be triggered. +// To tackle this issue, use the ResetFinishNotificationIfActive after all tasks assigned to "1" have been completed. +func (r *runner) ResetFinishNotificationIfActive() { + r.finishedNotifierLock.Lock() + defer r.finishedNotifierLock.Unlock() + + // If no active threads, don't reset + if r.activeThreads.Load() == 0 && r.totalTasksInQueue.Load() == 0 || r.cancel.Load() { + return + } + + r.finishedNotifier = make(chan bool, 1) + r.finishedNotifierChannelClosed = false +} + func (r *runner) SetMaxParallel(newVal int) { if newVal < 1 { newVal = 1 diff --git a/parallel/runner_test.go b/parallel/runner_test.go index bafd3c1..b3e0293 100644 --- a/parallel/runner_test.go +++ b/parallel/runner_test.go @@ -181,3 +181,49 @@ func TestMaxParallel(t *testing.T) { runner.Run() assert.Equal(t, uint32(capacity), runner.started) } + +func TestResetFinishNotificationIfActive(t *testing.T) { + // Create 2 runners + const capacity = 10 + const parallelism = 3 + runnerOne := NewRunner(parallelism, capacity, false) + runnerOne.SetFinishedNotification(true) + runnerTwo := NewRunner(parallelism, capacity, false) + runnerTwo.SetFinishedNotification(true) + + // Add 10 tasks to runner one. Each task provides tasks to runner two. + for i := 0; i < capacity; i++ { + _, err := runnerOne.AddTask(func(int) error { + time.Sleep(time.Millisecond * 100) + _, err := runnerTwo.AddTask(func(int) error { + time.Sleep(time.Millisecond) + return nil + }) + assert.NoError(t, err) + return nil + }) + assert.NoError(t, err) + } + + // Create a goroutine waiting for the finish notification of the first runner before running "Done". + go func() { + <-runnerOne.GetFinishedNotification() + runnerOne.Done() + }() + + // Start running the second runner in a different goroutine to make it non-blocking. + go func() { + runnerTwo.Run() + }() + + // Run the first runner. This is a blocking method. + runnerOne.Run() + + // Reset runner two's finish notification to ensure we receive it only after all tasks assigned to runner two are completed. + runnerTwo.ResetFinishNotificationIfActive() + + // Receive the finish notification and ensure that we have truly completed the task. + <-runnerTwo.GetFinishedNotification() + assert.Zero(t, runnerTwo.ActiveThreads()) + runnerTwo.Done() +}