sync.WaitGroup
可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
sync.WaitGroup
结构体中只包含两个成员变量,一个是计数器,一个是信号量,甚至连锁都没有
type WaitGroup struct {
noCopy noCopy // 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝
state1 uint64 // 实际为2个 uint32 计数器,调用 Add() 方法时修改第一个计数器,调用 Wait() 方法时增加第二个计数器,表示阻塞的协程数
state2 uint32 // 信号量
}
sync.noCopy
是一个特殊的私有结构体,tools/go/analysis/passes/copylock
包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy
或者实现了 Lock
和 Unlock
方法,如果包含该结构体或者实现了对应的方法就会报出以下错误:copies lock value: sync.WaitGroup
sync.WaitGroup
对外暴露了三个方法:sync.WaitGroup.Add
、sync.WaitGroup.Wait
和 sync.WaitGroup.Done
。
因为其中的 sync.WaitGroup.Done
只是向 sync.WaitGroup.Add
方法传入了 -1,所以我们重点分析另外两个方法,即 sync.WaitGroup.Add
和 sync.WaitGroup.Wait
。
Add()
方法做两件事:
- 增加或扣减计数器(
Add(正数)
时增加,Done()
时扣减) - 扣减计数器后检查计数器,在计数器扣减为
0
时唤醒所有睡眠的协程
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32) // 将传入的 delta 参数加到高32位,即第一个计数器中。由于结构体里没有锁,因此使用 atomic
v := int32(state >> 32) // 取高32位,第一个计数器
w := uint32(state) // 转为uint32取低32位,第二个计数器,表示阻塞的协程数
...
// 计数器1 >0,或者没有协程调用 Wait() 方法时,可直接返回
if v > 0 || w == 0 {
return
}
// 计数器1 ==0 且计数器2 >0,唤醒睡眠的各个协程
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Wait()
方法只做一件事:增加计数器并让协程进入休眠
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 取高32位,第一个计数器
// 计数器1 == 0时,可直接返回
if v == 0 {
return
}
// 否则给计数器2自增并睡眠
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 陷入睡眠
runtime_Semacquire(semap)
return
}
}
}
sync.WaitGroup.Done
只是对sync.WaitGroup.Add
方法的简单封装,我们可以向sync.WaitGroup.Add
方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的协程- 可以同时有多个协程等待当前
sync.WaitGroup
计数器的归零,这些 Goroutine 会被同时唤醒;