內(nèi)容
- Atomic
- Mutex
- RWMutex
- WaitGroup
- Once
- Sync.Map
- Sync.Pool
- Cond
Atomic
go中atomic 包實(shí)現(xiàn)原子操作
- 基本類(lèi)型的原子操作有6種:int32, int64, uint32, uint64, uintptr, unsafe.Pinter;操作類(lèi)型有5種:增減, 比較并交換, 載入, 存儲(chǔ),交換
- Value提供任意類(lèi)型的原子操作,操作類(lèi)型有:載入和存儲(chǔ)
基本數(shù)據(jù)類(lèi)型:
//增減操作
var a int32;
//增操作
new_a := atomic.AddInt32(&a, 1);
//減操作
new_a = atomic.AddInt32(&a, -1);
//CAS(Compare And Swap)比較并交換操作
//函數(shù)名以CompareAndSwap為前綴,并具體類(lèi)型名
//函數(shù)會(huì)先判斷參數(shù)一指向的值與參數(shù)二是否相等,如果相等,則用參數(shù)三替換參數(shù)一的值。
var b int32;
atomic.CompareAndSwapInt32(&b, 0, 3);
//載入操作
//當(dāng)我們對(duì)某個(gè)變量進(jìn)行讀取操作時(shí),可能該變量正在被其他操作改變,或許我們讀取的是被修改了一半的數(shù)據(jù)。
//所以我們通過(guò)Load這類(lèi)函數(shù)來(lái)確保我們正確的讀取
//函數(shù)名以L(fǎng)oad為前綴,加具體類(lèi)型名
var c int32;
tmp := atomic.LoadInt32(&c);
//存儲(chǔ)操作
//與載入函數(shù)相對(duì)應(yīng),提供原子的存儲(chǔ)函數(shù); 函數(shù)名以Store為前綴,加具體類(lèi)型名
var d int32;
//存儲(chǔ)某個(gè)值時(shí),任何CPU都不會(huì)都該值進(jìn)行讀或?qū)懖僮? //存儲(chǔ)操作總會(huì)成功,它不關(guān)心舊值是什么,與CAS不同
atomic.StoreInt32(&d, 666);
任意類(lèi)型:
提供了兩個(gè)方法,保證原子的存儲(chǔ)和讀?。úl(fā)操作時(shí),保證不會(huì)讀到正在寫(xiě)入到一半的值)
1 func (v *Value) Load() (x interface{})
2 func (v *Value) Store(x interface{})
type AtomicStruct struct {
Age int
name string
}
config := atomic.Value{} // 1 聲明一個(gè)value類(lèi)型變量,相當(dāng)于一個(gè)容器
config.Store(&AtomicStruct{}) // 2 說(shuō)明這個(gè)容器存儲(chǔ)的是AtomicStruct 指針類(lèi)型,一旦說(shuō)明后,后續(xù)的store操作只能是這個(gè)類(lèi)型
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
defer wg.Done()
if i == 5 {
name := "name" + string(i)
config.Store(&AtomicStruct{Age: i, name: name})
}
fmt.Println(config.Load())
}(i)
}
wg.Wait()
Mutex
互斥鎖: go中通過(guò)sync.Mutex的零值來(lái)表示一個(gè)沒(méi)有被鎖定的互斥量,開(kāi)箱即用,申明一個(gè)變量就可以用,但是如果把互斥量當(dāng)做入?yún)魅氲胶瘮?shù)中使用時(shí),只能傳遞指針,不能是值傳遞,值傳遞是拷貝,不能達(dá)到鎖住的作用。
var x int
func main() {
var m sync.Mutex
add(&m, 2)
}
// 同步的做甲方
func add(m *sync.Mutex, value int){
m.Lock()
x = x + value
m.Unlock()
}
RWMutex
讀寫(xiě)鎖: go中通過(guò)sync.RWMutex的零值來(lái)表示一個(gè)讀寫(xiě)鎖實(shí)例,讀寫(xiě)鎖中有兩對(duì)方法,分別是讀鎖定和讀解鎖、寫(xiě)鎖定和寫(xiě)解鎖
- 規(guī)則:讀寫(xiě)鎖下,多個(gè)寫(xiě)操作之間是互斥的,寫(xiě)操作和讀操作之間也是互斥的,但是多個(gè)讀操作之間不存在互斥關(guān)系
rw.Lock() // 寫(xiě)鎖定
rw.Unlock()
rw.RLock() // 讀鎖定
rw.RUnlock()
func main() {
var rw sync.RWMutex
go func() {
rw.RLock()
println(1)
time.Sleep(2*time.Second)
rw.RUnlock()
println(2)
}()
time.Sleep(1*time.Second)
println(3)
// 被堵在這里,因?yàn)閰f(xié)程中 rw.RLock()加了讀鎖,會(huì)阻塞寫(xiě)鎖,不會(huì)堵塞其他協(xié)程獲取讀鎖,rw.RUnlock()執(zhí)行后,釋放了讀鎖,同樣也會(huì)釋放寫(xiě)鎖,前提是已經(jīng)沒(méi)有任何其他的讀鎖了;如果是先執(zhí)行rw.Lock(),同樣也會(huì)堵塞其他寫(xiě)鎖和讀鎖,rw.Ulock()會(huì)把讀鎖和寫(xiě)鎖全部釋放
rw.Lock()
println(4)
}
WaitGroup
- wg會(huì)阻塞在wait()方法上,等待一組協(xié)程執(zhí)行結(jié)束,同樣是開(kāi)箱即用,在函數(shù)中當(dāng)做入?yún)鬟f時(shí),記住傳遞指針
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
println(1)
wg.Done()
}()
//主協(xié)程阻塞在這里,必定會(huì)等協(xié)程中執(zhí)行完畢后,才會(huì)執(zhí)行打印2
wg.Wait()
println(2)
}
Once
- sync.Once 同樣開(kāi)箱即用,申明即可, Do方法接收一個(gè)無(wú)參數(shù)、無(wú)結(jié)果的函數(shù)作為參數(shù),保證這個(gè)方法只執(zhí)行一次
- 原理是Once結(jié)構(gòu)體內(nèi)維護(hù)了一個(gè)無(wú)符號(hào)的變量done,當(dāng)執(zhí)行一次func后會(huì)原子的加1,如果done不會(huì)0時(shí),就不會(huì)執(zhí)行func了
var once sync.Once //同樣開(kāi)箱即用
// Do方法接收一個(gè)無(wú)參數(shù)、無(wú)結(jié)果的函數(shù)作為參數(shù),保證這個(gè)方法只執(zhí)行一次
Once.Do(func(){
fmt.println(“func方法只執(zhí)行一次")
})
// 原理是Once結(jié)構(gòu)體內(nèi)維護(hù)了一個(gè)無(wú)符號(hào)的變量done,當(dāng)執(zhí)行一次func后會(huì)原子的加1,如果done不會(huì)0時(shí),就不會(huì)執(zhí)行func了
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
sync.Map
- 并發(fā)安全的map
func main(){
var sm sync.Map
// 寫(xiě)入
sm.Store("a", 1)
sm.Store("b", 2)
sm.Store("c", 3)
// 取出
v, _ := sm.Load("a")
fmt.Println(v.(int))
// 刪除
sm.Delete("b")
// 存在則獲取 不存在則添加
sm.LoadOrStore("d", 4)
// 遍歷
sm.Range(func(key, value interface{}) bool {
fmt.Println(key.(string), value.(int))
return true
})
}
Sync.Pool
- Pool用來(lái)做一個(gè)臨時(shí)對(duì)象池,當(dāng)某個(gè)對(duì)象會(huì)被經(jīng)常創(chuàng)建或是在并發(fā)場(chǎng)景下多個(gè)協(xié)程會(huì)會(huì)創(chuàng)建相同的對(duì)象,此時(shí)可以考慮用sync.Pool來(lái)優(yōu)化性能,避免大量對(duì)象的創(chuàng)建 銷(xiāo)毀引起的GC問(wèn)題。
- Pool只有兩個(gè)方法,Get和Put, Get從對(duì)象池中獲取對(duì)象,如果不存在則用初始化對(duì)象池是賦予的New方法創(chuàng)建一個(gè)對(duì)象返回, Put方法用于當(dāng)用完對(duì)象后,把對(duì)象歸還到對(duì)象池,如果對(duì)象需要保持初始化狀態(tài),則用完對(duì)象后,應(yīng)該對(duì)對(duì)象做一些清零的邏輯,然后在歸還到池子中,否則下一次get是,獲取到的對(duì)象會(huì)保存了上一次put的記錄;
- Pool不適于做連接池之類(lèi)的,因?yàn)閜ool會(huì)在GC是被回收的;
- Pool是并發(fā)安全的;
舉個(gè)例子
func SyncPoolExample() {
pool := sync.Pool{
New: func() interface{} {
fmt.Println("aa")
return People{}
},
}
for i:=0; i < 3; i++ {
go func() {
people := pool.Get().(People)
fmt.Println(people)
people.name = "li"
fmt.Println(people)
people.name = ""
pool.Put(people)
}()
time.Sleep(time.Second * 2)
}
time.Sleep(time.Second * 10)
}
aa
{0 }
{0 li}
aa
{0 }
{0 li}
{0 }
{0 li}
當(dāng)協(xié)程還沒(méi)有歸還對(duì)象到池子里時(shí),如果其他協(xié)程此時(shí)來(lái)get,則就會(huì)新建一個(gè)
//example 2
func main() {
pool := sync.Pool{
New: func() interface{} {
b := make([]byte, 0, 1024)
return b
},
}
b := pool.Get().([]byte)
b = append(b, 0xff)
b = b[:0] // 用完后,清空[]byte,在歸還池子
pool.Put(b)
}
實(shí)現(xiàn)原理:
- 深度解密Go語(yǔ)言之sync.Pool: 深度解密 Go 語(yǔ)言之 sync.Pool
- 我所理解的Sync Pool: https://www.haohongfan.com/post/2019-05-26-sync-pool/
- go sync.pool []byte導(dǎo)致grpc解包異常 [http://xiaorui.cc/archives/5969]
Cond
條件變量,類(lèi)似java object中的wait notify和notifyAll方法,Cond也提供了三個(gè)方法Wait、Singal、Broadcast
func SyncCondExample(){
cond := sync.NewCond(new(sync.Mutex))
for i := 0; i < 10; i++ {
go func(i int) {
cond.L.Lock()
cond.Wait()
fmt.Println("goroutine: ", i)
cond.L.Unlock()
}(i)
}
fmt.Println("all goroutines wait...")
time.Sleep(time.Second * 3)
// 按等待順序 釋放一個(gè)
//cond.Signal()
// 釋放所有
cond.Broadcast()
time.Sleep(time.Second * 3)
}
all goroutines wait...
goroutine: 4
goroutine: 8
goroutine: 5
goroutine: 1
goroutine: 0
goroutine: 7
goroutine: 3
goroutine: 2
goroutine: 6
goroutine: 9