go atomic.value并发安全的另一种玩法


原文链接: go atomic.value并发安全的另一种玩法

atomic · Go语言学习笔记
使用atomic.Value的Golang无锁值
不使用锁来保证Golang的并发安全 - nsq 源码
从go1.4开始,标准库提供了一种通过atomic.Value类型,实现任意类型的线程安全的替代方法
所以我看完这句话也就继续在项目中使用锁和channel来处理并发安全,后来一个机会我看了下nsq的源码,发现里面大量的使用atomic.Value 于是在1.6之后我也开始在项目中大量的使用atomic.Value

package main

import (
	"fmt"
	"sync/atomic"
)

type Value struct {
	Key string
	Val interface{}
}

type Noaway struct {
	Movies atomic.Value
	Total  atomic.Value
}

func NewNoaway() *Noaway {
	n := new(Noaway)
	n.Movies.Store(&Value{Key: "movie", Val: "Wolf Warrior 2"})
	n.Total.Store("$2,539,306")
	return n
}

func main() {
	n := NewNoaway()
	val := n.Movies.Load().(*Value)
	total := n.Total.Load().(string)
	fmt.Printf("Movies %v domestic total as of Aug. 27, 2017: %v \n", val.Val, total)
}
package value_test

import (
	"sync"
	"sync/atomic"
	"testing"
)

type Config struct {
	sync.RWMutex
	endpoint string
}

func BenchmarkPMutexSet(b *testing.B) {
	config := Config{}
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			config.Lock()
			config.endpoint = "api.example.com"
			config.Unlock()
		}
	})
}

func BenchmarkPMutexGet(b *testing.B) {
	config := Config{endpoint: "api.example.com"}
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			config.RLock()
			_ = config.endpoint
			config.RUnlock()
		}
	})
}

func BenchmarkPAtomicSet(b *testing.B) {
	var config atomic.Value
	c := Config{endpoint: "api.example.com"}
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			config.Store(c)
		}
	})
}

func BenchmarkPAtomicGet(b *testing.B) {
	var config atomic.Value
	config.Store(Config{endpoint: "api.example.com"})
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = config.Load().(Config)
		}
	})
}
# go version go1.5 darwin/amd64
BenchmarkPMutexSet-4    20000000             90.2 ns/op        0 B/op      0 allocs/op
BenchmarkPMutexGet-4    30000000             57.9 ns/op        0 B/op      0 allocs/op
BenchmarkPAtomicSet-4   20000000             72.2 ns/op       48 B/op      1 allocs/op
BenchmarkPAtomicGet-4   100000000            12.8 ns/op        0 B/op      0 allocs/op

典型使用场景就是原子读写+copyonwrite。
atomic.Value 对于希望通过原子操作来实现,不加锁的高并发非常有用。

我们设计的每一个 Server 都需要访问 viewService 来获得最新的 Primary/Backup 数据库服务器视图.
这时候其实对这个 View 的操作就是周期写, 不定时读. 这时候就是一个使用 Value 的合适场景 (也可以使用原子函数 CompareAndSwapPointer).

golang value并发安全的另一种玩法,就是使用atomic.Value,看一段代码。

package main

import (
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var m atomic.Value

	type Map map[string]string
	m.Store(make(Map))
	var mu sync.Mutex

	read := func(key string) (val string) {
		m1 := m.Load().(Map)
		return m1[key]
	}

	insert := func(key, val string) {
		mu.Lock()
		defer mu.Unlock()
		m1 := m.Load().(Map)
		m2 := make(Map)
		for k, v := range m1 {
			m2[k] = v
		}
		m2[key] = val
		m.Store(m2)
	}
	go func() {
		for {
			insert("k", "v")
			time.Sleep(100 * time.Millisecond)
		}
	}()
	go func() {
		for {
			read("k")
		}
	}()
	time.Sleep(10 * time.Second)
}

相对于读写锁,少了一些锁的争抢,不过相对的,带来了一些,内存上的开销,适用于读多写少并且变量占用内存不是特别大的情况,如果用内存存储大量数据,这个并不适合,技术上主要是常见的写时复制(copy-on-write)。

另外这个还比较适合程序配置的存储,贴一段官方的栗子

var config Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
        // Reload config every 10 seconds
        // and update config value with the new version.
        for {
                time.Sleep(10 * time.Second)
                config.Store(loadConfig())
        }
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
        go func() {
                for r := range requests() {
                        c := config.Load()
                        // Handle request r using config c.
                        _, _ = r, c
                }
        }()
}

并发的几种控制方式

  1. atomic 原子操作
  2. sync.Mutex 锁
  3. chan 通道

分布式课程, 线程和锁一定是最基本的元素
由于 GO 本身提倡的Share memory by communicating; don't communicate by sharing memory., 所以在实现的时候试图不用 sharing memory + lock 而多用 channel 来实现,
这时候就带来了一些小小的不方便, 比如一个字符串 s 可以被多个 goroutine 读而被一个 goroutine 写, 显然我们要将其加锁.

因为不想加锁于是我试图用 atomic 来解决这个问题, 其中有一个 Value 类型我发现很有趣, 于是决定写下来.

源代码分析
atomic.Value 分为两个操作,
通过 Store()存储 Value,
通过 Load() 来读取 Value 的值.
atomic.Value
value.go 中提供一个Value结构体,可以帮助用来存储和装载任意值。
Store() 操作有两种行为模式:

First Store : 当我们第一次调用 Store 的时候, Store 函数会初始化 typ 指针 (需要注意的是, 每一个 Value 在第一次 Store 之后 typ 就被确定而不能更改了, 否则会 panic).
如果 typ==nil 则函数会调用 runtime_procPin(没找到实现, 但注释中说是 active spin wait)
随后调用原子操作函数 CompareAndSwapPointer(typ, nil, unsafe.Pointer(^uintptr(0))), 如果此时结果返回 false 说明 typ 已经不等于 nil(被其他 goroutine 修改过), 于是调用 runtime_procUnpin 解锁并重新进行 Store 过程.
如果原子操作函数返回了 true, 即 typ == nil, 那么存储 typ 以及 data 的指针.

后面的每次 Store 调用都是直接替换掉 data 指针

Load() 函数检测 typ 的值, 如果为 nil 或者正在进行首次调用 Store 则会返回 nil. 否则返回一个 interface{}(实际存储的是 ifaceWords 值)

// Value结构体提供原子装载和存储始任意类型的值
type Value struct {
    v interface{}
}

// ifaceWords 结构体是实际存储我们的 Value 值的地方, 可以看到, 我们存储的实际是指向 Value 的 type 和 data 的指针.
// ifaceWords 是 interface{} 的内部表示
type ifaceWords struct {
    typ  unsafe.Pointer
    data unsafe.Pointer
}

func (v *Value) Load() (x interface{}) {
    vp := (*ifaceWords)(unsafe.Pointer(v))  // 通过unsafe.Pointer做一次指针转换
    typ := LoadPointer(&vp.typ) // 一定要用LoadPointer,以保证原子性,比如读到的不是cpu cache
    if typ == nil || uintptr(typ) == ^uintptr(0) {
        // First store not yet completed.
        return nil
    }
    data := LoadPointer(&vp.data)
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    xp.typ = typ
    xp.data = data
    return
}

func (v *Value) Store(x interface{}) {
    if x == nil {
        // 把panic当NullPointException来用?是不是有点重啊?
        panic("sync/atomic: store of nil value into Value")
    }
    vp := (*ifaceWords)(unsafe.Pointer(v)) // 通过unsafe.Pointer做指针转换
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    for {
        typ := LoadPointer(&vp.typ)
        if typ == nil {
            // 尝试开始第一次存储
            // 关闭抢占(preemption)以便其他goroutine能使用active spin wait来等待完成。
            // 另外这样一来GC也不会意外的看到假类型
            runtime_procPin()
            // 通过acs操作来检查并设置tpy为特殊的标志位
            if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
                // 如果失败,表示其他goroutine抢先
                runtime_procUnpin()
                continue
            }
            // 如果成功表示获得设置存储的权利,执行第一次存储
            StorePointer(&vp.data, xp.data)
            StorePointer(&vp.typ, xp.typ)
            runtime_procUnpin()
            return
        }
        if uintptr(typ) == ^uintptr(0) {
            // 第一次存储进行中,等。
            // 因为我们在第一次存储前后禁用了抢占
            // 我们可以使用active spin来等待
            continue
        }
        // First store completed. Check type and overwrite data.
        if typ != xp.typ {
            panic("sync/atomic: store of inconsistently typed value into Value")
        }
        StorePointer(&vp.data, xp.data)
        return
    }
}

TBD: runtime_procPin()和runtime_procUnpin()方法的使用不太理解,也没有找到资料。

`