调度需要解决的两个核心问题:
- 调度时机:什么时候会发生调度?
- 调度策略:使用什么策略来挑选下一个进入运行的
goroutine
- 新建
M
后,启动M
的mstart()
函数里触发schedule()
- 新建
G
并执行后,协程执行完毕退出时的goexit0()
函数里触发schedule()
gopark
:channel
这种结构在等待读写时,会使用gopark()
休眠协程后会触发schedule()
- 阻塞性系统调用,比如文件 IO,网络IO
Golang
重写了所有系统调用,封装了syscall
,在其前后增加了entersyscall
和exitsyscall
,进入系统调用前暂停当前协程、完成系统调用后在exitsyscall()
里恢复其资源、进度,并触发schedule()
- GC 触发抢占:GC 需要 STW 暂停所有
G
,GC 时会发送抢占信号。注册函数收到信号后会执行preemptPark() -> schecule()
- 其他抢占:注册函数收到抢占信号后执行
gopreempt_m() -> goschedImpl() -> schedule()
- 主动调用
runtime.Gosched() -> goschedImpl()
,这个很少见,一般调试才用
该函数会调用 mcall
保存现场到 G.sched
字段,然后修改 G
状态为 _Gwaiting
,并解绑 G
和 M
,使之暂停运行。然后调用 schedule()
调度别的 G
起来运行
// runtime/proc.go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
mcall(park_m)
}
func park_m(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gwaiting)
dropg() // 解绑 M 和 G
schedule() // 调度别的 G 起来运行
}
// runtime/asm_amd64.s
// 该函数会保存当前现场到 g.sched、然后切换到 g0 栈调用函数
TEXT runtime·mcall(SB), NOSPLIT, $0-4
当通过 gopark()
休眠的协程满足条件、可以唤醒后(比如从 channel
中收到了数据),运行时会调用 goready()
唤醒协程
goready
会直接将 G
放回 P.runnext
,使之以最高优先级恢复运行
// runtime/proc.go
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
_g_ := getg()
casgstatus(gp, _Gwaiting, _Grunnable) // 修改 G 的状态
runqput(_g_.m.p.ptr(), gp, next) // 将 G 放回 P 的 runnext 字段
// 有空闲的 P 而且没有正在偷窃 的 M(自旋的M),则需要唤醒或新建一个 M 出来运行这个空闲 P
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
}
func wakep() {
startm(nil, true) // 找一个 M 执行 P 的协程
}
注册函数处理抢占信号 或 我们使用 runtime.Gosched()
主动暂停当前协程时都会调用 goschedImpl
函数。该函数逻辑和 gopark
差不多,只是比它多了一步 globrunqput
,会将当前协程放入全局队列
func goschedImpl(gp *g) {
...
casgstatus(gp, _Grunning, _Grunnable)
dropg() // 设置当前m.curg = nil, gp.m = nil
lock(&sched.lock)
globrunqput(gp) // 把gp放入sched的全局运行队列runq
unlock(&sched.lock)
schedule() // 进入新一轮调度
}
系统调用也会触发运行时调度器的调度,为了处理特殊的系统调用,我们甚至在 Goroutine 中加入了 _Gsyscall
状态,Go 语言通过 syscall.Syscall
和 syscall.RawSyscall
等使用汇编语言编写的方法封装操作系统提供的所有系统调用,其中 syscall.Syscall
的实现如下:
TEXT ·Syscall(SB),NOSPLIT,$0-28
CALL runtime·entersyscall(SB)
...
CALL runtime·exitsyscall(SB)
RET
在 exitsyscall()
里会触发 schedule()
函数进行调度
schedule()
函数是调度逻辑的核心部分,该函数会暂停当前 G
,保存现场、取下个 G
执行。
总体思路是:依次从 P
的本地 G
队列取、全局 G
队列取、其他 P
里偷 G
:
- 调度器每调度
61
次,从全局队列里取一次G
(以避免全局队列里有的G
始终不被取出、饿死) - 调用
runqget
函数从P
本地的运行队列中查找待执行的G
- 调用
findrunnable
函数从其他地方取G
(依次从本地队列、全局队列、netpoll、从其他P
里偷窃取G
)- 先再次尝试从本地队列获取
G
- 去全局队列获取(因为前面仅仅是1/61的概率)
- 执行
netpoll
,检查网络读写事件,判断是否有 IO 就绪的G
- 如果还是没有,那么随机选择一个
P
,偷其runqueue
里的一半到自己的队列里- 这里的随机用到了一种质数算法,保证既随机,每个
P
又都能被访问到 - 偷窃前会将
M
的自旋状态设为true
,偷窃后再改回去 - 如果多次尝试偷
P
都失败了,M
会把P
放回sched
的 空闲P
数组,自身休眠(放回空闲M
列表)
- 这里的随机用到了一种质数算法,保证既随机,每个
- 先再次尝试从本地队列获取
wakep
: 另一种情况是,M
太忙了,如果P
池子里有空闲的P
,会唤醒其他sleep
状态的M
一起干活。如果没有sleep
状态的M
,runtime
会新建一个M
。execute
,执行代码。
// runtime/proc.go
// 找出一个 G 来执行
func schedule() {
top:
// 检查定时器
checkTimers(pp, 0)
// 每调度 61次,从全局队列里取 G。防止全局队列里的 G 一直不被调度而饿死,保证公平性
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock) // 全局队列需要加锁
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从本地队列里取 G,优先使用 P.runnext 字段
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
// 通过 findrunnable 函数从其他可能得地方寻找可执行 G,具体见下
if gp == nil {
gp, inheritTime = findrunnable()
}
// 执行任务函数
execute(gp, inheritTime)
}
依次从 本地队列、全局队列、netpoll、其他 P
获取可运行的 G
。偷窃优先级最低,因为会影响其他 P
执行(需要加锁)。
// runtime/proc.go
// 寻找可执行的 G
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
now, pollUntil, _ := checkTimers(_p_, 0) // 检查 P 中的定时器,参考 Timer 的实现那篇文章
// runqget 从本地队列取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// globrunqget 从全局队列里取
if sched.runqsize != 0 {
lock(&sched.lock) // 全局队列需要加锁
gp := globrunqget(_p_, 0) // 从全局队列队头弹出;另外会分一部分 G 给当前 P
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 检查是否有就绪的 netpoller
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 略
}
// stealWork 从其他 P 偷窃
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
gp, inheritTime, tnow, w, newWork := stealWork(now)
}
...
// 如果实在找不到 G,将 P 放回 sched.pidle 空闲队列,休眠 M
pidleput(_p_)
stopm()
}
从全局队列里取可用的 G
除返回一个可用 G
外,还会批量转移一批 G
到 P
的本地队列(按 P
的数量均分,最多取 P
本地队列的一半)。这样就省的频繁加锁来全局队列取。
// 从全局队列列取 G
// 另外还会分一部分 G 给 P(分的 G 个数为 Min(全局队列长度/P个,或者P本地队列/2))
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
// 分给 P 的 G 个数为(全局队列长度/P个数,P本地队列长度/2,二者最小值)
// 将全局队列按 P 个数等分
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
// 不能超过 runq 数组长度的一半
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n // 调整计数
gp := sched.runq.pop() // 从全局队列头部弹出 G
n--
for ; n > 0; n-- { // 分 n 个 G 到当前 P 的本地队列
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
从其他 P
里偷一半的 G
过来
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr()
const stealTries = 4
for i := 0; i < stealTries; i++ {
// 这里使用了一种质数算法,保证每个 P 都能被随机到
// 随机一个起点,然后选取一个和长度互质的数,遍历时下标一直+这个质数并取模长度,就能不重复地遍历所有的元素
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
p2 := allp[enum.position()]
// 这里也会偷窃 p2 的定时器
tnow, w, ran := checkTimers(p2, now)
// 偷窃 p2 的 G
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}
// 调用 runqgrab
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
}
// 从 P 里偷窃 G
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 先判断要偷的 P 本地队列长度
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
// n 为要偷的长度,偷目标队列的一半
n = n - n/2
// 如果长度为0,看看它的 runnext 字段里有没有 G
if n == 0 {
if stealRunNextG {
if next := _p_.runnext; next != 0 {
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) { // n 大于 P 长度的一半,说明该 P 的本地队列正在急剧增长,重试
continue
}
// 偷一半 G 放到 batch 切片里
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 修改 P 本地队列的头指针
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
schedule()
调度中会通过 excute()
执行函数,函数执行完毕后在退出函数 goexit0()
再次触发调度,这样就完成了一次循环:
schedule() -> execute() -> gogo() -> g2() -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()
schedule()
函数开始之后的一系列函数永远都不会返回,每次执行 mcall
会切换到 g0
栈运行,会切换到 g0.sched.sp
所指的固定位置,即重用这些函数上一轮调度时所使用过的栈内存,因此不会耗尽空间。
- 调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换
- 不需要频繁的进行内存的分配与释放。在用户态维护着一块大的内存池, 不直接调用系统的
malloc
函数(除非内存池需要改变) - 另一方面充分利用了多核的硬件资源,近似的把若干
goroutine
均分在物理线程上 - 再加上本身
goroutine
的超轻量,以上种种保证了go
调度方面的性能