Go 语言的 sync
包中提供了带权重的信号量 semaphore.Weight
,能够控制并发访问的资源数量,比如协程数。先看个例子:
func main() {
urls := []string{"1", "2", "3", "4"}
s := semaphore.NewWeighted(3) // 声明配额为3的信号量
var w sync.WaitGroup
for _, u := range urls {
w.Add(1)
go func(u string) {
s.Acquire(context.Background(), 1) // 占用1个信号量资源
doSomething(u)
s.Release(Weight) // 解除信号量占用
w.Done()
}(u)
}
w.Wait()
fmt.Println("All Done")
}
这样就可以限制最多3个协程在跑了。
这个结构体对外暴露了四个方法:
NewWeighted()
用于创建新的信号量Acquire()
阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待,即P
操作TryAcquire()
非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回false
Release()
用于释放指定权重的资源,即V
操作
其 Acquire()
方法用途类似于 Mutex
结构体的 Lock()
,都是申请资源、加锁,但是可以 Acquire
指定数量的资源;Release()
方法类似于 Unlock()
,都是释放资源、解锁。
我们来看一下信号量 semaphore.Weighted
的数据结构:
type Weighted struct {
size int64 // 最大资源数
cur int64 // 当前已被使用的资源
mu sync.Mutex // 互斥锁,对字段的保护
waiters list.List // 等待的调用者队列,先进先出
}
type waiter struct {
n int64 // 需要占用的配额
ready chan<- struct{} // 用于唤醒调用者
}
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock() // 加锁保护临界区
// 1. 有剩余配额,则扣减配额后直接返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 2. 请求的资源数大于能提供的最大的资源数,这个任务处理不了,报错并返回
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 3. 现存资源不够, 需要把调用者加入到等待队列中
ready := make(chan struct{}) // 创建了一个 chan, 这样可以通过 close(chan) 的方式对其通知
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w) // 把 w 放到队尾
s.mu.Unlock()
// 等待其他协程释放资源后唤醒
select {
...
case <-ready: // 等待者被唤醒了
return nil
}
}
其实就是 Acquire
代码里的逻辑1,去除了等待的逻辑。
Release
方法会将占用资源放回,并调用 notifyWaiters
方法,唤醒等待队列中的调用者
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n // 释放信号量
s.notifyWaiters() // 通知等待者
s.mu.Unlock()
}
notifyWaiters
方法会逐个检查队列里等待的调用者,如果现存资源够等待者请求的数量n,或者是没有等待者了,就返回。
func (s *Weighted) notifyWaiters() {
for { // 遍历所有 waiter
next := s.waiters.Front()
if next == nil {
break // 没有等待者了,直接返回
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 如果现有资源不够队列头调用者请求的资源数,比如配额还剩10,但要占用11,就退出。所有等待者会继续等待
// 这里还是按照先入先出的方式处理是为了避免饥饿
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready) // close(chan),唤醒等待者
}
}
notifyWaiters
方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使队列后面有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。
在 Go
语言中信号量有时候也会被 Channel
类型所取代,因为一个 buffered chan
也可以代表 n 个资源。不过既然 Go
语言通过golang.orgx/sync
扩展库对外提供了 semaphore.Weight
这一种信号量实现,遇到使用信号量的场景时还是尽量使用官方提供的实现。在使用的过程中我们需要注意以下的几个问题:
Acquire
和TryAcquire
方法都可以用于获取资源,前者会阻塞地获取信号量。后者会非阻塞地获取信号量,如果获取不到就返回false
。Release
归还信号量后,会以先进先出的顺序唤醒队列中的等待者。如果现有资源不够队头的调用者请求的资源数,所有等待者会继续等待。- 如果一个
goroutine
申请较多的资源,由于上面说的归还后唤醒等待者的策略,它可能会等待比较长的时间。