1.12及之前版本的sync.Pool有三个问题:
每次GC都回收所有对象,如果缓存对象数量太大,会导致STW1阶段的耗时增加。
每次GC都回收所有对象,导致缓存对象命中率下降,New方法的执行造成额外的内存分配消耗。
Pool.Get方法底层有锁,极端情况下,要尝试最多P次抢锁,也获取不到缓存对象,最后得执行New方法返回对象。
这些问题就对sync.Pool的室使用提出了要求,不满足时,性能并不会有大幅提升:
最好是高并发场景。(对应问题3)
最好两次GC之间的间隔足够长。(对应问题1,2)
首先sync.Pool 有两种使用方式,使用效果没有区别。
第一种,实例化的时候,实现New 函数即可:
package main
import(
“fmt”
“sync”
)
func main() {
p := &sync.Pool{
New: func() interface{} {
return 0
},
}
a := p.Get().(int)
p.Put(1)
b := p.Get().(int)
fmt.Println(a, b) }
第二种,get 取值的时候,判断是否为nil 即可。
package main
import(
“fmt”
“sync”
)
func main() {
p := &sync.Pool{}
a := p.Get()
if a == nil {
a = func() interface{} {
return 0
}
}
p.Put(1)
b := p.Get().(int)
fmt.Println(a, b)
}
这两种实现方式,最后的效果是一样的,也反应了pool 的特性,get 返回值是new 的对象,或者nil。
然后,pool 底层到底是怎样的数据结构?就是一个metux 和 slice?其实也是类似,只是加了些其他特性而已,下面数据结构:
type Pool struct {
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{} }
// Local per-P Pool appendix.
type poolLocal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
pad [128]byte // Prevents false sharing.
}
这里的local 是个poolLocal 的数组,localsize 是数组的大小。其中,从get 和put 方法看,为每个thread 维护了一个poolLocal 数据结构。不同线程取数据的时候,先判断下hash 到哪个线程去了,分别去对应的poolLocal 中去取数据,这是利用了分段锁的思想。
具体实现可以看get 方法:
func (p *Pool) Get() interface{} {
if raceenabled {
if p.New != nil {
return p.New()
}
return nil
}
l := p.pin() // 获取当前线程的poolLocal,也就是p.local[pid]。
x := l.private //判断临时变量是否有值,有值即返回
l.private = nil
runtime_procUnpin()
if x != nil {
return x
}
l.Lock() //临时对象没值到本地的缓存列表中去取
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x != nil {
return x
}
return p.getSlow() //当本线程的缓存对象已经没有,去其他线程缓存列表中取
}
这里代码的注释比较详尽了,本来维护一个mutex ,现在变成竞争多个mutex ,降低了锁的竞争。性能自然非常好。
最后是getSlow 方法,从其他线程的变量中去steal 偷。runtime 也喜欢搞这种事。。。
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ { //遍历其他线程的缓存队列
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
if x == nil && p.New != nil { //其他线程没有,那么new 一个
x = p.New()
}
return x }
最后,pool 还有个特性是当gc 的时候所有的缓存对象都要被清理,调用的是PoolCleanUp,没什么特别之处。但是这个特性要求了pool 绝对不能做有状态的缓存,类似socket的缓存池。
这里的分段锁,为每个线程bind 一个队列,还考虑到了均衡的情况,
sync.Pool设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。
Pool对外暴露的主要有三个接口:
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
New func() interface{}
Get 返回 Pool 中的任意一个对象。如果 Pool 为空,则调用 New 返回一个新创建的对象。
底层数据结构
sync.Pool 是一个临时对象池。一句话来概括,sync.Pool 管理了一组临时对象,当需要时从池中获取,使用完毕后从再放回池中,以供他人使用。
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local,固定大小per-P池, 实际类型为 [P]poolLocal
localSize uintptr // local array 的大小
// New 方法在 Get 失败的情况下,选择性的创建一个值, 否则返回nil
New func() interface{} }
type poolLocal struct {
poolLocalInternal
// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
// 每个缓存行具有 64 bytes,即 512 bit
// 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // 只能被局部调度器P使用
shared []interface{} // 所有P共享
Mutex // 访问共享数据域的锁
}
一个poolLocal与一个P绑定,也就是说一个P持有一个poolLocal。每个 poolLocal 的大小均为缓存行的偶数倍,包含一个 private 私有对象、shared 共享对象 slice 以及一个 Mutex 并发锁。
Put
Put的过程就是将临时对象放进 Pool 里面。
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// 获取 localPool
l := p.pin()
// 优先放入 private
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
// 如果不能放入 private 则放入 shared
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
}
Put的策略相对简单:
首先获取当前goroutine所运行的P持有的localPool
优先放入 private
如果 private 已经有值,即不能放入则放入 shared
前面还有两个细节:
怎么获取到当前P持有的localPool
runtime_procUnpin() 函数的作用
具体细节在后面分析。
Get
Get操作相对复杂一点,在从池中获取对象的时候,会先从 per-P 的 poolLocal slice 中选取一个 poolLocal。
源码如下:
func (p *Pool) Get() interface{} {
// 首先获取 poolLocal
l := p.pin()
// 先从private取
x := l.private
l.private = nil
runtime_procUnpin()
// private不存在再从shared里面去
if x == nil {
// 加锁,从 shared 获取
l.Lock()
// 从 shared 尾部取缓存对象
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
// 如果取不到,则获取新的缓存对象
x = p.getSlow()
}
}
// 如果 getSlow 还是获取不到,则 New 一个
if x == nil && p.New != nil {
x = p.New()
}
return x
}
优先从 private 中选择对象
若取不到,则对 shared slice 加锁,取最后一个
若取不到,则尝试从其他线程中 steal
若还是取不到,则使用 New 方法新建
这里同样涉及到两个细节:
怎么获取到当前P持有的localPool
getSlow() 的steal是怎么实现的
细节
pin()函数获取per-P的localPool
还是先看源码:
// pin函数会将当前 goroutine绑定的P, 禁止抢占(preemption) 并从 poolLocal 池中返回 P 对应的 poolLocal
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
// 在 pinSlow 中会存储 localSize 后再存储 local,因此这里反过来读取
// 因为我们已经禁用了抢占,这时不会发生 GC
// 因此,我们必须观察 local 和 localSize 是否对应
// 观察到一个全新或很大的的 local 是正常行为
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// 因为可能存在动态的 P(运行时调整 P 的个数)procresize/GOMAXPROCS
// 如果 P.id 没有越界,则直接返回
if uintptr(pid) < s {
return indexLocal(l, pid)
}
// 没有结果时,涉及全局加锁
// 例如重新分配数组内存,添加到全局列表
return p.pinSlow()
}
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
//go:nosplit
func procPin() int {
g := getg()
mp := g.m
mp.locks++
return int(mp.p.ptr().id) }
根据注释:pin函数首先会调用运行时实现获得当前 P 的 id,然后设置P禁止抢占(避免GC)。然后检查 pid 与 p.localSize 的值 来确保从 p.local 中取值不会发生越界。如果不会发生,则调用 indexLocal() 完成取值。否则还需要继续调用 pinSlow()。
这里调用了 runtime_procPin() 来实现获取runtime的P,并设置禁止抢占,然后返回P的id。
在这个过程中我们可以看到在 runtime 调整 P 的大小的代价。如果此时 P 被调大,而没有对应的 poolLocal 时, 必须在取之前创建好,从而必须依赖全局加锁,这对于以性能著称的池化概念是比较致命的,因此这也是 pinSlow() 函数的由来。
pinSlow()
因为需要对全局进行加锁,pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁:
var (
allPoolsMu Mutex
allPools []*Pool
)
1
2
3
4
这里可以看到,Pool里面有全局变量持有了所有的Pool, 然后也有一个全局锁来保护数据域的可靠性。
pinSlow源码:
func (p *Pool) pinSlow() *poolLocal {
// 这时取消 P 的禁止抢占,因为使用 mutex 时候 P 必须可抢占
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 当锁住后,再次固定 P 取其 id
pid := runtime_procPin()
// 并再次检查是否符合条件,因为可能中途已被其他线程调用
// 当再次固定 P 时 poolCleanup 不会被调用
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
// 如果数组为空,新建
// 将其添加到 allPools,垃圾回收器从这里获取所有 Pool 实例
if p.local == nil {
allPools = append(allPools, p)
}
// 根据 P 数量创建 slice,如果 GOMAXPROCS 在 GC 间发生变化
// 我们重新分配此数组并丢弃旧的
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁。
当完成加锁后,再重新固定 P ,取其 pid。
因为中途可能已经被其他的线程调用,因此这时候需要再次对 pid 进行检查。 如果 pid 在 p.local 大小范围内,则不再此时创建,直接返回。
如果 p.local 为空,则将 p 扔给 allPools 并在垃圾回收阶段回收所有 Pool 实例。
最后再完成对 p.local 的创建(彻底丢弃旧数组)
getSlow() steal from other per-P localPool
现在我们获取到了 poolLocal。Get操作就回到了我们从localPool中取值的过程。在取对象的过程中,我们仍然会面对当前localPool中没有缓存的对象了,也就是既不能从 private 取、也不能从 shared 中取得尴尬境地。这时候就来到了 getSlow(),也就是steal
如果我们在本地的 P 中取不到值,就从别的P那里偷一个,总会比创建一个新的要快。 因此,我们再次固定 P,并取得当前的 P.id 来从其他 P 中偷值,那么我们需要先获取到其他 P 对应的 poolLocal。假设 size 为数组的大小,local 为 p.local,那么尝试遍历其他所有 P:
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
// 获取目标 poolLocal, 引入 pid 保证不是自身
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
这里证明一下确实不会发生取到自身的情况:不妨设:pid = (pid+i+1)%size则 pid+i+1 = asize+pid 。
即:asize = i+1 ,其中 a 为整数。由于 i<size ,于是 asize = i+1 < size+1,则:(a-1)size < 1 ==> size < 1 / (a-1),由于 size 为非负整数,这是不可能的。
Runtime 垃圾回收Hook
前面讲到了sync.Pool 的垃圾回收发生在运行时 GC 开始之前。
我们看看 sync.Pool 的 init 函数:
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func runtime_registerPoolCleanup(cleanup func())
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
……
}
func gcStart(trigger gcTrigger){
…….
clearpools()
…….
}
从链路的追踪可以看到,在开始GC的时候回调用Pool的回收。
下面看看Pool的清理函数poolCleanup()是怎么清理Pool的:
func poolCleanup() {
// 该函数会注册到运行时 GC 阶段(前),此时为 STW 状态,不需要加锁
// 它必须不处理分配且不调用任何运行时函数,防御性的将一切归零,有以下两点原因:
// 1. 防止整个 Pool 的 false retention
// 2. 如果 GC 发生在当有 goroutine 与 l.shared 进行 Put/Get 时,它会保留整个 Pool.
// 那么下个 GC 周期的内存消耗将会翻倍。
// 遍历所有 Pool 实例,接触相关引用,交由 GC 进行回收
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
实际上就是将所有的对象置为 nil,等着GC做自动回收。
整个设计充分利用了go.runtime的调度器优势:一个P下goroutine竞争的无锁化;
一个goroutine固定在一个局部调度器P上,从当前 P 对应的 poolLocal 取值, 若取不到,则从对应的 shared 数组上取,若还是取不到;则尝试从其他 P 的 shared 中偷。 若偷不到,则调用 New 创建一个新的对象。池中所有临时对象在一次 GC 后会被全部清空。
1、缓存对象的数量和期限
上面我们可以看到 pool 创建的时候是不能指定大小的,所有 sync.Pool 的缓存对象数量是没有限制的(只受限于内存),因此使用 sync.pool 是没办法做到控制缓存对象数量的个数的。另外 sync.pool 缓存对象的期限是很诡异的,先看一下 src/pkg/sync/pool.go 里面的一段实现代码:
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
可以看到 pool 包在 init 的时候注册了一个 poolCleanup 函数,它会清除所有的 pool 里面的所有缓存的对象,该函数注册进去之后会在每次 gc 之前都会调用,因此 sync.Pool 缓存的期限只是两次 gc 之间这段时间。例如我们把上面的例子改成下面这样之后,输出的结果将是 0 0。正因 gc 的时候会清掉缓存对象,也不用担心 pool 会无限增大的问题。
a := p.Get().(int)
p.Put(1)
runtime.GC()
b := p.Get().(int)
fmt.Println(a, b)
这是很多人错误理解的地方,正因为这样,我们是不可以使用sync.Pool去实现一个socket连接池的。
2、缓存对象的开销
如何在多个 goroutine 之间使用同一个 pool 做到高效呢?官方的做法就是尽量减少竞争,因为 sync.pool 为每个 P(对应 cpu,不了解的童鞋可以去看看 golang 的调度模型介绍)都分配了一个子池,如下图:
当执行一个 pool 的 get 或者 put 操作的时候都会先把当前的 goroutine 固定到某个P的子池上面,然后再对该子池进行操作。每个子池里面有一个私有对象和共享列表对象,私有对象是只有对应的 P 能够访问,因为一个 P 同一时间只能执行一个 goroutine,因此对私有对象存取操作是不需要加锁的。共享列表是和其他 P 分享的,因此操作共享列表是需要加锁的。
获取对象过程是:
1)固定到某个 P,尝试从私有对象获取,如果私有对象非空则返回该对象,并把私有对象置空;
2)如果私有对象是空的时候,就去当前子池的共享列表获取(需要加锁);
3)如果当前子池的共享列表也是空的,那么就尝试去其他P的子池的共享列表偷取一个(需要加锁);
4)如果其他子池都是空的,最后就用用户指定的 New 函数产生一个新的对象返回。
可以看到一次 get 操作最少 0 次加锁,最大 N(N 等于 MAXPROCS)次加锁。
归还对象的过程:
1)固定到某个 P,如果私有对象为空则放到私有对象;
2)否则加入到该 P 子池的共享列表中(需要加锁)。
可以看到一次 put 操作最少 0 次加锁,最多 1 次加锁。
由于 goroutine 具体会分配到那个 P 执行是 golang 的协程调度系统决定的,因此在 MAXPROCS>1 的情况下,多 goroutine 用同一个 sync.Pool 的话,各个 P 的子池之间缓存的对象是否平衡以及开销如何是没办法准确衡量的。但如果 goroutine 数目和缓存的对象数目远远大于 MAXPROCS 的话,概率上说应该是相对平衡的。
总的来说,sync.Pool 的定位不是做类似连接池的东西,它的用途仅仅是增加对象重用的几率,减少 gc 的负担,而开销方面也不是很便宜的。
原生sync.Pool的问题是,Pool中的对象会被GC清理掉,这使得sync.Pool只适合做简单地对象池,不适合作连接池。
为何不适合作连接池
对象的数量和期限
pool创建时不能指定大小,没有数量限制。pool中对象会被GC清掉,只存在于两次GC之间。实现是pool的init方法注册了一个poolCleanup()函数,这个方法在GC之前执行,清空pool中的所有缓存对象。
池对象Get/Put开销
为使多协程使用同一个POOL。最基本的想法就是每个协程,加锁去操作共享的POOL,这显然是低效的。而进一步改进,类似于ConcurrentHashMap(JDK7)的分Segment,提高其并发性可以一定程度性缓解。
注意到pool中的对象是无差异性的,加锁或者分段加锁都不是较好的做法。go的做法是为每一个绑定协程的P都分配一个子池。每个子池又分为私有池和共享列表。共享列表是分别存放在各个P之上的共享区域,而不是各个P共享的一块内存。协程拿自己P里的子池对象不需要加锁,拿共享列表中的就需要加锁了。
Get对象过程:
goroutine固定到某一个P后,先从当前子池私区拿。并置私有对象为空。
拿不到再从当前子池共享列表拿,需要加锁。
仍拿不到从其它子池共享列表拿,需要加锁。
仍拿不到,sync.pool.New闭包非空,则New一个对象。
所以最坏的情况下遍历其它P才拿到对象,最大值为MACPROCS。
Put过程:
固定P中私有对象为空,则放到私有对象。
否则放入当前子池的共享列表,加锁实现。
开销为最多一次加锁。
如何解决Get最坏情况遍历所有P才获取得对象呢:
能够设置加锁期间遍历其它P的最大次数,遍历不到就直接创建,减少加锁占用pool的时间。
使各子池共享列表中的对象数量尽量平均化,从而避免最坏的情况发生。
方法1止前sync.pool并没有这样的设置。方法2由于goroutine被分配到哪个P由调度器调度不可控,无法确保其平衡。
由于不可控的GC导致生命周期过短,且池大小不可控,因而不适合作连接池。仅适用于增加对象重用机率,减少GC负担。2
pool在一定的使用条件下提高并发性能,条件1是协程数远大于GOMAXPROCS,条件2是池中对象远大于GOMAXPROCS。归结成一个原因就是使对象在各个P中均匀分布。
关于何时回收Pool
池pool和缓存cache的区别。池的意思是,池内对象是可以互换的,不关心具体值,甚至不需要区分是新建的还是从池中拿出的。缓存指的是KV映射,缓存里的值互不相同,清除机制更为复杂。缓存清除算法如LRU、LIRS缓存算法。
池空间回收的几种方式。一些是GC前回收,一些是基于时钟或弱引用回收。最终确定在GC时回收Pool内对象,即不回避GC。用java的GC解释弱引用。GC的四种引用:强引用、弱引用、软引用、虚引用。虚引用即没有引用,弱引用GC但有空间则保留,软引用GC即清除。ThreadLocal的值为弱引用的例子。
Pool其它场景
regexp包为了保证并发时使用同一个正则,而维护了一组状态机。
fmt包做字串拼接,从sync.pool拿[]byte对象。避免频繁构建再GC效率高很多。
var ppFree = sync.Pool{
New: func() interface{} { return new(pp) },
}
pool关键作用:
减轻GC的压力。
复用对象内存。有时不一定希望复用内存,单纯是想减轻GC压力也可主动给pool塞对象。
Pool’s purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.
原理简述
sync.Pool就是围绕New字段、Get和Put方法来使用。用过都懂,比较简单就不介绍了。
Go是提供goroutine进行并发编程,在并发环境下,sync.Pool的使用不会造成严重性能问题是它的设计考虑点。
容易想到的方法是Pool对象为每个P都分配一个空间,这样在P上运行的G进行Get和Put操作时,就可以在P本地的空间上读写。这样方法比Pool对象维护一个全局空间有明显好处,全局空间的读写肯定要加锁。
即使每个P都有了自己的本地空间,也不是说就可以完全避免锁使用。不要忘了Pool提供了内存复用功效,每个P上的G都使用的是P本地的空间的话,那内存复用就有局限性,只能局限在一个P上。
而pool提供的内存复用是覆盖所有P。意思是,一个G在执行Get方法时,发生G所在的P上,没有可复用的对象。这时就到别的P那儿去偷。偷这个动作就要加锁了。因为偷取别人可复用对象时候,别人也可能同时在读写。
前面开始说每个P有自己的空间,作用是避免锁,后面又说到别的P上偷对象,又要加锁。是不是矛盾了。
不矛盾,让我们来看看sync.Pool的实现原理。
sync.Pool对象底层两个关键字段,local和localSize,前者是指向一个数组,数组大小存在localSize。localSize的大小跟P个数保持一致。数组每个元素就是代表每个P自己的本地空间,类型是poolLocal。
poolLocal类型有两个关键字段,private和shared:
shared是一个数组,读写要加锁。
private只能存一个对象,读写不加锁。
来理一下在Pool对象上读写的逻辑:
Get操作时,先返回本地P上的private上的对象。
如果private为空,继续从本地P上的shared找,这里要加锁。
如果shared也没有,就到别的P那儿,从shared里偷。
所有其它P都遍历过了,没有任何对象可偷。就返回nil或调用New函数。
Put操作时,优先放private。
private已经被放了,那就放到shared的最后。
ync.Pool的特性
无大小限制。
自动清理,每次GC前会清掉Pool里的所有对象。所以不适用于做连接池。
每个P都会有一个本地的poolLocal,Get和Put优先在当前P的本地poolLocal操作。其次再进行跨P操作。
所以Pool的最大个数是runtime.GOMAXPROCS(0)。
sync.Pool的缺点
pool的Get()并非成本低廉,最坏情况可能会上锁runtime.GOMAXPROCS(0)次。
所以,多Goroutine与多P的情况下,使用Pool的效果才会突显。否则要经历无谓的锁成本。
简单的常用场景
bytes.Buffer作为临时对象放在池子里,这样减轻每次都需要创建的消耗。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type Dao struct {
bp sync.Pool
}
func New(c *conf.Config) (d *Dao) {
d = &Dao{
bp: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
return
}
func (d *Dao) Infoc(args …string) (value string, err error) {
if len(args) == 0 {
return
}
// fetch a buf from bufpool
buf, ok := d.bp.Get().(*bytes.Buffer)
if !ok {
return "", ErrType
}
// append first arg
if _, err := buf.WriteString(args[0]); err != nil {
return "", err
}
for _, arg := range args[1:] {
// append ,arg
if _, err := buf.WriteString(defaultSpliter); err != nil {
return "", err
}
if _, err := buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil {
return "", err
}
}
value = buf.String()
buf.Reset()
d.bp.Put(buf)
return
}
带注释的源码
sync.Pool数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// pool 的数据结构
type Pool struct {
noCopy noCopy
// 指向一个数组,个数与P相等,每个元素的类型为poolLocalInternal
local unsafe.Pointer
// local数组的大小
localSize uintptr
// 创建pool对象时,用户必须提供的new函数
New func() interface{}
}
type poolLocalInternal struct {
// 私有对象,每个P都有,用于不同g执行get和put可以无锁操作
private interface{}
// 共享对象数组,每个P都有一个,同一个P上不同g可以多次执行put方法,需要有地方能存储。并且别的p上的g可能过来偷,所以要加锁
shared []interface{}
// 对shared进行加锁,private不用加锁
Mutex
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } Get方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// …
// 拿到当前P对应的pool
l := p.pin()
if l.private == nil {
// 私有区有位置的话直接放私有区
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
// 否则放在共享区里
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
// …
}
func (p *Pool) pin() *poolLocal {
// 拿到当前P的ID
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
// 定义pool对象时,s取值为0。只有经过pinSlow后,p.localSize的值才被设置
// 如果local数组已经初始化,就可以把对应P的本地pool返回
return indexLocal(l, pid)
}
// 否则就得重建local
return p.pinSlow()
}
func (p *Pool) pinSlow() *poolLocal {
runtime_procUnpin()
// 锁上所有的pool对象
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
// pinSlow是一个创建local的方法。在获得allPoolsMu锁前,可能被别的P先获取,这种情况下local就已经被初始化了
// 所以在获得allPoolsMu锁后需要再检查一次uintptr(pid) < s
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
// local的大小默认就是P的个数
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // 设置local
atomic.StoreUintptr(&p.localSize, uintptr(size)) // 设置localSize
return &local[pid]
}
Put方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// …
// 拿到当前P对应的pool
l := p.pin()
if l.private == nil {
// 私有区有位置的话直接放私有区
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
// 否则放在共享区里
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
// …
}
runtime_procPin和runtime_procUnpin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
//go:nosplit
func procPin() int {
g := getg()
mp := g.m
mp.locks++
return int(mp.p.ptr().id) }
//go:linkname sync_atomic_runtime_procUnpin sync/atomic.runtime_procUnpin
//go:nosplit
func sync_atomic_runtime_procUnpin() {
procUnpin()
}
//go:nosplit
func procUnpin() {
g := getg()
g.m.locks–
}
sync.Pool对象内部为每个P都分配了一个private区和shared区。
private区只能存放一个可复用对象,因为每个P在任意时刻只运行一个G,所以在private区上写入和取出对象是不用加锁的。
shared区可以放多个可复用对象,它本身是slice。进shared区就append,出shared区就slice[:last-1]。但shared区上写入和取出对象要加锁,因为别的G可能过来偷对象。
type poolLocalInternal struct {
// 私有对象,每个P都有,用于不同G执行get和put可以无锁操作
private interface{}
// 共享对象数组,每个P都有一个,同一个P上不同G可以多次执行put方法,需要有地方能存储。并且别的P上的G可能过来偷,所以要加锁
shared []interface{}
// 对shared进行加锁,private不用加锁
Mutex
}
问题3 就是由于shared区是一个带锁的后进先出队列造成的。每次Pool.Get方法在调用时,执行顺序是:
先看当前P的private区是否为空。
加锁,看当前P的shared区是否为空。
加锁,循环遍历看其他P的shared区是否为空。
只要上面三步任意一步就不为空,就可以把缓存对象返回了。但若都为空,最后就得调用New方法返回对象。
// 遍历一次其他P的共享区,偷一个,每次尝试偷都得上锁
for i := 0; i < int(size); i++ {
// 定位到某个P上的shared区
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
// 如果有缓存对象,就返回,并解锁
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
// 没有缓存对象,解锁,继续遍历下一个P
l.Unlock()
}
这一顿的加锁操作和Mutex锁自带的阻塞唤醒开销,Get方法在极端情况下就会有性能问题。
问题1和2 都是由于每次GC时,遍历清空所有缓存对象造成的。
sync.Pool在init()中向runtime注册了一个cleanup方法,它在STW1阶段被调用的。如果它执行过久,就会硬生生延长STW1阶段耗时。
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
这个cleanup方法干的事情是遍历所有的sync.Pool对象,再遍历每个sync.Pool对象中的每个P的shared区,把shared区每个缓存对象设置为nil。代码中就是三层for循环,简单粗暴时间复杂度高。
func poolCleanup() {
// …
for i, p := range allPools {
// 有多少个Sync.Pool对象,遍历多少次
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
// 有多少个P,遍历多少次
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
// 清空shared区中每个缓存对象
l.shared[j] = nil
}
l.shared = nil
}
// …
}
// …
}
好消息是1.13beta1已经解决了这三个问题。注意是beta版本,而不是stable版本。
接下来主要看1.13通过什么思路解决这些问题的。
不排除未来的stable版本或1.13的小版本会对这块实现做小改动。
取消每次GC默认对全部对象进行回收
解决问题1和2的思路就是不能每次全部回收。但该回收多少呢?
@aclements 提出了一种思路。这轮在sync.Pool中的对象,最快也在下轮GC才被回收。
https://github.com/golang/go/issues/22950#issuecomment-352935997
还记得上面说过每个P都有private区和shared区吗?现在每个P里两个区合在一起构成数组,给个名字叫local(其实也是源码中的实现)。1.13版本的实现中再引入一个victim,它结构与local一致。
// 1.13版本源码
type Pool struct {
local unsafe.Pointer // 实际指向[P]poolLocal
localSize uintptr // P的个数
victim unsafe.Pointer // 指向上轮的local
victimSize uintptr // 指向上轮的localSize
New func() interface{}
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private interface{}
shared poolChain
}
有了victim,Get和Put方法的步骤就有所变化:
Get时,先从local里尝试取出缓存对象(包括所有的P)。如果失败,就尝试从victim里取。
victim里也取对象失败,就调用New方法。
Put时,只放local里。
新的数据结构下,cleanup方法策略也有所变化,改为每次只把victim里的对象回收掉。然后victim再指向当前的local。
var (
// 所有sync.Pool对象
allPools []Pool
// 待回收的所有sync.Pool对象
oldPools []Pool
)
func poolCleanup() {
for _, p := range oldPools {
// 每次只回收victim
p.victim = nil
p.victimSize = 0
}
for _, p := range allPools {
// victim指向当前的local
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
显然这样好处就是这轮的缓存对象在GC时不会立马回收,而是存放起来,滞后一轮。这样下一轮能得到复用机会,提高了缓存对象的命中率。并且回收对象时,由对shared区O(n)的遍历操作,变成O(1)。
从benchmark感受这个优化带来的性能提升:
BenchmarkPoolSTW-8 p96-ns/STW 285485 p50-ns/STW 190467
BenchmarkPoolSTW-8 p96-ns/STW 7720 p50-ns/STW 4979
1.9.7版本的STW1阶段耗时TP96线是285485ns,而1.13beta1是7720ns。
Benchmark代码参考1.13beat1源码src/sync/pool_test.go.BenchmarkPoolSTW方法
使用无锁队列替换shared区
问题3 是因为在shared的访问加了一把Mutex锁造成的。如果不消除这把锁,引入victim区也是徒劳。因为此时victim的访问也得加锁。
旧实现中shared区是单纯的带锁后进先出队列,1.13beta版本改成了单生产者,多消费者的双端无锁环形队列。
单生产者是指,每个P上运行的G,执行Put方法时,就往队列里存放缓存对象(别的P上运行的G不能往里放),并且只能放在队列头部。由于每个P任意时刻只有一个G被运行,所以存放缓存对象不需要加锁。
多消费者分两种角色,一是在P上运行的G,执行Get方法时,从队列头部取出缓存对象。同上,取对象不用加锁;二是在其他P上运行的G,执行Get方法时,本地没有缓存对象,就到别的P上偷。此时盗窃者G只能从队列尾部取出对象,因为盗窃者可能有多个,所以尾部取数据用CAS来实现无锁。
注意,每个P都持有自己无锁队列,并且队列也可能有多个
每个P持有的循环队列初始化多大呢?增长和收缩策略呢?
shared区改用双向链表,每个链表节点指向一个无锁环形队列。
链表节点必须在头部插入。
当前P上的G取缓存对象时,只从头部链表节点指向的无锁队列里取。取不到,沿着prev指针到下一个无锁队列上重复操作,也没有的话。就到别的P上偷。
盗窃者G在偷缓存对象时,只从尾部链表节点指向的无锁队列里取。取不到,沿着next指针到下一个无锁队列上重复操作,也没有的话。就到别的P上继续偷,直到都偷不着,就调用New方法。
链表首次插入节点时,指向无锁队列初始化大小为8,增长策略为在头部插入新节点,指向的无锁队列大小为旧头部节点指向无锁队列大小的两倍,始终保持2的n次方大小。
假如在链表长度为3的情况下。尾部节点指向的无锁队列里缓存对象被偷光了。那么尾部节点会沿着next指针前移,把旧的无锁队列内存释放掉。此时链表长度变为2,这就是链表的收缩策略。最小时剩下一个节点,不会收缩成空链表。
无锁队列的自身最大的大小是2**30。达到上限时,再执行Put操作就放不进去,也不报错。
1.sync.Pool是什么?
Golang在 1.3 版本的时候,在sync包中加入一个新特性:Pool。 简单的说:就是一个临时对象池。
2.为什么需要sync.Pool?
保存和复用临时对象,减少内存分配,降低GC压力。
(对象越多GC越慢,因为Golang进行三色标记回收的时候,要标记的也越多,自然就慢了)
3.如何使用sync.Pool?
func main() {
// 初始化一个pool
pool := &sync.Pool{
// 默认的返回值设置,不写这个参数,默认是nil
New: func() interface{} {
return 0
},
}
// 看一下初始的值,这里是返回0,如果不设置New函数,默认返回nil
init := pool.Get()
fmt.Println(init)
// 设置一个参数1
pool.Put(1)
// 获取查看结果
num := pool.Get()
fmt.Println(num)
// 再次获取,会发现,已经是空的了,只能返回默认的值。
num = pool.Get()
fmt.Println(num) }
复制代码
使用较为简单。 总的思路就是:搞一个池子,预先放入临时产生的对象,然后取出使用。
可能有同学问了,这个玩意儿官方出的,那他自己有在用吗? 答案是有的,其实你也一直在用。
就是fmt包啦,由于fmt总是需要很多[]byte对象,索性就直接建了一个[]byte对象的池子,来走一波代码。
type buffer []byte
// printer状态的结构体()
type pp struct {
…
}
// pp的对象池, 《====这里用到了。
var ppFree = sync.Pool{
New: func() interface{} { return new(pp) },
}
// 每次需要pp结构体的时候,都过sync.Pool进行获取。
func newPrinter() pp {
p := ppFree.Get().(pp)
p.panicking = false
p.erroring = false
p.fmt.init(&p.buf)
return p
}
复制代码
4.走一波源码
4.1 基础数据结构
type Pool struct {
// noCopy,防止当前类型被copy,是一个有意思的字段,后文详说。
noCopy noCopy
// [P]poolLocal 数组指针
local unsafe.Pointer
// 数组大小
localSize uintptr
// 选填的自定义函数,缓冲池无数据的时候会调用,不设置默认返回nil
New func() interface{} //新建对象函数 }
type poolLocalInternal struct {
// 私有缓存区
private interface{}
// 公共缓存区
shared []interface{}
// 锁
Mutex
}
type poolLocal struct {
// 每个P对应的pool
poolLocalInternal
// 这个字段很有意思,是为了防止“false sharing/伪共享”,后文详讲。
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } 复制代码 来一张全景图,更有利于全局角度看这个结构体:
在这里插入图片描述
这边有两个小问题:
noCopy的作用?
poolLocal中pad的作用?
如何确定要获取的数据在哪个poolLocal里头?
带着问题,继续往下看,看完就能懂这两个小问题拉。
4.2 pin
在介绍get/put前,关键的基础函数pin需要先了解一下。 一句话说明用处:确定当前P绑定的localPool对象 (这里的P,是MPG中的P,如果看不懂请点这里:关于goroutine的一些小理解)
func (p *Pool) pin() *poolLocal {
// 返回当前 P.id && 设置禁止抢占(避免GC)
pid := runtime_procPin()
// 根据locaSize来获取当前指针偏移的位置
s := atomic.LoadUintptr(&p.localSize)
l := p.local
// 有可能在运行中动调调整P,所以这里进行需要判断是否越界
if uintptr(pid) < s {
// 没越界,直接返回
return indexLocal(l, pid)
}
// 越界时,会涉及全局加锁,重新分配poolLocal,添加到全局列表
return p.pinSlow() }
var (
allPoolsMu Mutex
allPools []*Pool
)
func (p *Pool) pinSlow() *poolLocal {
// 取消P的禁止抢占(因为后面要进行metux加锁)
runtime_procUnpin()
// 加锁
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 返回当前 P.id && 设置禁止抢占(避免GC)
pid := runtime_procPin()
// 再次检查是否符合条件,有可能中途已被其他线程调用
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
// 如果数组为空,则新建Pool,将其添加到 allPools,GC以此获取所有 Pool 实例
if p.local == nil {
allPools = append(allPools, p)
}
// 根据 P 数量创建 slice
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 将底层数组起始指针保存到 Pool.local,并设置 P.localSize
// 这里需要关注的是:如果GOMAXPROCS在GC间发生变化,则会重新分配的时候,直接丢弃老的,等待GC回收。
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
// 返回本次所需的 poolLocal
return &local[pid] }
// 根据数据结构的大小来计算指针的偏移量
func indexLocal(l unsafe.Pointer, i int) poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
流程小记:
禁止抢占GC -> 寻找偏移量 -> 检查越界 ->返回poolLocal
->加锁重建pool,并添加到allPool
4.3 put
先说结论:优先放入private空间,后面再放入shared空间 现在开始分析:
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// 这段代码,不需要关心,降低竞争的
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 获取当前的poolLocal
l := p.pin()
// 如果private为nil,则优先进行设置,并标记x
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
// 如果标记x不为nil,则将x设置到shared中
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
// 设置竞争可用了。
if race.Enabled {
race.Enable()
} } 复制代码 4.4 get 先说结论:优先从private空间拿,再加锁从shared空间拿,还没有再从其他的PoolLocal的shared空间拿,还没有就直接new一个返回。 现在进行分析:
func (p *Pool) Get() interface{} {
// 竞争相关的设置
if race.Enabled {
race.Disable()
}
// 获取当前的poolLocal
l := p.pin()
// 从private中获取
x := l.private
l.private = nil
runtime_procUnpin()
// 不存在,则继续从shared空间拿,
if x == nil {
// 加锁了,防止并发
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
// 从尾巴开始拿起
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
// 从其他的poolLocal中的shared空间看看有没有可返回的。
x = p.getSlow()
}
}
// 竞争解除
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 如果还是没有的话,就直接new一个了
if x == nil && p.New != nil {
x = p.New()
}
return x }
func (p *Pool) getSlow() (x interface{}) {
// 获取poolLocal数组的大小
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// 尝试从其他procs获取一个P对象
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
// 获取一个poolLocal,注意这里是从当前的local的位置开始获取的,目的是防止取到自身
l := indexLocal(local, (pid+i+1)%int(size))
// 加锁从尾部获取shared的数据
l.Lock()
last := len(l.shared) - 1
// 若长度大于1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x }
复制代码
5.源码关键点解析
5.1 定时清理
Q:这里的pool的是永久保存的吗?还是? A:是会进行清理的,时间就是两次GC间隔的时间。
// 注册清理函数,随着runtime进行的,也就是每次GC都会跑一下
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// 清理函数也很粗暴,直接遍历全局维护的allPools将private和shared置为nil
func poolCleanup() {
// 遍历allPools
for i, p := range allPools {
// pool置为nil
allPools[i] = nil
// 遍历localSIze的数量次
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
// private置为nil
l.private = nil
// 遍历shared,都置为nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
// allPools重置
allPools = []*Pool{} }
所以呢,这也说明为什么sync.Pool不适合放做“数据库连接池”等带持久性质的数据,因为它会定期回收啊~
5.2 为什么获取shared要加锁,而private不用?
我们知道golang是MPG的方式运行的,(关于goroutine的一些小理解)
大概这么个感觉吧:
M——P—– poolLocal
|
G - G
|
G
…
M——P—– poolLocal
|
G—G
|
G
…
复制代码
也就是说,每个P都分配一个localPool,在同一个P下面只会有一个Gouroutine在跑,所以这里的private,在同一时间就只可能被一个Gouroutine获取到。
而shared就不一样了,有可能被其他的P给获取走,在同一时间就只可能被多个Gouroutine获取到,为了保证数据竞争,必须加一个锁来保证只会被一个G拿走。
5.3 noCopy的作用?
防止Pool被拷贝,因为Pool 在Golang是全剧唯一的
这里又衍生一个问题,这里的noCopy如何实现被防止拷贝的???
Golang中没有原生的禁止拷贝的方式,所以结构体不希望被拷贝,所以go作者做了这么一个约定:只要包含实现 sync.Locker 这个接口的结构体noCopy,go vet 就可以帮我们进行检查是否被拷贝了。
5.4 pad的作用?
这个挺有意思的,源代码出现这么一个词:false sharing,翻译为“伪共享”。 也就是说这个字段,主要就是用来防止“伪共享”的。
为什么会有false sharing?
简单说明一下:缓存系统中是以缓存行为单位存储的。缓存行通常是 64 字节,当缓存行加载其中1个字节时候,其他的63个也会被加载出来,加锁的话也会加锁整个缓存行,当下图所示x、y变量都在一个缓存行的时候,当进行X加锁的时候,正好另一个独立线程要操作Y,这会儿Y就要等X了,此时就不无法并发了。
由于这里的竞争冲突来源自共享,所以称之为伪共享。
如何防止?
补齐缓存行,让每个数据都是独立的缓存行就不会出现false sharding了。
5.5 怎么确定我的数据应该存储在LocalPool数组的哪个单元?
根据数据结构的大小来计算指针的偏移量,进而算出是LocalPool数组的哪个。
5.6 sync.Pool的设计哲学?
Goroutine能同一时刻在并行的数量有限,是由runtime.GOMAXPROCS(0)设置的,这里的Pool将数据与P进行绑定了,分散在了各个真正并行的线程中,每个线程优先从自己的poolLocal中获取数据,很大程度上降低了锁竞争。