Go 语言中的并发
简单介绍 Go 中的并发编程。涉及内容主要为 goroutine,goroutine 间的通信(主要是 channel)和并发控制(等待和退出)。
Goroutine
语法
在一个函数调用前加上 go
即可。语法很简单,可以说是并发写起来最简单的程序语言了。
goroutine 与线程
开始可能会把 goroutine 当做线程来看,在我们这的计算密集型任务中,确实可以认为和线程差不多,但在 I/O 比较多的任务中,就能看到作为协程的一面了。在 Go 中,goroutine 数与线程数可以是 m 对 n 的关系,即 m 个 goroutine 运行在 n 个线程上,可以认为一个线程能调度执行多个 goroutine。线程内部调度 goroutine 比线程间的切换调度开销小很多,这也是协程的优势。和 python 那样的协程比起来,goroutine 除了能通过阻塞、系统调用让出线程之外,还能被调度(抢占式调度),避免一些 goroutine 执行时间过长,导致其他 goroutine 饥饿。
而 python 由于 GIL 全局锁的缘故,是无法真正意义上做到并发的。
GMP 模型动态演示

提示
不过在计算密集型的任务中协程并没有什么优势,要计算的任务量是固定的,过多的协程调度反而降低效率。所以在我们这写代码的时候,一般是把 goroutine 当作线程来用的,根据 CPU 核数来创建 goroutine,这要根据具体的任务类型来考虑。
通信
从创建 goroutine 的语法可以看到,并没有一个对应函数返回值的方法。如果想在创建 goroutine 的协程中获取返回值需要进行 goroutine 间的通信,常用的为 channel 和基于共享变量的通信。
闭包
一个函数和其词法环境的引用绑定在一起,是一个闭包。
go playground
package main
func closure() func() int {
tmp := 1
return func() int {
tmp++
fmt.Println(tmp)
return tmp
}
}
func main() {
test1 := closure()
test2 := closure()
test1() // 2
test1() // 3
test2() // 2
}
其中 tmp
本来是 closure
函数中的一个局部变量,但是 closure
的返回值是一个闭包函数,其中引用了 tmp
,那 tmp
就不能随着 closure
的结束而销毁,会逃逸到堆上。有点像创建了一个对象,对象中有个成员变量 tmp
,成员方法执行时会引用该变量。
Go 的闭包用着也挺方便的,不过局部变量逃逸到堆上也会引起一些额外开销,本来在栈上创建变量,随着栈销毁,变量也自动销毁,但如果逃逸到堆上就需要通过 GC 来回收。除了闭包也会有其他一些情况引起逃逸,如使用了 interface{}
动态类型,栈空间不足等。
闭包也容易引起一些问题,在闭包中引用的变量,可以认为是使用了它的引用(指针),这样就容易引发一些错误。
go playground
package main
func main() {
s := []int{1,2,3,4}
for _,elem := range s {
go func() {
fmt.Println(elem) // 引用的都是elem的地址
}()
}
runTime()
}
func runTime() {
start := time.Now()
defer fmt.Println(time.Since(start)) // 0
defer func() {
fmt.Println(time.Since(start)) // 预期的时间
}()
}
基于共享变量的通信
和其他编程语言类似,可以通过加锁的方式来比较安全地对变量进行并发方法,sync.Mutex
和 sync.RWMutex
。要注意的是锁被创建之后就不能拷贝了,要传递锁(作为参数等)只能传引用,这和 Go 的实现有关,要传引用也可以理解,要保证大家用的是同一把锁,才能起到控制访问的功能。
基于 Channel
channel 是 Go 中推荐使用的通信方式,一个 channel 可以认为是一个线程安全的消息队列,先进先出。
语法。
一些特殊情况。
- 向已经关闭的 channel 或为 nil 的 channel 中写,会引发 panic。
- 从为 nil 的 channel 中读,会永久阻塞。
- 从已经关闭的 channel 中读,如果 channel 内已经没有数据了,会返回相应零值,可以用
elem,ok := <-ch
,使用ok
来判断获取的值是不是有效值。
非阻塞式收发。 正常使用 channel 进行数据的收发都是阻塞式的,如果 channel 缓存已满,再往里写就会阻塞。如果 channel 中没有数据,尝试读的话也会引起阻塞。要实现非阻塞式的 channel 访问,使用
select
。select
是 Go 中一个特殊语法,看起来和switch
有点像。
select {
case ele := <-readCh:
case e -> writeCh:
case <-checkCh:
default:
...
}
select
语句的效果是看各个 case
的 channel 操作是否可以完成(不会被阻塞)。如果有,从所有可以执行的 case
中随机选一个执行。如果没有看有没有 default
语句,有的话执行 defalut
语句,如果还是没有的话挂起,等待可执行条件。
注意
一些特殊情况:
- 空的
select
语句,也就是select{}
会使当前 goroutine 直接挂起,永远无法被唤醒。 - 只有一个
case
,和直接使用 channel 效果是一样的。 - 从已关闭的 channel 中读,是直接可执行的。
可以简单了解一下 select
语句的实现,一些特殊情况会单独处理,常规逻辑是这样的:
- 以一定顺序锁定所有
case
中的 channel,再根据随机生成的轮询顺序,遍历各个case
查找是否有可以立即执行的case
,有的话选定对应的case
执行,解锁各 channel。 - 如果没有可以立即执行的
case
,也没有default
,将当前 goroutine 加入到所有相关 channel 的收发队列中,将自己挂起。 - 当该 goroutine 再次被唤醒时,再锁定各个
case
,如此循环。
并发控制
退出
一个 goroutine 不能直接停止另外一个 goroutine,如果可以的话可能会导致 goroutine 间的共享变量落在未定义的状态上,所以只能让 goroutine 自己退出。
- 利用
select
和被关闭的 channel 的性质,能实现简单的退出。
go playground
package main
import (
"fmt"
"time"
)
func main() {
control := make(chan struct{})
inData := make(chan int)
go func() {
for {
select {
case <-control:
fmt.Println("exit")
return
case data,ok := <-inData:
if ok {
fmt.Println("inData data:",data)
} else {
fmt.Println("inData closed")
return
}
}
}
}()
inData <- 1
inData <- 2
close(inData) // 先关闭 inData 通道
time.Sleep(time.Second) // 等待 goroutine 处理完
close(control) // 然后再关闭 control 通道
}
- 使用
Context
Context
在本质上和上面的做法是类似的,通过关闭 channel 来进行消息传递,不过做了些封装,使用更方便一些。
go playground
package main
func main() {
ctx,cancel := context.WithTimeout(context.Background(),1*time.Second)
defer cancel()
go handle(ctx,1500*time.Millisecond)
select {
case <-ctx.Done():
fmt.Println("main",ctx.Err())
}
}
func handle(ctx context.Context,duration time.Duration) {
select {
case <-ctx.Done():
fmt.Println("handle",ctx.Err())
case <-time.After(duration):
fmt.Println("process request with",duration)
}
}
等待
很多时候一个 goroutine 要等待其他一些 goroutine 结束之后再执行后续流程,比如两个任务有前后依赖关系,可以利用 channel 的阻塞进行等待。
- goroutine 结束之后发送完成信号。
workerNum := 10
finishCh := make(chan struct{},workerNum)
worker := func() {
// do something
finish <- struct{}{}
}
for i := 0; i < workerNum; i++ {
go worker()
}
// 等待
for i := 0; i < workerNum; i++ {
<-finish
}
- 利用
sync.WaitGroup
(类似信号量)。
workerNum := 10
wg := &sync.WaitGroup{}
// 要稍微注意一下Add和Done的位置
wg.Add(workerNum)
worker := func() {
// do something
wg.Done()
}
for i := 0; i < workerNum; i++ {
go worker()
}
// 等待
wg.Wait()
- 利用用
select
语句。
系统中的一些应用
- map-reduce。
- 几个任务流顺序执行。
- 递归中的并发数控制。
性能分析工具
- benchmark 基准测试。
- pprof。