golang源码阅读之定时器以及避坑指南

本文分为三部分:
第一部分为阅读源码后的总结。
第二部分为高性能场景使用定时器需要注意的地方。
第三部分为系统库源码以及我写的注释。

本文基于go version 1.11.4

先放总结

所有业务层的timer对象都被底层的全局容器变量所持有及管理。这里说的全局容器是一个桶(bucket)数组,数组大小固定为64,数组的每个元素为一个桶对象,每个桶内包含一个最小堆和一个loop循环协程(以下简称桶协程)。

timer对象归哪个桶管理取决于申请该timer对象时G所在的P(通过P的id取余64作为桶数组下标)。
(关于golang线程调度模型中G P M的概念超出了本文的讨论范围。这里只简单理解G为当前goroutine,P为当前goroutine所属的任务队列。)

由于hash算法和P的id相关,所以一个程序最多有min(64, GOMAXPROCS)个桶在使用。
另外,和桶一对一关联的桶协程是懒开启的,只在桶被初次使用时(即有timer对象hash到了这个桶)才开启,开启后桶协程内部的循环永远不会退出。

不将桶数量直接设置为GOMAXPROCS是因为那样的话数组需要动态申请。
桶数量设置为64是权衡在不同环境下(GOMAXPROCS不同)内存使用以及性能间的一种经验值。

每个桶都有一个最小堆,根据桶内所有timer的超时触发绝对时间点做调整。
关于数据结构最小堆的详细介绍读者可以自行查找资料,这里你只需要知道堆的底层使用数组实现,插入和删除的时间复杂度都是O(logn),并且插入和删除后,最小堆始终保持最小的元素在堆顶位置,所以获取最小元素是O(1)的。
事实上,golang定时器中的最小堆使用的是四叉树实现,相较于常见的二叉树实现,在节点数量比较多时,四叉树对底层数组的访问路径的局部性更好,CPU cache更友好些。

当桶内没timer时,桶协程被挂起。即rescheduling状态。
当桶内还有timer时,桶内协程睡眠直到最小超时触发时间点后再唤醒。即sleeping状态。
当往桶内加入新timer而该timer的超时触发时间点正好是当前桶内最小的,则唤醒桶协程。让桶协程重新判断,设置新的最小超时触发时间点后进入sleeping状态。

由于桶数量是固定的,所以hash桶的操作是无锁的。
但是桶内有互斥锁,因为桶协程业务层调用Timer的接口可能并行操作桶内的最小堆和各种标志等变量。

使用timer时,以下几点开销要做到心里有数,桶内互斥锁的开销,最小堆容器管理的开销,协程调度的开销,创建timer对象时、超时触发返回当前时间时、桶协程内部都会有获取当前时间调用的开销。

高性能场景如何使用

阅读源码的目的,是学习别人写的好的地方,以及保证正确的使用姿势。

你能看出下面这段伪代码存在的问题吗?

1
2
3
4
5
6
7
8
9
func consume() {
t := new time.NewTimer(5 * time.Second)
select {
case <- ch:
// 做相关的业务
case <- t:
// 超时了,做超时处理
}
}

这是timer常见的一种用法,为某个消费者设置消费超时时间。
如果在超时时间内消费ch成功了,则timer对象在业务层没有被触发。
那么问题来了,底层从最小堆中删除timer只有两种情况,要么在业务层显式调用Stop方法停止定时器,要么底层判断timer已经到达超时触发时间。刚才这种情况,底层只能等到超时触发时间(伪代码中为5秒后)才能从容器中移除该timer。即资源被延时释放了。
作为写业务层代码的人,很可能会误认为业务层已经不再使用且不再持有该timer了,资源就被释放了。
如果我们的生产消费非常的频繁,底层容器将堆积大量的timer,从而浪费大量内存和CPU资源。

另外,假设你在其它场景使用了time.Ticker(不同于Timer只在超时后触发一次,Ticker将周期性触发超时)而没有调用Stop(即使业务层已不再持有Ticker对象了),情况将更糟糕,底层容器将一直持有Ticker对象,并周期性触发超时,然后修改下次超时时间点。资源将永远得不到释放,内存和CPU将永久性的泄漏。

正确的做法应该是:

Ticker对象不再使用后,显式调用Stop方法。

Timer对象不再使用后,在高性能场景下,也应该显式调用Stop方法,及时释放资源。
那么这又分为两种情况,Timer是否已经在业务层触发超时了。
通过阅读系统库源码我们可以得知,对已超时的Timer调用Stop方法内部有变量保护,是安全的。但是这种保护需要拿一次桶内的互斥锁,高性能场景下也需要考虑这个消耗。
所以正确释放Timer对象的做法是,简单点就在上面伪代码的select结束后统一调用Stop,精细点就在ch得到消费时调用Stop。

我之后会再写一篇文章,关于在某些特定场景下如何自己实现一个简易timer,牺牲部分我们不需要的精确度来大幅提高超时业务逻辑的性能。

部分源码的说明

涉及到文件为:

  • src/time/sleep.go
  • src/time/tick.go
  • src/runtime/time.go
  • 其它一些runtime中的代码

首先看time/sleep.go,里面有time.Timer的实现,time.Timer比较简单,只是对runtime包中timer的一层wrap。这层自身实现的最核心功能是将底层的超时回调转换为发送channel消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 这里可以看到是对runtimeTimer的wrap
type Timer struct {
C <-chan Time
r runtimeTimer
}

func NewTimer(d Duration) *Timer {
// 注意,这里的channel是带缓冲的,保证了业务层如果不接收这个channel,底层的
// 桶协程不会因为发送channel而被阻塞
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
// 向底层timer传入sendTime回调函数
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

// 将底层的超时回调转化为channel发送,并写入了当前时间
func sendTime(c interface{}, seq uintptr) {
// Non-blocking send of time on c.
// Used in NewTimer, it cannot block anyway (buffer).
// Used in NewTicker, dropping sends on the floor is
// the desired behavior when the reader gets behind,
// because the sends are periodic.
select {
case c.(chan Time) <- Now():
default:
}
}

// After就是匿名Timer
func After(d Duration) <-chan Time {
return NewTimer(d).C
}

接下来我们看runtime/time.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// timer结构体
type timer struct {
tb *timersBucket // timer所属的桶
i int // 最小堆中的下标,为-1时则不可用了

// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(arg, now) in the timer goroutine, so f must be
// a well-behaved function and not block.
when int64 // 超时时间点
period int64 // 如果是Ticker,会有这个值,周期性触发
f func(interface{}, uintptr) // 回调
arg interface{} // time.Timer会传入channel变量,一会回调时把channel带回去
seq uintptr // 这个变量目前没有用
}

// 桶数量固定为64
const timersLen = 64

// 全局桶数组,还对cache伪共享做了优化
var timers [timersLen]struct {
timersBucket

// The padding should eliminate false sharing
// between timersBucket values.
pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}

// addTimer时,首先P id取余64获取timer所属的bucket
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}

func (tb *timersBucket) addtimerLocked(t *timer) bool {
// 负数参数保护性代码
if t.when < 0 {
t.when = 1<<63 - 1
}
// 最小堆插入操作
t.i = len(tb.t)
tb.t = append(tb.t, t)
if !siftupTimer(tb.t, t.i) {
return false
}
// 下标为0,说明该timer的触发时间为当前桶中最早的
if t.i == 0 {
// 桶协程在sleep,唤醒它
if tb.sleeping {
tb.sleeping = false
notewakeup(&tb.waitnote)
}
// 桶协程被挂起了,重新调度
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0)
}
}
// 如果timer所属的桶还没有创建,创建并开启桶协程
if !tb.created {
tb.created = true
go timerproc(tb)
}
return true
}

// 桶协程,注意,这里有两层for循环,最外面的for是永远不会退出的
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
// 进互斥锁
lock(&tb.lock)
// 睡眠标志修改
tb.sleeping = false
// 获取当前时间
now := nanotime()
delta := int64(-1)
for {
// 如果桶内没有timer,直接退出内层for
if len(tb.t) == 0 {
delta = -1
break
}
// 获取最早触发timer,并检查是否到达触发时间
t := tb.t[0]
delta = t.when - now
// 还没到时间,直接退出内层for
if delta > 0 {
break
}
ok := true
// 如果是period有值,说明需要周期性触发,我们将该timer修改触发时间后,重新
// 插入最小堆中
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
if !siftdownTimer(tb.t, 0) {
ok = false
}
} else {
// 从最小堆中删除
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
if !siftdownTimer(tb.t, 0) {
ok = false
}
}
// 下标设置为-1,deltimer时发现下标为-1则不用删除了
t.i = -1 // mark as removed
}
// 把t中变量拷贝出来,就可以出锁了
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
// 堆调整时如果下标设置越界了,则丢到这里来处理,badTimer会直接panic
if !ok {
badTimer()
}
// 如果开了race检查的话
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&tb.lock)
}
// 如果桶内没有timer了,把协程挂起
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
tb.rescheduling = true
goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
// 如果还有协程,睡眠直到桶内最早触发时间点到达后唤醒
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
notetsleepg(&tb.waitnote, delta)
}
}

// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
if t.tb == nil {
// t.tb can be nil if the user created a timer
// directly, without invoking startTimer e.g
// time.Ticker{C: c}
// In this case, return early without any deletion.
// See Issue 21874.
return false
}

tb := t.tb

lock(&tb.lock)
// t may not be registered anymore and may have
// a bogus i (typically 0, if generated by Go).
// Verify it before proceeding.
i := t.i
last := len(tb.t) - 1
// 如果已经触发过或已经被删除了,则返回false告知调用方
if i < 0 || i > last || tb.t[i] != t {
unlock(&tb.lock)
return false
}
if i != last {
tb.t[i] = tb.t[last]
tb.t[i].i = i
}
tb.t[last] = nil
tb.t = tb.t[:last]
ok := true
if i != last {
if !siftupTimer(tb.t, i) {
ok = false
}
if !siftdownTimer(tb.t, i) {
ok = false
}
}
unlock(&tb.lock)
if !ok {
badTimer()
}
return true
}

本文完,作者yoko,尊重劳动人民成果,转载请注明原文出处: https://pengrl.com/p/62835/

0%