From 25275913704c7cc5db61539fcfd9b5af7ce70190 Mon Sep 17 00:00:00 2001 From: yahavi Date: Fri, 22 Dec 2023 18:17:56 +0200 Subject: [PATCH 1/2] Allow resetting the finish notification of a runner --- parallel/runner.go | 13 ++++++++++++ parallel/runner_test.go | 46 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/parallel/runner.go b/parallel/runner.go index 5920ef6..ae8ccbb 100644 --- a/parallel/runner.go +++ b/parallel/runner.go @@ -22,6 +22,7 @@ type Runner interface { SetMaxParallel(int) GetFinishedNotification() chan bool SetFinishedNotification(bool) + ResetFinishNotification() } type TaskFunc func(int) error @@ -209,6 +210,18 @@ func (r *runner) SetFinishedNotification(toEnable bool) { r.finishedNotificationEnabled = toEnable } +// Recreates the finish notification channel. +// This method helps manage a scenario involving two runners: "1" assigns tasks to "2". +// Runner "2" might occasionally encounter periods without assigned tasks. +// As a result, the finish notification for "2" might be triggered. +// To tackle this issue, use the ResetFinishNotification after all tasks assigned to "1" have been completed. +func (r *runner) ResetFinishNotification() { + r.finishedNotifierLock.Lock() + defer r.finishedNotifierLock.Unlock() + r.finishedNotifier = make(chan bool, 1) + r.finishedNotifierChannelClosed = false +} + func (r *runner) SetMaxParallel(newVal int) { if newVal < 1 { newVal = 1 diff --git a/parallel/runner_test.go b/parallel/runner_test.go index bafd3c1..f7383ca 100644 --- a/parallel/runner_test.go +++ b/parallel/runner_test.go @@ -181,3 +181,49 @@ func TestMaxParallel(t *testing.T) { runner.Run() assert.Equal(t, uint32(capacity), runner.started) } + +func TestResetFinishNotification(t *testing.T) { + // Create 2 runners + const capacity = 10 + const parallelism = 3 + runnerOne := NewRunner(parallelism, capacity, false) + runnerOne.SetFinishedNotification(true) + runnerTwo := NewRunner(parallelism, capacity, false) + runnerTwo.SetFinishedNotification(true) + + // Add 10 tasks to runner one. Each task provides tasks to runner two. + for i := 0; i < capacity; i++ { + _, err := runnerOne.AddTask(func(int) error { + time.Sleep(time.Millisecond * 10) + _, err := runnerTwo.AddTask(func(int) error { + time.Sleep(time.Millisecond) + return nil + }) + assert.NoError(t, err) + return nil + }) + assert.NoError(t, err) + } + + // Create a goroutine waiting for the finish notification of the first runner before running "Done". + go func() { + <-runnerOne.GetFinishedNotification() + runnerOne.Done() + }() + + // Start running the second runner in a different goroutine to make it non-blocking. + go func() { + runnerTwo.Run() + }() + + // Run the first runner. This is a blocking method. + runnerOne.Run() + + // Reset runner two's finish notification to ensure we receive it only after all tasks assigned to runner two are completed. + runnerTwo.ResetFinishNotification() + + // Receive the finish notification and ensure that we have truly completed the task. + <-runnerTwo.GetFinishedNotification() + assert.Zero(t, runnerTwo.ActiveThreads()) + runnerTwo.Done() +} From e0b1ba98592ff9f781e04aec3dc1f5e755dca382 Mon Sep 17 00:00:00 2001 From: yahavi Date: Sat, 23 Dec 2023 09:02:52 +0200 Subject: [PATCH 2/2] Don't reset the finish notification if there are not active threads --- parallel/runner.go | 12 +++++++++--- parallel/runner_test.go | 6 +++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/parallel/runner.go b/parallel/runner.go index ae8ccbb..d051412 100644 --- a/parallel/runner.go +++ b/parallel/runner.go @@ -22,7 +22,7 @@ type Runner interface { SetMaxParallel(int) GetFinishedNotification() chan bool SetFinishedNotification(bool) - ResetFinishNotification() + ResetFinishNotificationIfActive() } type TaskFunc func(int) error @@ -214,10 +214,16 @@ func (r *runner) SetFinishedNotification(toEnable bool) { // This method helps manage a scenario involving two runners: "1" assigns tasks to "2". // Runner "2" might occasionally encounter periods without assigned tasks. // As a result, the finish notification for "2" might be triggered. -// To tackle this issue, use the ResetFinishNotification after all tasks assigned to "1" have been completed. -func (r *runner) ResetFinishNotification() { +// To tackle this issue, use the ResetFinishNotificationIfActive after all tasks assigned to "1" have been completed. +func (r *runner) ResetFinishNotificationIfActive() { r.finishedNotifierLock.Lock() defer r.finishedNotifierLock.Unlock() + + // If no active threads, don't reset + if r.activeThreads.Load() == 0 && r.totalTasksInQueue.Load() == 0 || r.cancel.Load() { + return + } + r.finishedNotifier = make(chan bool, 1) r.finishedNotifierChannelClosed = false } diff --git a/parallel/runner_test.go b/parallel/runner_test.go index f7383ca..b3e0293 100644 --- a/parallel/runner_test.go +++ b/parallel/runner_test.go @@ -182,7 +182,7 @@ func TestMaxParallel(t *testing.T) { assert.Equal(t, uint32(capacity), runner.started) } -func TestResetFinishNotification(t *testing.T) { +func TestResetFinishNotificationIfActive(t *testing.T) { // Create 2 runners const capacity = 10 const parallelism = 3 @@ -194,7 +194,7 @@ func TestResetFinishNotification(t *testing.T) { // Add 10 tasks to runner one. Each task provides tasks to runner two. for i := 0; i < capacity; i++ { _, err := runnerOne.AddTask(func(int) error { - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) _, err := runnerTwo.AddTask(func(int) error { time.Sleep(time.Millisecond) return nil @@ -220,7 +220,7 @@ func TestResetFinishNotification(t *testing.T) { runnerOne.Run() // Reset runner two's finish notification to ensure we receive it only after all tasks assigned to runner two are completed. - runnerTwo.ResetFinishNotification() + runnerTwo.ResetFinishNotificationIfActive() // Receive the finish notification and ensure that we have truly completed the task. <-runnerTwo.GetFinishedNotification()