22 Go 并发模型和常见并发模式
不要通过共享内存来通信,而应该通过通信来共享内存 - Rob Pike,Go 语言之父
在前面的内容中,我们说过:传统的编程语言(比如:C++、Java、Python 等)并非面向并发而生,因此他们面对并发的逻辑多是基于操作系统的线程。并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如:共享内存、信号(signal)、管道(pipe)、消息队列、套接字(socket)等。在这些通信原语中,使用最多最广泛(也是最高效的)是结合了线程同步原语(比如:锁以及更为低级的原子操作)的共享内存方式,因此,我们可以说传统语言的并发模型是基于对内存的共享的。
图6-2-1:基于线程同步原语和共享内存的并发模型
不幸的是,这种传统的基于共享内存的并发模型是难用和易错的,尤其是在大型或复杂程序中。开发人员在设计并发程序时需要根据线程模型对程序进行建模,同时规划线程之间的通信方式。如果选择的是高效的基于共享内存的机制,那么他们还要花费大量心思设计线程间的同步机制,并且在设计同步机制的时候,还要考虑多线程间复杂的内存管理以及如何防止死锁等。开发人员承受着巨大的心智负担,并且基于此类传统并发模型的程序难于编写、阅读、理解和维护。一旦程序发生问题,查找 Bug 的过程更是漫长和艰辛。
Go 语言从设计伊始就将解决上述传统并发模型的问题作为 Go 的一个目标,并在新并发模型设计中借鉴了著名计算机科学家Tony Hoare提出的 CSP(Communicationing Sequential Processes,通信顺序进程) 并发模型。
Tony Hoare 的 CSP 模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。Tony Hoare 认为输入输出应该是基本的编程原语,数据处理逻辑(即 CSP 中的 P)仅需调用输入原语获取数据,顺序地处理数据,并将结果数据通过输出原语输出即可。因此,在 Tony Hoare 眼中,一个符合 CSP 模型的并发程序应该是一组通过输入输出原语连接起来的 P 的集合。从这个角度来看,CSP 理论不仅是一个并发参考模型,也是一种并发程序的程序组织方法。其组合思想与 Go 的设计哲学不谋而合。Tony Hoare 的 CSP 理论中的 P,即“Process(进程)”,是一个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他 P 的输出获取),并生产出可以被其他 P 消费的输出数据。
图6-2-2:CSP并发模型
P 并不一定与操作系统的进程或线程划等号。在 Go 中,与“Process”对应的是 goroutine,但 Go 语言中 goroutine 的执行逻辑并不一定是顺序的,goroutine 也可以创建其他 goroutine 以并发地完成工作。
为了实现 CSP 并发模型中的输入和输出原语,Go 引入了 goroutine之间的通信原语channel
。goroutine 可以从 channel 获取输入数据,再将处理后得到的结果数据通过 channel 输出。通过 channel 将 goroutine§组合连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰,我们无需再为那些传统共享内存并发模型中的问题而伤脑筋了。
虽然 CSP 模型已经成为 Go 语言支持的主流并发模型,但 Go 也支持传统的基于共享内存的并发模型,并提供了基本的低级别同步原语(主要是 sync 包中的互斥锁、条件变量、读写锁、原子操作等)。那么我们在实践中应该选择哪个模型的并发原语呢?是使用 channel 还是在低级同步原语保护下的共享内存呢?毫无疑问,从程序的整体结构来看,就像本节开头引述 Rob Pike 的那句话一样,Go 始终推荐以 CSP 并发模型风格构建并发程序,尤其是在复杂的业务层面,这将提升程序的逻辑清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,可以使用更为高效的低级同步原语(如 mutex)保证 goroutine 对数据的同步访问。
我们知道:在语言层面,Go 针对 CSP 并发模型提供了三种并发原语:
- goroutine:对应 CSP 模型中的P,封装了数据的处理逻辑,是 Go 运行时调度的基本执行单元;
- channel:对应 CSP 模型中的输入/输出原语,用于 goroutine 之间的通信和同步;
- select:用于应对多路输入/输出,可以让 goroutine 同时协调处理多个 channel 操作。
接下来,我们就来深入了解一下实践中这些原语的常见组合方式,即并发模式。
Go 语言通过go关键字+函数/方法
的方式创建一个 goroutine:
go fmt.Println("I am a goroutine")
// $GOROOT/src/net/http/server.go
c := srv.newConn(rw)
go c.serve(connCtx)
但在稍复杂一些的并发程序中,我们需要考虑通过 CSP 模型输入/输出原语的承载体channel在 goroutine 之间建立联系。为了满足这一需求,我们通常使用下面的方式来创建一个 goroutine:
type T struct {...}
func spawn(f func()) chan T {
c := make(chan T)
go func() {
// 使用channel变量c(通过闭包方式)与调用spawn的goroutine通信
... ...
f()
... ...
}()
return c
}
func main() {
c := spawn(func(){})
// 使用channel变量c与新创建的goroutine通信
}
这个在内部创建一个 goroutine 并返回一个 channel 类型变量的函数就是 Go 中最常见的 goroutine 创建模式。spawn 函数创建的新 goroutine 与调用 spawn 函数的 goroutine 之间通过一个 channel 建立起了联系:两个 goroutine 可以通过这个 channel 进行通信。spawn 函数的实现也益于 channel 作为 Go 语言**一等公民(first-class citizen)**的存在:channel 可以像变量一样被初始化、传递和赋值。上面例子中的 spawn 只返回了一个 channel 变量,大家可以根据需要自行定义返回的 channel 个数和用途。
goroutine 的使用代价很低,Go 官方也推荐大家多多使用 goroutine。多数情况下,我们无需考虑对 goroutine 的退出进行控制:goroutine 的执行函数返回,即意味着 goroutine 退出。但一些常驻的后台服务程序可能会对 goroutine 有着优雅退出的要求,在这里我们就分类说明一下 goroutine 的几种退出模式。
这里借鉴了一些线程模型中的术语,比如分离(detached)模式。分离模式是使用最为广泛的 goroutine 退出方式。所谓分离的 goroutine,即创建它的 goroutine 不需要关心它的退出,这类 goroutine 启动后与其创建者彻底分离(detached),其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。通常,这类 goroutine 有两个常见用途:
- 一次性任务:顾名思义,新创建的 goroutine 用来执行一个简单的任务,执行后即退出。比如下面标准库中的代码:
// $GOROOT/src/net/dial.go
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
... ...
if oldCancel := d.Cancel; oldCancel != nil {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
}
}()
ctx = subCtx
}
... ...
}
我们看到在 DialContext 方法中创建了一个 goroutine,用来监听两个 channel 是否有数据,一旦有数据,处理后即退出。
- 常驻后台执行一些特定任务,如:监视(monitor)、观察(watcher)等。其实现通常采用
for {...}
或for { select{... } }
代码段形式,并多以定时器(timer)或事件(event)驱动执行。
Go 为每个 P 内置的 GC goroutine 就是这种类型的:
// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {
// Background marking is performed by per-P G's. Ensure that
// each P has a background GC G.
for _, p := range allp {
if p.gcBgMarkWorker == 0 {
go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorker
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
}
}
}
func gcBgMarkWorker(_p_ *p) {
gp := getg()
... ...
for { // 常驻后台处理GC事宜
... ...
}
}
在线程模型中,父线程可以通过 pthread_join 来等待子线程结束并获取子线程的结束状态。在 Go 中,我们有时候也有类似的需求:goroutine 的创建者需要等待新 goroutine 的结束。笔者为这样的 goroutine 退出模式起名为**“join 模式”**。
- 等待一个 goroutine 退出
我们从一个简单的场景开始,先来看看如何等待一个 goroutine 结束。下面是模拟该场景的一段示例代码:
// go-concurrency-pattern-1.go
package main
import "time"
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * (time.Duration(interval)))
}
func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
go func() {
f(args...)
c <- struct{}{}
}()
return c
}
func main() {
done := spawn(worker, 5)
println("spawn a worker goroutine")
<-done
println("worker done")
}
在上面代码中,spawn 函数使用典型的 goroutine 的创建模式创建了一个 goroutine,main goroutine 作为创建者通过 spawn 函数返回的 channel 与新 goroutine 建立联系,这个 channel 的用途就是在两个 goroutine 之间建立退出事件的“信号”通信机制。main goroutine 在创建完新 goroutine 后便在该 channel 上阻塞等待,直到新 goroutine 退出前向该 channel 发送了一个“信号”。
运行该示例:
$ go run go-concurrency-pattern-1.go
spawn a worker goroutine
worker done
- 获取 goroutine 的退出状态
如果新 goroutine 的创建者不仅仅要等待 goroutine 的退出,还要精准获取其结束状态,我们可以同样可以通过自定义类型的 channel 来实现这一场景需求。下面是基于上面代码改造后的示例:
// go-concurrency-pattern-2.go
package main
import (
"errors"
"fmt"
"time"
)
var OK = errors.New("ok")
func worker(args ...interface{}) error {
if len(args) == 0 {
return errors.New("invalid args")
}
interval, ok := args[0].(int)
if !ok {
return errors.New("invalid interval arg")
}
time.Sleep(time.Second * (time.Duration(interval)))
return OK
}
func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
c := make(chan error)
go func() {
c <- f(args...)
}()
return c
}
func main() {
done := spawn(worker, 5)
println("spawn worker1")
err := <-done
fmt.Println("worker1 done:", err)
done = spawn(worker)
println("spawn worker2")
err = <-done
fmt.Println("worker2 done:", err)
}
我们将 channel 中承载的类型由struct{}
改为了error
,这样 channel 承载的信息就不仅仅是一个“信号”了,还携带了“有价值”的信息:新 goroutine 的结束状态。运行上述示例:
$go run go-concurrency-pattern-2.go
spawn worker1
worker1 done: ok
spawn worker2
worker2 done: invalid args
- 等待多个 goroutine 退出
有些场景中,goroutine 的创建者可能会创建不止一个 goroutine,并且需要等待全部新 goroutine 退出。我们可以通过 Go 语言提供的sync.WaitGroup
实现等待多个 goroutine 退出的模式:
// go-concurrency-pattern-3.go
package main
import (
"fmt"
"sync"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * (time.Duration(interval)))
}
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
name := fmt.Sprintf("worker-%d:", i)
f(args...)
println(name, "done")
wg.Done() // worker done!
}(i)
}
go func() {
wg.Wait()
c <- struct{}{}
}()
return c
}
func main() {
done := spawnGroup(5, worker, 3)
println("spawn a group of workers")
<-done
println("group workers done")
}
我们看到通过sync.WaitGroup
,spawnGroup
每创建一个 goroutine,都会调用wg.Add(1)
,新创建的 goroutine 会在退出前调用wg.Done
。我们在spawnGroup
中还创建了一个用于监视的 goroutine,该 goroutine 调用sync.WaitGroup
的Wait
方法来等待所有 goroutine 退出。当所有新创建的 goroutine 退出后,Wait
方法返回,该监视 goroutine 会向done
这个 channel 写入一个“信号”,这时 main goroutine 才会从阻塞在done
channel 上的状态中恢复,继续往下执行。
运行上述示例代码:
$go run go-concurrency-pattern-3.go
spawn a group of workers
worker-2: done
worker-1: done
worker-0: done
worker-4: done
worker-3: done
group workers done
- 支持超时机制的等待
有时候,我们不想无限阻塞等待所有新创建 goroutine 的退出,而是仅等待一个合理的时间。如果在这个时间范围内 goroutine 没有退出,则创建者会继续向下执行或主动退出。下面的示例代码是在等待多个 goroutine 退出的例子之上增加了超时机制:
// go-concurrency-pattern-4.go
... ...
func main() {
done := spawnGroup(5, worker, 30)
println("spawn a group of workers")
timer := time.NewTimer(time.Second * 5)
defer timer.Stop()
select {
case <-timer.C: // wait 5 seconds for other goroutines
println("wait group workers exit timeout!")
case <-done:
println("group workers done")
}
}
在上述代码中,我们通过一个定时器(time.Timer)设置了超时等待时间,并通过select
原语同时 timer 和done
channel,哪个先返回数据就执行哪个 case 分支。
运行上述示例代码:
$ go run go-concurrency-pattern-4.go
spawn a group of workers
wait group workers exit timeout!
前面的几个场景中,goroutine 的创建者都是在被动地等待着新 goroutine 的退出。但很多时候,goroutine 创建者需要主动通知那些新 goroutine 退出,尤其是当 main goroutine 作为创建者时。main goroutine 退出意味着 Go 程序的终止,而粗暴地直接让 main goroutine 退出的方式可能会导致业务数据的损坏、不完整或丢失。我们可以通过“notify-and-wait(通知并等待)”模式来满足这一场景的要求。虽然这一模式也不能完全避免“损失”,但是它给了各个 goroutine 一个“挽救数据”的机会,可以尽可能地减少损失的程度。
- 通知并等待一个 goroutine 退出
我们先从一个简单的“通知并等待一个 goroutine 退出”场景入手,下面是满足该场景要求的示例代码:
// go-concurrency-pattern-5.go
package main
import "time"
func worker(j int) {
time.Sleep(time.Second * (time.Duration(j)))
}
func spawn(f func(int)) chan string {
quit := make(chan string)
go func() {
var job chan int // 模拟job channel
for {
select {
case j := <-job:
f(j)
case <-quit:
quit <- "ok"
}
}
}()
return quit
}
func main() {
quit := spawn(worker)
println("spawn a worker goroutine")
time.Sleep(5 * time.Second) // give every goroutine 5 seconds to execute task
// 通知新创建的goroutine退出
println("notify the worker to exit...")
quit <- "exit"
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-quit:
println("worker done:", status)
case <-timer.C:
println("wait worker exit timeout")
}
}
在上述示例代码中,使用创建模式创建 goroutine 的spawn
函数返回的 channel 的作用发生了变化,从原先的只是用于新 goroutine 发送退出“信号”给创建者,变成了一个双向的数据通道:既承载创建者发送给新 goroutine 的“退出信号”,也承载新 goroutine 返回给创建者的“退出状态”。
运行上述示例代码:
$go run go-concurrency-pattern-5.go
spawn a worker goroutine
notify the worker to exit...
worker done: ok
- 通知并等待多个 goroutine 退出
下面是“通知并等待多个 goroutine 退出”的场景。Go 语言的 channel 有一个特性,那就是当使用 close 函数关于 channel 时,所有阻塞到该 channel 上的 goroutine 都会得到“通知”,我们就利用这一特性实现满足这一场景的模式:
// go-concurrency-pattern-6.go
package main
import (
"fmt"
"sync"
"time"
)
func worker(j int) {
time.Sleep(time.Second * (time.Duration(j)))
}
func spawnGroup(n int, f func(int)) chan struct{} {
quit := make(chan struct{})
job := make(chan int)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done() // 保证wg.Done在goroutine退出前被执行
name := fmt.Sprintf("worker-%d:", i)
for {
j, ok := <-job
if !ok {
println(name, "done")
return
}
// do the job
worker(j)
}
}(i)
}
go func() {
<-quit
close(job) // 广播给所有新goroutine
wg.Wait()
quit <- struct{}{}
}()
return quit
}
func main() {
quit := spawnGroup(5, worker)
println("spawn a group of workers")
time.Sleep(5 * time.Second)
// notify the worker goroutine group to exit
println("notify the worker group to exit...")
quit <- struct{}{}
timer := time.NewTimer(time.Second * 5)
defer timer.Stop()
select {
case <-timer.C:
println("wait group workers exit timeout!")
case <-quit:
println("group workers done")
}
}
上面这个示例代码的关键就是创建者直接利用了 worker goroutine 接收任务(job)的 channel 来“广播”退出通知,而实现这一“广播”的代码就是close(job)
。此时各个 worker goroutine 监听 job channel,当创建者关闭 job channel 时,通过“comma ok”模式获取的 ok 值为false,也就是表明该 channel 已经被关闭,于是 worker goroutine 执行退出逻辑(退出前wg.Done
被执行)。
运行上述示例代码:
$go run go-concurrency-pattern-6.go
spawn a group of workers
notify the worker group to exit...
worker-3: done
worker-0: done
worker-4: done
worker-2: done
worker-1: done
group workers done
很多时候,我们在程序中要启动多个 goroutine 协作完成应用的业务逻辑,比如:
func main() {
go producer.Start()
go consumer.Start()
go watcher.Start()
... ...
}
但这些 goroutine 的运行形态很可能不同:有些扮演服务端,有些可能是客户端等,因此似乎很难用一种统一的框架全面管理他们的启动、运行和退出。我们尝试将问题范围缩小,聚焦在实现一个“超时等待退出”框架以统一解决各种运行形态 goroutine 的优雅退出问题。
我们来定义一个接口:
// go-concurrency-pattern-7.go
type GracefullyShutdowner interface {
Shutdown(waitTimeout time.Duration) error
}
这样,凡是实现了该接口的类型均可在程序退出时得到退出的通知和调用,从而有机会做退出前的最后清理工作。这里还提供了一个类似http.HandlerFunc
的类型ShutdownerFunc
,用于将普通函数转化为实现了 GracefullyShutdowner 接口的类型实例(得益于函数在 Go 中为“一等公民”的特质):
// go-concurrency-pattern-7.go
type ShutdownerFunc func(time.Duration) error
func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
return f(waitTimeout)
}
一组 goroutine 的退出总体上有两种情况。一种是并发退出,在这类退出方式下,各个 goroutine 的退出先后次序对数据处理无影响,因此各个 goroutine 可以并发执行退出逻辑;另外一种则是串行退出,即各个 goroutine 之间的退出是按照一定次序逐个进行的。次序若错了可能会导致程序的状态混乱和错误。
我们先来说并发退出:
// go-concurrency-pattern-7.go
func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
c := make(chan struct{})
go func() {
var wg sync.WaitGroup
for _, g := range shutdowners {
wg.Add(1)
go func(shutdowner GracefullyShutdowner) {
defer wg.Done()
shutdowner.Shutdown(waitTimeout)
}(g)
}
wg.Wait()
c <- struct{}{}
}()
timer := time.NewTimer(waitTimeout)
defer timer.Stop()
select {
case <-c:
return nil
case <-timer.C:
return errors.New("wait timeout")
}
}
如上述代码所示,我们将各个 GracefullyShutdowner 接口的实现以一个变长参数的形式传入 ConcurrentShutdown 函数。ConcurrentShutdown 函数实现也很简单(类似上面的超时等待多个 goroutine 退出的模式):
- 为每个传入的
GracefullyShutdowner
接口实现的实例启动一个 goroutine 来执行退出逻辑,并将 timeout 参数传入每个实例的Shutdown
方法中; - 通过
sync.WaitGroup
在外层等待每个 goroutine 的退出; - 通过 select 监听一个退出通知 channel 和一个 timer channel 以决定到底是正常退出还是超时退出。
下面是该并发退出函数对应的测试用例,通过该用例我们也可以直观了解到该函数的使用方法:
// go-concurrency-pattern-7_test.go
package main
import (
"testing"
"time"
)
func shutdownMaker(processTm int) func(time.Duration) error {
return func(time.Duration) error {
time.Sleep(time.Second * time.Duration(processTm))
return nil
}
}
func TestConcurrentShutdown(t *testing.T) {
f1 := shutdownMaker(2)
f2 := shutdownMaker(6)
err := ConcurrentShutdown(10*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
if err != nil {
t.Errorf("want nil, actual: %s", err)
return
}
err = ConcurrentShutdown(4*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
if err == nil {
t.Error("want timeout, actual nil")
return
}
}
在上面测试中,我们通过一个工具函数shutdownMaker
“制作”出通过ShutdownerFunc
转型即可满足接口 GracefullyShutdowner 的类型实例,并分别测试了ConcurrentShutdown
函数的正常和等待超时两种状况。运行上面测试用例:
$ go test -v ./go-concurrency-pattern-7_test.go ./go-concurrency-pattern-7.go
=== RUN TestConcurrentShutdown
--- PASS: TestConcurrentShutdown (10.00s)
PASS
ok command-line-arguments 10.001s
有了并发退出作为基础,串行退出的实现也就很简单了:
// go-concurrency-pattern-7.go
func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
start := time.Now()
var left time.Duration
timer := time.NewTimer(waitTimeout)
for _, g := range shutdowners {
elapsed := time.Since(start)
left = waitTimeout - elapsed
c := make(chan struct{})
go func(shutdowner GracefullyShutdowner) {
shutdowner.Shutdown(left)
c <- struct{}{}
}(g)
timer.Reset(left)
select {
case <-c:
//continue
case <-timer.C:
return errors.New("wait timeout")
}
}
return nil
}
串行退出的一个问题是:waitTimeout 值的确定,因为这个超时时间是所有 goroutine 的退出时间之和。在上述代码里,我把每次的 left(剩余时间)传入下一个要执行的 goroutine 的 Shutdown 方法中。select 也同样使用这个 left 作为 timeout 的值(通过timer.Reset
重新设置 timer 定时器周期)。对照 ConcurrentShutdown,SequentialShutdown 更简单,这里就不详细说了。
很多 Go 初学者在初次看到 Go 提供的并发原语:channel
时,很容易就联想到Unix/Linux
平台上的管道机制,比如下面就是一个利用管道机制实现过滤出当前路径下的以".go"为结尾文件列表的命令:
$ls -l|grep "\.go"
Unix/Linux 的管道机制就是将前面程序的输出数据作为输入数据传递给后面的程序,比如:上面的命令就是将ls -l
的结果数据通过管道传递给grep
程序。
管道是 Unix/Linux 上一种典型的并发程序设计模式,也是 Unix 崇尚“组合”设计哲学的具体体现。Go 中没有定义管道,但是具有深厚 Unix 文化背景的 Go 语言缔造者们显然借鉴了 Unix 的设计哲学,在 Go 中引入了channel
这种并发原语,而channel
原语使构建管道并发模式变得容易且自然。
图6-2-3:管道模式
我们看到在 Go 中管道模式被实现成了由channel
连接的一条“数据流水线”。该流水线中,每个数据处理环节都由一组相同功能的 goroutine完成。在每个数据处理环节,goroutine 都要从数据输入 channel 获取前一个环节生产的数据,然后对这些数据进行处理,并将处理后的结果数据通过数据输出 channel 发往下一个环节。
下面是一个使用了管道模式的示例:
// go-concurrency-pattern-8.go
package main
func newNumGenerator(start, count int) <-chan int {
c := make(chan int)
go func() {
for i := start; i < start+count; i++ {
c <- i
}
close(c)
}()
return c
}
func filterOdd(in int) (int, bool) {
if in%2 != 0 {
return 0, false
}
return in, true
}
func square(in int) (int, bool) {
return in * in, true
}
func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
out := make(chan int)
go func() {
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
}()
return out
}
func main() {
in := newNumGenerator(1, 20)
out := spawn(square, spawn(filterOdd, in))
for v := range out {
println(v)
}
}
这条流水线管道可以被称为“偶数的平方”。我们看到这条流水线管道有四个处理环节:
- 第一个环节就是生成最初的数据序列,这个由
newNumGenerator
创建的 goroutine 负责生成并发送到输出 channel 中,在序列全部发送完毕后,该 goroutine 关闭 channel 并退出; - 第二个环节是从序列中过滤奇数。由 spawn 函数创建的 goroutine 从第一个环节的输出 channel 读取数据,并交由
filterOdd
函数处理。如果是奇数,则丢弃;如果是偶数,则发到该 goroutine 的输出 channel 中;当全部数据发送完毕后,该 goroutine 关闭 channel 并退出; - 第三个环节是将序列中的数据进行平方运算处理。由 spawn 函数创建的 goroutine 从第二个环节的输出 channel 读取数据,并交由
square函数
处理。处理后的数据被发到该 goroutine 的输出 channel 中;当全部数据发送完毕后,该 goroutine 关闭 channel 并退出; - 第四个环节是将序列中的数据输出到控制台(console)。main goroutine 从第三个环节的输出 channel 中读取数据,并将数据通过
println
输出到控制台上。当全部数据都读取并展示完毕后,main goroutine 退出。
运行上述示例代码:
$go run go-concurrency-pattern-8.go
4
16
36
64
100
144
196
256
324
400
管道模式具有良好的可扩展性。如果我们要在上面示例代码的基础上在最开始处新增一个处理环节,比如过滤掉所有大于 100 的数(filterNumOver100,我们可以像下面代码这样扩展我们的管道流水线:
in := newNumGenerator(1, 20)
out := spawn(square, spawn(filterOdd, spawn(filterNumOver10, in))
下面我们再来了解两种基于管道模式的扩展模式。
- 扇出模式(fan-out)
在某一处理环节中,多个功能相同的 goroutine 从同一个 channel 读取数据并处理,直到该 channel 关闭,这种情况被称为扇出(fan-out)。使用扇出模式可以在一组 goroutine 中均衡分配工作量,从而可以更好地并行化对 CPU 和 I/O 的使用。
- 扇入模式(fan-in)
在某个处理环节,处理程序面对不止一个输入 channel。我们把所有输入 channel 的数据汇聚到一个统一的输入 channel,然后处理程序再从这个汇聚后的 channel 读取数据并处理,直到该汇聚 channel 因所有输入 channel 关闭而关闭。这种情况被称为扇入(fan-in)。
下图直观的展示了扇出和扇入模式:
图6-2-4:扇出和扇入模式
下面我们来看看扇出和扇入模式的实现示例:
// go-concurrency-pattern-9.go
package main
import (
"fmt"
"sync"
"time"
)
func newNumGenerator(start, count int) <-chan int {
c := make(chan int)
go func() {
for i := start; i < start+count; i++ {
c <- i
}
close(c)
}()
return c
}
func filterOdd(in int) (int, bool) {
if in%2 != 0 {
return 0, false
}
return in, true
}
func square(in int) (int, bool) {
return in * in, true
}
func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
groupOut := make(chan int)
var outSlice []chan int
for i := 0; i < num; i++ { // fan out
out := make(chan int)
go func(i int) {
name := fmt.Sprintf("%s-%d:", name, i)
fmt.Printf("%s begin to work...\n", name)
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
fmt.Printf("%s work done\n", name)
}(i)
outSlice = append(outSlice, out)
}
// Fan-in pattern
//
// out --\
// \
// out ---- --> groupOut
// /
// out --/
//
go func() {
var wg sync.WaitGroup
for _, out := range outSlice { //fan in
wg.Add(1)
go func(out <-chan int) {
for v := range out {
groupOut <- v
}
wg.Done()
}(out)
}
wg.Wait()
close(groupOut)
}()
return groupOut
}
func main() {
in := newNumGenerator(1, 20)
out := spawnGroup("square", 2, square, spawnGroup("filterOdd", 3, filterOdd, in))
time.Sleep(3 * time.Second) //为了输出更直观的结果,这里等上面的goroutine都就绪
for v := range out {
fmt.Println(v)
}
}
上面代码中,我们通过spawnGroup
函数实现了"fan-out"模式,针对每个输入 channel,我们都建立多个功能相同的 goroutine 从这个共同的输入 channel 读取数据并处理,直至 channel 被关闭。在 spawnGroup 函数的结尾处,我们将多个 goroutine 的 out channel 聚合到一个 groupOut channel 中,这就是"fan-in"模式的实现。
运行上述示例代码:
$ go run go-concurrency-pattern-9.go
square-1: begin to work...
filterOdd-1: begin to work...
square-0: begin to work...
filterOdd-2: begin to work...
filterOdd-0: begin to work...
filterOdd-1: work done
4
16
36
100
64
144
324
400
256
196
filterOdd-2: work done
filterOdd-0: work done
square-0: work done
square-1: work done
我们经常会使用 Go 编写向服务发起请求并获取应答结果的客户端应用。这里我们就来看一个这样的例子:我们要编写一个从气象数据服务中心获取气象信息的客户端。该客户端每次会并发向从三个气象数据服务中心发起数据查询请求,并以返回最快的那个响应信息作为此次请求的应答返回值。下面的代码是这个例子的第一版实现:
// go-concurrency-pattern-10.go
package main
import (
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"time"
)
type result struct {
value string
}
func first(servers ...*httptest.Server) (result, error) {
c := make(chan result, len(servers))
queryFunc := func(server *httptest.Server) {
url := server.URL
resp, err := http.Get(url)
if err != nil {
log.Printf("http get error: %s\n", err)
return
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
c <- result{
value: string(body),
}
}
for _, serv := range servers {
go queryFunc(serv)
}
return <-c, nil
}
func fakeWeatherServer(name string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("%s receive a http request\n", name)
time.Sleep(1 * time.Second)
w.Write([]byte(name + ":ok"))
}))
}
func main() {
result, err := first(fakeWeatherServer("open-weather-1"),
fakeWeatherServer("open-weather-2"),
fakeWeatherServer("open-weather-3"))
if err != nil {
log.Println("invoke first error:", err)
return
}
log.Println(result)
}
我们使用 httptest 包的NewServer
函数创建了三个模拟器的“气象数据服务中心”,然后将这三个“气象数据服务中心”的实例传入 first 函数。后者创建了三个 goroutine,每个 goroutine 对应向一个“气象数据服务中心”发起查询请求。三个发起查询的 goroutine 都会将应答结果写入同一个 channel 中,first 获取第一个结果数据后就返回了。
我们运行一下这段示例代码:
$go run go-concurrency-pattern-10.go
2020/01/21 21:57:04 open-weather-3 receive a http request
2020/01/21 21:57:04 open-weather-1 receive a http request
2020/01/21 21:57:04 open-weather-2 receive a http request
2020/01/21 21:57:05 {open-weather-3:ok}
上述的例子运行在一种较为“理想”的情况下,但现实中网络情况错综复杂,远程服务的状态也不甚明朗,很可能出现服务端长时间没有响应的情况,这时为了保证良好的用户体验,我们需要对客户端的行为进行精细化的控制,比如:我们只等待 500ms,超过 500ms 仍然没有收到哪怕是一个“气象数据服务中心”的响应,我们的 first 函数就返回失败,以保证等待的时间在人类的忍耐力承受范围之内。我们在上述例子的基础上对 first 函数做的调整如下:
// go-concurrency-pattern-11.go
func first(servers ...*httptest.Server) (result, error) {
c := make(chan result, len(servers))
queryFunc := func(server *httptest.Server) {
url := server.URL
resp, err := http.Get(url)
if err != nil {
log.Printf("http get error: %s\n", err)
return
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
c <- result{
value: string(body),
}
}
for _, serv := range servers {
go queryFunc(serv)
}
select {
case r := <-c:
return r, nil
case <-time.After(500 * time.Millisecond):
return result{}, errors.New("timeout")
}
}
我们通过增加一个定时器,并通过 select 原语监视该定时器事件和响应 channel 上的事件。如果响应 channel 上长时间没有数据返回,则当定时器事件触发后,first 函数返回:
$ go run go-concurrency-pattern-11.go
2020/01/21 22:41:02 open-weather-1 receive a http request
2020/01/21 22:41:02 open-weather-2 receive a http request
2020/01/21 22:41:02 open-weather-3 receive a http request
2020/01/21 22:41:02 invoke first error: timeout
加上了 “超时模式” 的版本依然有一个明显的问题,那就是即便 first 函数因超时返回,三个已经创建的 goroutine 可能依然处在向“气象数据服务中心”请求或等待应答中,没有返回,也没有被回收,资源仍然在占用,即使它们的存在已经没有了任何意义。一种合理的解决思路是让这三个 goroutine 支持“取消”操作。这种情况下,我们一般使用 Go 的 context 包来实现“取消”模式。context 包是谷歌内部关于 Go 的一个最佳实践,Go 在 1.7 版本将 context 包引入到标准库中。下面是利用 context 包实现“取消模式”的代码:
// go-concurrency-pattern-12.go
package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"time"
)
type result struct {
value string
}
func first(servers ...*httptest.Server) (result, error) {
c := make(chan result)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queryFunc := func(i int, server *httptest.Server) {
url := server.URL
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
return
}
req = req.WithContext(ctx)
log.Printf("query goroutine-%d: send request...\n", i)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("query goroutine-%d: get return error: %s\n", i, err)
return
}
log.Printf("query goroutine-%d: get response\n", i)
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
c <- result{
value: string(body),
}
return
}
for i, serv := range servers {
go queryFunc(i, serv)
}
select {
case r := <-c:
return r, nil
case <-time.After(500 * time.Millisecond):
return result{}, errors.New("timeout")
}
}
func fakeWeatherServer(name string, interval int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("%s receive a http request\n", name)
time.Sleep(time.Duration(interval) * time.Millisecond)
w.Write([]byte(name + ":ok"))
}))
}
func main() {
result, err := first(fakeWeatherServer("open-weather-1", 200),
fakeWeatherServer("open-weather-2", 1000),
fakeWeatherServer("open-weather-3", 600))
if err != nil {
log.Println("invoke first error:", err)
return
}
fmt.Println(result)
time.Sleep(10 * time.Second)
}
在这版实现中,我们利用context.WithCancel
创建了一个可以取消的 context.Context 变量,在每个发起查询请求的 goroutine 中,我们用该变量更新了 request 中的 ctx 变量,使其支持“被取消”。这样在 first 函数中,无论是成功得到某个查询 goroutine 的返回结果,还是超时失败返回,通过defer cancel()
设定 cancel 函数在 first 函数返回前被执行,那些尚未返回的在途(on-flight)查询的 goroutine 都将收到 cancel 事件并退出(http 包支持利用 context.Context 的超时和 cancel 机制)。下面是运行该示例的结果:
$go run go-concurrency-pattern-12.go
2020/01/21 23:20:32 query goroutine-1: send request...
2020/01/21 23:20:32 query goroutine-0: send request...
2020/01/21 23:20:32 query goroutine-2: send request...
2020/01/21 23:20:32 open-weather-3 receive a http request
2020/01/21 23:20:32 open-weather-2 receive a http request
2020/01/21 23:20:32 open-weather-1 receive a http request
2020/01/21 23:20:32 query goroutine-0: get response
{open-weather-1:ok}
2020/01/21 23:20:32 query goroutine-1: get return error: Get http://127.0.0.1:56437: context canceled
2020/01/21 23:20:32 query goroutine-2: get return error: Get http://127.0.0.1:56438: context canceled
我们看到 first 函数在得到open-weather-1
这个“气象数据服务中心”的响应后,执行了 cancel 函数,其余两个尚未返回的查询 goroutine 中的http.DefaultClient.Do
调用便取消了请求,返回了context canceled
的错误,进而这两个 goroutine 得以退出。
本节要点:
- 了解基于 CSP 的并发模型与传统基于共享内存的并发模型的区别;
- 了解 Go 为实现 CSP 并发模型而提供的并发原语及功能;
- 掌握常见的并发模式,包括创建模式、管道模式、多种退出模式、超时和取消模式等