Skip to content

Commit

Permalink
add a flexible wait group
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 20, 2023
1 parent 9a442d1 commit 644f91c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 9 deletions.
15 changes: 6 additions & 9 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

Expand All @@ -40,10 +41,7 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues
// Controller is used to manage all schedulers.
type Controller struct {
sync.RWMutex
// we cannot use waitGroup here because addScheduler will be called in any time
// but `new Add calls must happen after all previous Wait calls have returned`
// so we need to use a counter to avoid this
count atomic.Int32
wg *syncutil.FlexibleWaitGroup
ctx context.Context
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
Expand All @@ -60,6 +58,7 @@ type Controller struct {
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
return &Controller{
ctx: ctx,
wg: syncutil.NewFlexibleWaitGroup(),
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
Expand All @@ -70,9 +69,7 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e

// Wait waits on all schedulers to exit.
func (c *Controller) Wait() {
for c.count.Load() > 0 {
time.Sleep(10 * time.Millisecond)
}
c.wg.Wait()
}

// GetScheduler returns a schedule controller by name.
Expand Down Expand Up @@ -189,7 +186,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
return err
}

c.count.Add(1)
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.Scheduler.GetName()] = s
c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args)
Expand Down Expand Up @@ -325,7 +322,7 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) {

func (c *Controller) runScheduler(s *ScheduleController) {
defer logutil.LogPanic()
defer c.count.Add(-1)
defer c.wg.Done()
defer s.Scheduler.Cleanup(c.cluster)

ticker := time.NewTicker(s.GetInterval())
Expand Down
55 changes: 55 additions & 0 deletions pkg/utils/syncutil/flexible_wait_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncutil

import (
"sync"
)

// FlexibleWaitGroup is a flexible wait group.
// Note: we can't use sync.WaitGroup because it doesn't support to add before wait.
type FlexibleWaitGroup struct {
sync.Mutex
count int
done chan struct{}
}

// NewFlexibleWaitGroup creates a FlexibleWaitGroup.
func NewFlexibleWaitGroup() *FlexibleWaitGroup {
return &FlexibleWaitGroup{
done: make(chan struct{}),
}
}

// Add adds delta to the FlexibleWaitGroup counter.
func (fwg *FlexibleWaitGroup) Add(delta int) {
fwg.Lock()
defer fwg.Unlock()

fwg.count += delta
if fwg.count <= 0 {
close(fwg.done)
}
}

// Done decrements the FlexibleWaitGroup counter.
func (fwg *FlexibleWaitGroup) Done() {
fwg.Add(-1)
}

// Wait blocks until the FlexibleWaitGroup counter is zero.
func (fwg *FlexibleWaitGroup) Wait() {
<-fwg.done
}
65 changes: 65 additions & 0 deletions pkg/utils/syncutil/flexible_wait_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncutil

import (
"testing"
"time"
)

func TestFlexibleWaitGroup(t *testing.T) {
fwg := NewFlexibleWaitGroup()
go func() {
for i := 0; i < 20; i++ {
fwg.Add(1)
go func(i int) {
defer fwg.Done()
time.Sleep(time.Millisecond * time.Duration(i*50))
}(i)
}
}()
fwg.Wait()
}

func TestAddAfterWait(t *testing.T) {
fwg := NewFlexibleWaitGroup()
startWait := make(chan struct{})
addTwice := make(chan struct{})
done := make(chan struct{})

// First goroutine: Adds a task, then waits for the second task to be added before finishing.
go func() {
defer fwg.Done()
fwg.Add(1)
<-addTwice
}()

// Second goroutine: adds a second task after ensure the third goroutine has started to wait
// and triggers the first goroutine to finish.
go func() {
defer fwg.Done()
<-startWait
fwg.Add(1)
addTwice <- struct{}{}
}()

// Third goroutine: waits for all tasks to be added, then finishes.
go func() {
startWait <- struct{}{}
fwg.Wait()
done <- struct{}{}
}()
<-done
}

0 comments on commit 644f91c

Please sign in to comment.