[译] Go语言的有缓冲channel和无缓冲channel

Go中的channel十分强大,理解channel的内部机制后再去使用它可以发挥出更大威力。另外,选择使用有缓冲channel还是无缓冲channel会影响到我们程序的行为表现,以及性能。

无缓冲channel

无缓冲channel在消息发送时需要接收者就绪。声明无缓冲channel的方式是不指定缓冲大小。以下是一个列子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"sync"
"time"
)

func main() {
c := make(chan string)

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
c <- `foo`
}()

go func() {
defer wg.Done()

time.Sleep(time.Second * 1)
println(`Message: `+ <-c)
}()

wg.Wait()
}

第一个协程会在发送消息foo时阻塞,原因是接收者还没有就绪:这个特性在标准文档中描述如下:

如果缓冲大小设置为0或者不设置,channel为无缓冲类型,通信成功的前提是发送者和接收者都处于就绪状态。

effective Go文档也有相应的描述:

无缓冲channel,发送者会阻塞直到接收者接收了发送的值。

为了更好的理解channel的特性,接下来我们分析channel的内部结构。

内部结构

channel的结构体hchan被定义在runtime包中的chan.go文件中。以下是无缓冲channel的内部结构(本小节先介绍无缓冲channel,所以暂时忽略了hchan结构体中和缓冲相关的属性):

hchan结构

channel中持有两个链表,接收者链表recvq和发送者链表sendq,它们的类型是waitq。链表中的元素为sudog结构体类型,它包含了发送者或接收者的协程相关的信息。通过这些信息,Go可以在发送者不存在时阻塞住接收者,反之亦然。

以下是我们前一个例子的流程:

  1. 创建一个发送者列表和接收者列表都为空的channel。
  2. 第一个协程向channel发送foo变量的值,第16行。
  3. channel从池中获取一个sudog结构体变量,用于表示发送者。sudog结构体会保持对发送者所在协程的引用,以及foo的引用。
  4. 发送者加入sendq队列。
  5. 发送者协程进入等待状态。
  6. 第二个协程将从channel中读取一个消息,第23行。
  7. channel将sendq列表中等待状态的发送者出队列。
  8. chanel使用memmove函数将发送者要发送的值进行拷贝,包装入sudog结构体,再传递给channel接收者的接收变量。
  9. 在第五步中被挂起的第一个协程将恢复运行并释放第三步中获取的sudog结构体。

如流程所描述,发送者协程阻塞直至接收者就绪。但是,必要的时候,我们可以使用有缓冲channel来避免这种阻塞。

有缓冲channel

简单修改前面的例子,为channel添加缓冲,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"sync"
"time"
)

func main() {
c := make(chan string, 2)

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

c <- `foo`
c <- `bar`
}()

go func() {
defer wg.Done()

time.Sleep(time.Second * 1)
println(`Message: `+ <-c)
println(`Message: `+ <-c)
}()

wg.Wait()
}

通过这个例子,我们来分析hchan结构体中与缓冲相关的属性:

带缓冲属性的hchan结构

缓冲相关的五个属性:

  • qcount 当前缓冲中元素个数
  • dataqsize 缓冲最大数量
  • buf 指向缓冲区内存,这块内存空间可容纳dataqsize个元素
  • sendx 缓冲区中下一个元素写入时的位置
  • recvx 缓冲区中下一个被读取的元素的位置

通过sendxrecvx,缓冲区工作机制类似于环形队列

channel中的循环队列

环形队列使得我们可以保证缓冲区有序,并且不需要在每次取出元素时对缓冲区重新排序。

当缓冲区满了时,向缓冲区添加元素的协程将被加入sender链表中,并且切换到等待状态,就像我们在上一节描述的那样。之后,当程序读取缓冲区时,recvx位置的元素将被返回,等待状态的协程将恢复执行,它要发送的值将被存入缓冲区。这使得channel能够保证先进先出的特性。

缓存区不足引起的延时

创建channel时指定的缓冲区大小,可能会对性能造成巨大的影响。下面是对不同缓冲区大小的channel做的压力测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package bench

import (
"sync"
"sync/atomic"
"testing"
)

func BenchmarkWithNoBuffer(b *testing.B) {
benchmarkWithBuffer(b, 0)
}

func BenchmarkWithBufferSizeOf1(b *testing.B) {
benchmarkWithBuffer(b, 1)
}

func BenchmarkWithBufferSizeEqualsToNumberOfWorker(b *testing.B) {
benchmarkWithBuffer(b, 5)
}

func BenchmarkWithBufferSizeExceedsNumberOfWorker(b *testing.B) {
benchmarkWithBuffer(b, 25)
}

func benchmarkWithBuffer(b *testing.B, size int) {
for i := 0; i < b.N; i++ {
c := make(chan uint32, size)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

for i := uint32(0); i < 1000; i++ {
c <- i%2
}
close(c)
}()

var total uint32
for w := 0; w < 5; w++ {
wg.Add(1)
go func() {
defer wg.Done()

for {
v, ok := <-c
if !ok {
break
}
atomic.AddUint32(&total, v)
}
}()
}

wg.Wait()
}
}

在这个测试程序中,包含一个生产者,向channel中发送整型元素;包含多个消费者,从channel中读取数据,并将它们原子的加入变量total中。

运行这个测试十次,并通过benchstat分析结果:

1
2
3
4
5
name                                    time/op
WithNoBuffer-8 306µs ± 3%
WithBufferSizeOf1-8 248µs ± 1%
WithBufferSizeEqualsToNumberOfWorker-8 183µs ± 4%
WithBufferSizeExceedsNumberOfWorker-8 134µs ± 2%

说明合适的缓冲区大小确实会使得程序执行得更快!让我们来分析测试程序以确认耗时反生在何处。

追踪耗时

通过Go工具trace中的synchronization blocking profile来查看测试程序被同步原语阻塞所消耗的时间。接收时的耗时对比:无缓冲channel为9毫秒,缓冲大小为50的channel为1.9毫秒。

profile图

发送时的耗时对比:有缓冲channel将耗时缩小了五倍。

profile图

可以得出结论,缓冲区的大小确实在程序性能方面扮演了重要角色。

英文原文: Go: Buffered and Unbuffered Channels by Vincent Blanchon (https://medium.com/@blanchon.vincent/go-buffered-and-unbuffered-channels-29a107c00268)

本文完,作者yoko,尊重劳动人民成果,转载请注明原文出处: https://pengrl.com/p/21027/

0%