Golang Channel
设计原理
Go 语言中最常见的、也是经常被人提及的设计模式就是 —— 不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程冲突的问题,我们需要限制同一时间能够读写这些变量的线程数量,这与 Go 语言鼓励的方式并不相同。
|
|
虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,也就是通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Go 语言中的 Goroutine 会通过 Channel 传递数据。
|
|
上面中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。
先入先出
目前的 Channel 收发操作均遵循了先入先出(FIFO)的设计,具体规则如下:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
这种 FIFO 的设计是相对好理解的,但是 Go 语言稍早版本的实现却不是严格遵循这一语义的,runtime: make sure blocked channels run operations in FIFO order 中提出了有缓冲区的 Channel 在执行收发操作时没有遵循 FIFO 的规则2。
- 发送方会向缓冲区中写入数据,然后唤醒接收方,多个接收方会尝试从缓冲区中读取数据,如果没有读取到就会重新陷入休眠;
- 接收方会从缓冲区中读取数据,然后唤醒发送方,发送方会尝试向缓冲区写入数据,如果缓冲区已满就会重新陷入休眠;
这种基于重试的机制会导致 Channel 的处理不会遵循 FIFO 的原则。经过 runtime: simplify buffered channels 和 runtime: simplify chan ops, take 2 两个提交的修改,带缓冲区和不带缓冲区的 Channel 都会遵循先入先出对数据进行接收和发送。
无锁管道
锁是一种常见的并发控制技术,我们一般会将锁分成乐观锁和悲观锁,即乐观并发控制和悲观并发控制,无锁(lock-free)队列更准确的描述是使用乐观并发控制的队列。乐观并发控制也叫乐观锁,但是它并不是真正的锁,很多人都会误以为乐观锁是一种真正的锁,然而它只是一种并发控制的思想。
乐观并发控制本质上是基于验证的协议,我们使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。
Channel 在运行时的内部表示是 runtime.hchan
,该结构体中包含了一个用于保护成员变量的互斥锁,从某种程度上说,Channel 是一个用于同步和通信的有锁队列。使用互斥锁解决程序中可能存在的线程竞争问题是很常见的,我们能很容易地实现有锁队列。
然而锁导致的休眠和唤醒会带来额外的上下文切换,如果临界区过小,加锁解锁导致的额外开销就会成为性能瓶颈。1994 年的论文 Implementing lock-free queues 就研究了如何使用无锁的数据结构实现先进先出队列,而 Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:
- 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
- 异步 Channel — 基于环形缓存的传统生产者消费者模型;
chan struct{}
类型的异步 Channel —struct{}
类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义。
这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但是在实际的基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。
因为目前通过 CAS 实现的无锁 Channel 没有提供 FIFO 的特性,所以该提案暂时也被搁浅了。
数据结构
Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构体:
|
|
runtime.hchan
结构体中的五个字段 qcount
、dataqsiz
、buf
、sendx
、recv
构建底层的循环队列:
qcount
— Channel 中的元素个数;dataqsiz
— Channel 中的循环队列的长度;buf
— Channel 的缓冲区数据指针;sendx
— Channel 的发送操作处理到的位置;recvx
— Channel 的接收操作处理到的位置;
除此之外,elemsize
和 elemtype
分别表示当前 Channel 能够收发的元素类型和大小;sendq
和 recvq
存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq
表示,链表中所有的元素都是 runtime.sudog
结构:
runtime.sudog
表示一个在等待列表中的 Goroutine,该结构体中存储了阻塞的相关信息以及两个分别指向前后 runtime.sudog
的指针。
创建管道
Go 语言中所有 Channel 的创建都会使用 make
关键字。编译器会将 make(chan int, 10)
表达式被转换成 OMAKE
类型的节点,并在类型检查阶段将 OMAKE
类型的节点转换成 OMAKECHAN
类型:
这一阶段会对传入 make
关键字的缓冲区大小进行检查,如果我们不向 make
传递表示缓冲区大小的参数,那么就会设置一个默认值 0,也就是当前的 Channel 不存在缓冲区。
OMAKECHAN
类型的节点最终都会在 SSA 中间代码生成阶段之前被转换成调用 runtime.makechan
或者 runtime.makechan64
的函数:
|
|
runtime.makechan
和 runtime.makechan64
会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况,我们重点关注 runtime.makechan
函数:
|
|
上述代码根据 Channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan
结构体和缓冲区:
- 如果当前 Channel 中不存在缓冲区,那么就只会为
runtime.hchan
分配一段内存空间; - 如果当前 Channel 中存储的类型不是指针类型,就会为当前的 Channel 和底层的数组分配一块连续的内存空间;
- 在默认情况下会单独为
runtime.hchan
和缓冲区分配内存;
在函数的最后会统一更新 runtime.hchan
的 elemsize
、elemtype
和 dataqsiz
几个字段。
发送数据
当我们想要向 Channel 发送数据时,就需要使用 ch <- i
语句,编译器会将它解析成 OSEND
节点并在 cmd/compile/internal/gc.walkexpr
函数中转换成 runtime.chansend1
:
runtime.chansend1
只是调用了 runtime.chansend
并传入 Channel 和需要发送的数据。runtime.chansend
是向 Channel 中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑,如果我们在调用时将 block
参数设置成 true
,那么就表示当前发送操作是一个阻塞操作:
在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止发生竞争条件。如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报"send on closed channel"
错误并中止程序。
因为 runtime.chansend
函数的实现比较复杂,所以我们这里将该函数的执行过程分成以下的三个部分:
- 当存在等待的接收者时,通过
runtime.send
直接将数据发送给阻塞的接收者; - 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
- 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据;
直接发送
如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend
函数会从接收队列 recvq
中取出最先陷入等待的 Goroutine 并直接向它发送数据:
下图展示了 Channel 中存在等待数据的 Goroutine 时,向 Channel 发送数据的过程:
发送数据时会调用 runtime.send
,该函数的执行可以分成两个部分:
- 调用
runtime.sendDirect
函数将发送的数据直接拷贝到x = <-c
表达式中变量x
所在的内存地址上; - 调用
runtime.goready
将等待接收数据的 Goroutine 标记成可运行状态Grunnable
并把该 Goroutine 放到发送方所在的处理器的runnext
上等待执行,该处理器在下一次调度时就会立刻唤醒数据的接收方;
需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext
中,程序没有立刻执行该 Goroutine。
缓冲区
如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,就会执行下面这段代码:
|
|
在这里我们首先会使用 chanbuf
计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove
将发送的数据拷贝到缓冲区中并增加 sendx
索引和 qcount
计数器。
如果当前 Channel 的缓冲区未满,向 Channel 发送的数据会存储在 Channel 中 sendx
索引所在的位置并将 sendx
索引加一,由于这里的 buf
是一个循环数组,所以当 sendx
等于 dataqsiz
时就会重新回到数组开始的位置。
阻塞发送
当 Channel 没有接收者能够处理数据时,向 Channel 发送数据就会被下游阻塞,当然使用 select
关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞地发送数据会执行下面的代码,我们可以简单梳理一下这段代码的逻辑:
|
|
- 调用
runtime.getg
获取发送数据使用的 Goroutine; - 执行
runtime.acquireSudog
函数获取runtime.sudog
结构体并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 Select 控制结构中和待发送数据的内存地址等; - 将刚刚创建并初始化的
runtime.sudog
加入发送等待队列,并设置到当前 Goroutine 的waiting
上,表示 Goroutine 正在等待该sudog
准备就绪; - 调用
runtime.goparkunlock
函数将当前的 Goroutine 陷入沉睡等待唤醒; - 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放
runtime.sudog
结构体;
在最后,函数会返回 true
表示这向 Channel 发送数据的结束。
小结
我们在这里可以简单梳理和总结一下使用 ch <- i
表达式向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的
recvq
上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前的 Goroutine 并将其设置成下一个运行的 Goroutine; - 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们就会直接将数据直接存储到当前缓冲区
sendx
所在的位置上; - 如果不满足上面的两种情况,就会创建一个
runtime.sudog
结构并将其加入 Channel 的sendq
队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的
runnext
属性,但是并不会立刻触发调度; - 发送数据时并没有找到接收方并且缓冲区已经满了,这时就会将自己加入 Channel 的
sendq
队列并调用runtime.goparkunlock
触发 Goroutine 的调度让出处理器的使用权;
接收数据
我们接下来继续介绍 Channel 操作的另一方 — 数据的接收。Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
这两种不同的方法经过编译器的处理都会变成 ORECV
类型的节点,后者会在类型检查阶段被转换成 OAS2RECV
类型。数据的接收操作遵循以下的路线图:
虽然不同的接收方式会被转换成 runtime.chanrecv1
和 runtime.chanrecv2
两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv
。
当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark
直接让出处理器的使用权。
|
|
如果当前 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会清除 ep
指针中的数据并立刻返回。
除了上述两种特殊情况,使用 runtime.chanrecv
从 Channel 接收数据时还包含以下三种不同情况:
- 当存在等待的发送者时,通过
runtime.recv
直接从阻塞的发送者或者缓冲区中获取数据; - 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
- 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据;
直接接收
当 Channel 的 sendq
队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine,处理的逻辑和发送时相差无几,只是发送数据时调用的是 runtime.send
函数,而接收数据时使用 runtime.recv
函数:
runtime.recv
函数的实现比较复杂:
|
|
该函数会根据缓冲区的大小分别处理不同的情况:
- 如果 Channel 不存在缓冲区;
- 调用
runtime.recvDirect
函数会将 Channel 发送队列中 Goroutine 存储的elem
数据拷贝到目标内存地址中;
- 调用
- 如果 Channel 存在缓冲区;
- 将队列中的数据拷贝到接收方的内存地址;
- 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
无论发生哪种情况,运行时都会调用 runtime.goready
函数将当前处理器的 runnext
设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
关闭管道
编译器会将用于关闭管道的 close
关键字转换成 OCLOSE
节点以及 runtime.closechan
的函数调用。
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接 panic
并抛出异常:
处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑了,下面这段代码的主要工作就是将 recvq
和 sendq
两个队列中的数据加入到 Goroutine 列表 gList
中,与此同时该函数会清除所有 sudog
上未被处理的元素:
|
|
该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready
触发调度。
总结
Channel 是 Go 语言能够提供强大并发能力的原因之一,我们在这一节中分析了 Channel 的设计原理、数据结构以及发送数据、接收数据和关闭 Channel 这些基本操作,相信能够帮助大家更好地理解 Channel 的工作原理。