Skip to content

Go 语言中的并发

约 2043 字大约 7 分钟

Go

2023-09-28

简单介绍 Go 中的并发编程. 涉及内容主要为 goroutine, goroutine 间的通信(主要是 channel), 并发控制(等待和退出).

Goroutine

语法

在一个函数调用前加上 go 即可. 语法很简单, 可以说是并发写起来最简单的程序语言了.

goroutine 与线程

开始可能会把 goroutine 当做线程来看, 在我们这的计算密集型任务中, 确实可以认为和线程差不多, 但在 I/O 比较多的任务中, 就能看到作为协程的一面了. 在 Go 中, goroutine 数与线程数可以是 m 对 n 的关系, 即 m 个 goroutine 运行在 n 个线程上, 可以认为一个线程能调度执行多个 goroutine. 线程内部调度 goroutine 比线程间的切换调度开销小很多, 这也是协程的优势. 和 python 那样的协程比起来, goroutine 除了能通过阻塞、系统调用让出线程之外, 还能被调度(抢占式调度), 避免一些 goroutine 执行时间过长, 导致其他 goroutine 饥饿. 而 python 由于 GIL 全局锁的缘故, 是无法真正意义上做到并发的.

GMP 模型动态演示

GMP
GMP

提示

不过在计算密集型的任务中协程并没有什么优势, 要计算的任务量是固定的, 过多的协程调度反而降低效率. 所以在我们这写代码的时候, 一般是把 goroutine 当作线程来用的, 根据 CPU 核数来创建 goroutine, 这要根据具体的任务类型来考虑.

通信

从创建 goroutine 的语法可以看到, 并没有一个对应函数返回值的方法. 如果想在创建 goroutine 的协程中获取返回值需要进行 goroutine 间的通信, 常用的为 channel 和基于共享变量的通信.

闭包

一个函数和其词法环境的引用绑定在一起, 是一个闭包.

go playground

package main

func closure() func() int {
	tmp := 1
	return func() int {
		tmp++
		fmt.Println(tmp)
		return tmp
	}
}

func main() {
	test1 := closure()
	test2 := closure()
	test1()  // 2
	test1()  // 3
	test2()  // 2
}

其中 tmp 本来是 closure 函数中的一个局部变量, 但是 closure 的返回值是一个闭包函数, 其中引用了 tmp, 那 tmp 就不能随着 closure 的结束而销毁, 会逃逸到堆上. 有点像创建了一个对象, 对象中有个成员变量 tmp, 成员方法执行时会引用该变量.

Go 的闭包用着也挺方便的, 不过局部变量逃逸到堆上也会引起一些额外开销, 本来在栈上创建变量, 随着栈销毁, 变量也自动销毁, 但如果逃逸到堆上就需要通过 GC 来回收. 除了闭包也会有其他一些情况引起逃逸, 如使用了 interface{} 动态类型, 栈空间不足等.

闭包也容易引起一些问题, 在闭包中引用的变量, 可以认为是使用了它的引用(指针), 这样就容易引发一些错误.

go playground

package main

func main() {	
	s := []int{1, 2, 3, 4}
	for _, elem := range s {
        go func() {
			fmt.Println(elem)  // 引用的都是elem的地址
		}()
	}
	runTime()
}

func runTime() {
    start := time.Now()
    defer fmt.Println(time.Since(start))  // 0
    defer func() {
        fmt.Println(time.Since(start))  // 预期的时间
    }()
}

基于共享变量的通信

和其他编程语言类似, 可以通过加锁的方式来比较安全地对变量进行并发方法, sync.Mutexsync.RWMutex. 要注意的是锁被创建之后就不能拷贝了, 要传递锁(作为参数等)只能传引用, 这和 Go 的实现有关, 要传引用也可以理解, 要保证大家用的是同一把锁, 才能起到控制访问的功能.

基于 Channel

channel 是 Go 中推荐使用的通信方式, 一个 channel 可以认为是一个线程安全的消息队列, 先进先出.

  1. 语法.

    Go Channel 详解

  2. 一些特殊情况.

    • 向已经关闭的 channel 或为 nil 的 channel 中写, 会引发 panic.
    • 从为 nil 的 channel 中读, 会永久阻塞.
    • 从已经关闭的 channel 中读, 如果 channel 内已经没有数据了, 会返回相应零值, 可以用 elem, ok := <-ch, 使用 ok 来判断获取的值是不是有效值.
  3. 非阻塞式收发. 正常使用 channel 进行数据的收发都是阻塞式的, 如果 channel 缓存已满, 再往里写就会阻塞. 如果 channel 中没有数据, 尝试读的话也会引起阻塞. 要实现非阻塞式的 channel 访问, 使用 select. select 是 Go 中一个特殊语法, 看起来和 switch 有点像.

select {
	case ele := <-readCh:
	case e -> writeCh:
	case <-checkCh:
	default:
		...
}

select 语句的效果是看各个 case 的 channel 操作是否可以完成(不会被阻塞). 如果有, 从所有可以执行的 case 中随机选一个执行. 如果没有看有没有 default 语句, 有的话执行 defalut 语句, 如果还是没有的话挂起, 等待可执行条件.

注意

一些特殊情况:

  1. 空的 select 语句, 也就是 select{} 会使当前 goroutine 直接挂起, 永远无法被唤醒.
  2. 只有一个 case, 和直接使用 channel 效果是一样的.
  3. 从已关闭的 channel 中读, 是直接可执行的.

可以简单了解一下 select 语句的实现, 一些特殊情况会单独处理, 常规逻辑是这样的:

  1. 以一定顺序锁定所有 case 中的 channel, 再根据随机生成的轮询顺序, 遍历各个 case 查找是否有可以立即执行的 case, 有的话选定对应的 case 执行, 解锁各 channel.
  2. 如果没有可以立即执行的 case, 也没有 default, 将当前 goroutine 加入到所有相关 channel 的收发队列中, 将自己挂起.
  3. 当该 goroutine 再次被唤醒时, 再锁定各个 case, 如此循环.

并发控制

退出

一个 goroutine 不能直接停止另外一个 goroutine, 如果可以的话可能会导致 goroutine 间的共享变量落在未定义的状态上, 所以只能让 goroutine 自己退出.

  1. 利用 select 和被关闭的 channel 的性质, 能实现简单的退出.

go playground

package main

import (
	"fmt"
	"time"
)

func main() {
	control := make(chan struct{})
	inData := make(chan int)

	go func() {
		for {
			select {
			case <-control:
				fmt.Println("exit")
				return
			case data, ok := <-inData:
				if ok {
					fmt.Println("inData data:", data)
				} else {
					fmt.Println("inData closed")
					return
				}
			}
		}
	}()

	inData <- 1
	inData <- 2
	close(inData) // 先关闭 inData 通道
	time.Sleep(time.Second) // 等待 goroutine 处理完
	close(control) // 然后再关闭 control 通道
}
  1. 使用 ContextContext 在本质上和上面的做法是类似的, 通过关闭 channel 来进行消息传递, 不过做了些封装, 使用更方便一些.

go playground

package main

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go handle(ctx, 1500*time.Millisecond)
	select {
	case <-ctx.Done():
		fmt.Println("main", ctx.Err())
	}
}

func handle(ctx context.Context, duration time.Duration) {
	select {
	case <-ctx.Done():
		fmt.Println("handle", ctx.Err())
	case <-time.After(duration):
		fmt.Println("process request with", duration)
	}
}

等待

很多时候一个 goroutine 要等待其他一些 goroutine 结束之后再执行后续流程, 比如两个任务有前后依赖关系, 可以利用 channel 的阻塞进行等待.

  1. goroutine 结束之后发送完成信号.
workerNum := 10
finishCh := make(chan struct{}, workerNum)
worker := func() {
	// do something
	finish <- struct{}{}
}
for i := 0; i < workerNum; i++ {
	go worker()
}
// 等待
for i := 0; i < workerNum; i++ {
	<-finish
}
  1. 利用 sync.WaitGroup (类似信号量).
workerNum := 10
wg := &sync.WaitGroup{}
// 要稍微注意一下Add和Done的位置
wg.Add(workerNum)
worker := func() {
	// do something
	wg.Done()
}
for i := 0; i < workerNum; i++ {
	go worker()
}
// 等待
wg.Wait()
  1. 利用用 select 语句.

系统中的一些应用

  1. map-reduce.
  2. 几个任务流顺序执行.
  3. 递归中的并发数控制.

性能分析工具

  1. benchmark 基准测试.
  2. pprof.




变更历史

最后更新于: 查看全部变更历史
  • first commit

    于 2024/9/20
  • 调整博客分类+修改about-me.md

    于 2024/9/24
  • 给予文件夹顺序

    于 2024/9/24
  • 升级版本+规整文档中的格式

    于 2024/10/11
  • 升级主题+新增文章+修改格式

    于 2024/10/14
  • 新增文字+CRLF全部替换为LF

    于 2024/10/17
  • update

    于 2024/10/17
  • 开启go 在线运行

    于 2024/10/25
  • 整理文章格式

    于 2024/10/29
  • 升级主题+整理文章格式

    于 2024/11/1
  • 整理图片和文章

    于 2024/11/7
  • update

    于 2024/11/7
  • docs: update docs

    于 2024/11/19
  • docs: update docs

    于 2024/12/2
  • modify(docs): remanage folders and rename files

    于 2024/12/17

Keep It Simple