go atomic.value并发安全的另一种玩法
atomic · Go语言学习笔记
使用atomic.Value的Golang无锁值
不使用锁来保证Golang的并发安全 - nsq 源码
从go1.4开始,标准库提供了一种通过atomic.Value类型,实现任意类型的线程安全的替代方法
所以我看完这句话也就继续在项目中使用锁和channel来处理并发安全,后来一个机会我看了下nsq的源码,发现里面大量的使用atomic.Value 于是在1.6之后我也开始在项目中大量的使用atomic.Value
package main
import (
"fmt"
"sync/atomic"
)
type Value struct {
Key string
Val interface{}
}
type Noaway struct {
Movies atomic.Value
Total atomic.Value
}
func NewNoaway() *Noaway {
n := new(Noaway)
n.Movies.Store(&Value{Key: "movie", Val: "Wolf Warrior 2"})
n.Total.Store("$2,539,306")
return n
}
func main() {
n := NewNoaway()
val := n.Movies.Load().(*Value)
total := n.Total.Load().(string)
fmt.Printf("Movies %v domestic total as of Aug. 27, 2017: %v \n", val.Val, total)
}
package value_test
import (
"sync"
"sync/atomic"
"testing"
)
type Config struct {
sync.RWMutex
endpoint string
}
func BenchmarkPMutexSet(b *testing.B) {
config := Config{}
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
config.Lock()
config.endpoint = "api.example.com"
config.Unlock()
}
})
}
func BenchmarkPMutexGet(b *testing.B) {
config := Config{endpoint: "api.example.com"}
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
config.RLock()
_ = config.endpoint
config.RUnlock()
}
})
}
func BenchmarkPAtomicSet(b *testing.B) {
var config atomic.Value
c := Config{endpoint: "api.example.com"}
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
config.Store(c)
}
})
}
func BenchmarkPAtomicGet(b *testing.B) {
var config atomic.Value
config.Store(Config{endpoint: "api.example.com"})
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = config.Load().(Config)
}
})
}
# go version go1.5 darwin/amd64
BenchmarkPMutexSet-4 20000000 90.2 ns/op 0 B/op 0 allocs/op
BenchmarkPMutexGet-4 30000000 57.9 ns/op 0 B/op 0 allocs/op
BenchmarkPAtomicSet-4 20000000 72.2 ns/op 48 B/op 1 allocs/op
BenchmarkPAtomicGet-4 100000000 12.8 ns/op 0 B/op 0 allocs/op
典型使用场景就是原子读写+copyonwrite。
atomic.Value 对于希望通过原子操作来实现,不加锁的高并发非常有用。
我们设计的每一个 Server 都需要访问 viewService 来获得最新的 Primary/Backup 数据库服务器视图.
这时候其实对这个 View 的操作就是周期写, 不定时读. 这时候就是一个使用 Value 的合适场景 (也可以使用原子函数 CompareAndSwapPointer).
golang value并发安全的另一种玩法,就是使用atomic.Value,看一段代码。
package main
import (
"sync"
"sync/atomic"
"time"
)
func main() {
var m atomic.Value
type Map map[string]string
m.Store(make(Map))
var mu sync.Mutex
read := func(key string) (val string) {
m1 := m.Load().(Map)
return m1[key]
}
insert := func(key, val string) {
mu.Lock()
defer mu.Unlock()
m1 := m.Load().(Map)
m2 := make(Map)
for k, v := range m1 {
m2[k] = v
}
m2[key] = val
m.Store(m2)
}
go func() {
for {
insert("k", "v")
time.Sleep(100 * time.Millisecond)
}
}()
go func() {
for {
read("k")
}
}()
time.Sleep(10 * time.Second)
}
相对于读写锁,少了一些锁的争抢,不过相对的,带来了一些,内存上的开销,适用于读多写少并且变量占用内存不是特别大的情况,如果用内存存储大量数据,这个并不适合,技术上主要是常见的写时复制(copy-on-write)。
另外这个还比较适合程序配置的存储,贴一段官方的栗子
var config Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
// Reload config every 10 seconds
// and update config value with the new version.
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
并发的几种控制方式
- atomic 原子操作
- sync.Mutex 锁
- chan 通道
分布式课程, 线程和锁一定是最基本的元素
由于 GO 本身提倡的Share memory by communicating; don't communicate by sharing memory.
, 所以在实现的时候试图不用 sharing memory + lock
而多用 channel
来实现,
这时候就带来了一些小小的不方便, 比如一个字符串 s 可以被多个 goroutine 读而被一个 goroutine 写, 显然我们要将其加锁.
因为不想加锁于是我试图用 atomic 来解决这个问题, 其中有一个 Value 类型我发现很有趣, 于是决定写下来.
源代码分析
atomic.Value 分为两个操作,
通过 Store()
存储 Value,
通过 Load()
来读取 Value 的值.
atomic.Value
value.go 中提供一个Value结构体,可以帮助用来存储和装载任意值。
Store() 操作有两种行为模式:
First Store
: 当我们第一次调用 Store 的时候, Store 函数会初始化 typ 指针 (需要注意的是, 每一个 Value 在第一次 Store 之后 typ 就被确定而不能更改了, 否则会 panic).
如果 typ==nil 则函数会调用 runtime_procPin(没找到实现, 但注释中说是 active spin wait)
随后调用原子操作函数 CompareAndSwapPointer(typ, nil, unsafe.Pointer(^uintptr(0))), 如果此时结果返回 false 说明 typ 已经不等于 nil(被其他 goroutine 修改过), 于是调用 runtime_procUnpin 解锁并重新进行 Store 过程.
如果原子操作函数返回了 true, 即 typ == nil, 那么存储 typ 以及 data 的指针.
后面的每次 Store 调用都是直接替换掉 data 指针
Load() 函数检测 typ 的值, 如果为 nil 或者正在进行首次调用 Store 则会返回 nil. 否则返回一个 interface{}(实际存储的是 ifaceWords 值)
// Value结构体提供原子装载和存储始任意类型的值
type Value struct {
v interface{}
}
// ifaceWords 结构体是实际存储我们的 Value 值的地方, 可以看到, 我们存储的实际是指向 Value 的 type 和 data 的指针.
// ifaceWords 是 interface{} 的内部表示
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v)) // 通过unsafe.Pointer做一次指针转换
typ := LoadPointer(&vp.typ) // 一定要用LoadPointer,以保证原子性,比如读到的不是cpu cache
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
func (v *Value) Store(x interface{}) {
if x == nil {
// 把panic当NullPointException来用?是不是有点重啊?
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v)) // 通过unsafe.Pointer做指针转换
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// 尝试开始第一次存储
// 关闭抢占(preemption)以便其他goroutine能使用active spin wait来等待完成。
// 另外这样一来GC也不会意外的看到假类型
runtime_procPin()
// 通过acs操作来检查并设置tpy为特殊的标志位
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
// 如果失败,表示其他goroutine抢先
runtime_procUnpin()
continue
}
// 如果成功表示获得设置存储的权利,执行第一次存储
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// 第一次存储进行中,等。
// 因为我们在第一次存储前后禁用了抢占
// 我们可以使用active spin来等待
continue
}
// First store completed. Check type and overwrite data.
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}
TBD: runtime_procPin()和runtime_procUnpin()方法的使用不太理解,也没有找到资料。