锁是编程中常用的技术, 通常应用于共享内存, 多个线程向同一资源操作往往会发生很多问题, 为了防止这些问题只能用到锁解决. 虽然锁可以解决, 但是在高并发的场景下, 可能会造成性能瓶颈. 无锁编程目前大多数都是基于atomic实现, atomic能够保证数据的正确性, sync.Mutex也有 Lock-Free 的影子.
无锁编程是什么?
«The Art of Multiprocessor Programming»书中的定义:
“如果一个方法是无锁的,它保证线程无限次调用这个方法都能够在有限步内完成。”
成为无锁的条件:
是多线程.
多个线程访问共享内存.
不会令其它线程造成阻塞.
go 中如果有一个方法里操作栈数据, 如果没有锁肯定会导致竞争发生, 加上锁又不会是无锁. 无锁编程是一个既复杂又具有挑战性的活, 究竟如何写一个无锁代码?
https://gocn.vip/topics/9842
实现 Lock-Free
type Config struct {
sync.RWMutex
endpoint string
}
func BenchmarkPMutexSet(b *testing.B) {
config := Config{}
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
config.Lock()
config.endpoint = “api.example.com”
config.Unlock()
}
})
}
func BenchmarkPMutexGet(b *testing.B) {
config := Config{endpoint: “api.example.com”}
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
config.RLock()
_ = config.endpoint
config.RUnlock()
}
})
}
func BenchmarkPAtomicSet(b *testing.B) {
var config atomic.Value
c := Config{endpoint: “api.example.com”}
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
config.Store(c)
}
})
}
func BenchmarkPAtomicGet(b *testing.B) {
var config atomic.Value
config.Store(Config{endpoint: “api.example.com”})
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = config.Load().(Config)
}
})
}
看看结果
BenchmarkPMutexSet-8 19403011 61.6 ns/op 0 B/op 0 allocs/op
BenchmarkPMutexGet-8 35671380 32.7 ns/op 0 B/op 0 allocs/op
BenchmarkPAtomicSet-8 32477751 37.0 ns/op 48 B/op 1 allocs/op
BenchmarkPAtomicGet-8 1000000000 0.247 ns/op 0 B/op 0 allocs/op
比较结果相当明确, 确实是快. 上面只是一个最简单的实现, 看看 Lock-Free Stack.
实现 Lock-Free Stack
先看一下锁实现的栈
var mu sync.Mutex
type LStack struct {
Next *LStack
Item int
}
func (head *LStack) Push(i int) {
mu.Lock()
defer mu.Unlock()
new := &LStack{Item: i}
new.Next = head.Next
head.Next = new }
func (head *LStack) Pop() int {
mu.Lock()
defer mu.Unlock()
old := head.Next
if old == nil {
return 0
}
new := head.Next
head.Next = new
return old.Item } LStack实现Push和Pop方法, 两个方法都加上锁, 防止竞争.
下面是 Lock-Free Stack
type LFStack struct {
Next unsafe.Pointer
Item int
}
var lfhead unsafe.Pointer // 记录栈头信息
func (head *LFStack) Push(i int) *LFStack { // 强制逃逸
new := &LFStack{Item: i}
newptr := unsafe.Pointer(new)
for {
old := atomic.LoadPointer(&lfhead)
new.Next = old
if atomic.CompareAndSwapPointer(&lfhead, old, newptr) {
break
}
}
return new }
func (head *LFStack) Pop() int {
for {
time.Sleep(time.Nanosecond) // 可以让CPU缓一缓
old := atomic.LoadPointer(&lfhead)
if old == nil {
return 0
}
if lfhead == old {
new := (*LFStack)(old).Next
if atomic.CompareAndSwapPointer(&lfhead, old, new) {
return 1
}
}
} } LFStack也实现了Push和Pop方法, 虽然没有加锁, 也可以保证返回数据的正确性. 对比锁实现的方法来看, 是逻辑要复杂得多. 由于循环使 CPU 压力增大, 可以用time.Sleep暂停一下.
runtime/lfstack.go
最近在研究 gc 时发现 go 源码有用到 Lock-Free Stack, 在runtime/lfstack.go
type lfstack uint64
func (head lfstack) push(node *lfnode) {
node.pushcnt++
new := lfstackPack(node, node.pushcnt)
if node1 := lfstackUnpack(new); node1 != node {
print(“runtime: lfstack.push invalid packing: node=”, node, “ cnt=”, hex(node.pushcnt), “ packed=”, hex(new), “ -> node=”, node1, “\n”)
throw(“lfstack.push”)
}
for {
old := atomic.Load64((uint64)(head))
node.next = old
if atomic.Cas64((*uint64)(head), old, new) {
break
}
}
}
func (head lfstack) pop() unsafe.Pointer {
for {
old := atomic.Load64((uint64)(head))
if old == 0 {
return nil
}
node := lfstackUnpack(old)
next := atomic.Load64(&node.next)
if atomic.Cas64((*uint64)(head), old, next) {
return unsafe.Pointer(node)
}
}
}
func (head lfstack) empty() bool {
return atomic.Load64((uint64)(head)) == 0
}
func lfnodeValidate(node *lfnode) {
if lfstackUnpack(lfstackPack(node, ^uintptr(0))) != node {
printlock()
println(“runtime: bad lfnode address”, hex(uintptr(unsafe.Pointer(node))))
throw(“bad lfnode address”)
}
}
lfstack主要是用于对 gc 时保存灰色对象, 有兴趣的可以看看.
小结
Lock-Free 的实现还有很多种, Lock-Free Stack 只是其中之一. 在日常的编程中, 基本上用sync.Mutex可以满足需求, 不要强制项目使用 Lock-Free, 可以选择在负载高的方法考虑使用, 由于实现复杂有可能性能也不及锁. 在 benchmark 测试LFStack和LStack发现, 前者的性能不及后者, 所以不是无锁都好用. 如果大家有兴趣可以研究一下无锁队列.