sync.Cond
是 Go 语言对条件锁的实现。它可以让一组 Goroutine 在不满足条件时阻塞等待,满足条件时被唤醒。
内嵌了 Locker
接口(一般用 Mutex
结构作为 Locker
就可以),和一个链表作为等待者队列。
type Cond struct {
noCopy noCopy // 用于保证结构体不会在编译期间拷贝
checker copyChecker // 用于禁止运行期间发生的拷贝
L Locker // 接口,需要实现 Lock 和 Unlock 方法,一般使用 Mutex 作为 Locker 即可
notify notifyList // 等待的协程链表
}
// 这个 wait 和 notify 字段是只增不减的,相当于自增ID。不是很理解为什么有链表了还需要索引
type notifyList struct {
wait uint32 // 正在等待的协程索引
notify uint32 // 已经通知到的协程索引
lock mutex
head *sudog
tail *sudog
}
var status int64
func main() {
m := &sync.Mutex{}
c := sync.NewCond(m) // 传入一个锁作为 Cond 的参数
// 起10个监听协程
for i := 0; i < 10; i++ {
go listen(c)
}
// 1秒后起一个协程广播
// time.Sleep(1 * time.Second)
go broadcast(c)
// 阻塞主协程,方便观察输出
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt) // 监听系统信号
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast() // Broadcast() 方法会唤醒所有等待的协程
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait() // Wait() 方法会释放锁,并将当前协程放入等待者列表中
}
fmt.Println("listen")
c.L.Unlock()
}
这个程序会在1秒后输出10个 "listen"
对外提供三个接口:Wait(), Signal() 与 Broadcast()
Wait()
方法会释放当前协程拿到的锁,并将当前协程放到等待者队列里
func (c *Cond) Wait() {
c.checker.check() // 检查是否发生运行时拷贝
t := runtime_notifyListAdd(&c.notify) // 等待者计数器+1,返回索引
c.L.Unlock() // 释放锁
runtime_notifyListWait(&c.notify, t) // 将当前协程放入等待者队列,并给它记上索引
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1 // 等待者计数器+1,返回索引
}
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg() // 获取当前协程
s.ticket = t // 赋值索引
if l.tail == nil {
l.head = s // 等待者队列为空,则把当前协程设为链表表头
} else {
l.tail.next = s // 否则放入队尾
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) // 休眠当前协程,让出当前处理器的使用权并等待调度器的唤醒
releaseSudog(s)
}
Signal()
与 Broadcast()
就是用来唤醒陷入休眠的 Goroutine 的方法
Signal()
方法会唤醒队列最前面的协程Broadcast()
方法会唤醒队列中全部的协程
// 唤醒队列最前面的协程
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// 唤醒队列全部协程
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// 遍历等待者队列,取第一个协程出来唤醒
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1) // 更新当前遍历到的索引下标
// 遍历链表,找到 notify 字段表示的索引位置上的协程。不是很理解为什么不直接取表头元素
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n // 将协程从链表中剔除
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4) // 唤醒
return
}
}
}
// 遍历等待者队列,全部唤醒
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait)) // 更新 notify 索引进度
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
sync.Cond
不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {}
进行忙碌等待相比,sync.Cond
能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:
sync.Cond.Wait
在调用之前一定要传入互斥锁作为参数,否则会触发程序崩溃;sync.Cond.Signal
唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;sync.Cond.Broadcast
会按照一定顺序广播通知等待的全部 Goroutine;