原文鏈接:https://blog.csdn.net/chenguolinblog/article/details/90691127
前言
Golang sync包提供了基礎的異步操作方法,包括互斥鎖Mutex,執(zhí)行一次Once和并發(fā)等待組WaitGroup。
本文主要介紹sync包提供的這些功能的基本使用方法。
- Mutex: 互斥鎖
- RWMutex:讀寫鎖
- WaitGroup:并發(fā)等待組
- Once:執(zhí)行一次
- Cond:信號量
- Pool:臨時對象池
- Map:自帶鎖的map
sync.Mutex
sync.Mutex稱為互斥鎖,常用在并發(fā)編程里面。互斥鎖需要保證的是同一個時間段內(nèi)不能有多個并發(fā)協(xié)程同時訪問某一個資源(臨界區(qū))
sync.Mutex有2個函數(shù)Lock和UnLock分別表示獲得鎖和釋放鎖。
func (m *Mutex) Lock() //獲取鎖
func (m *Mutex) UnLock() //釋放鎖
sync.Mutex初始值為UnLock狀態(tài),并且sync.Mutex常做為其它結(jié)構(gòu)體的匿名變量使用。
舉個例子: 我們經(jīng)常使用網(wǎng)上支付購物東西,就會出現(xiàn)同一個銀行賬戶在某一個時間既有支出也有收入,那銀行就得保證我們余額準確,保證數(shù)據(jù)無誤。
我們可以簡單的實現(xiàn)銀行的支出和收入來說明Mutex的使用
type Bank struct {
sync.Mutex
balance map[string]float64
}
// In 收入
func (b *Bank) In(account string, value float64) {
// 加鎖 保證同一時間只有一個協(xié)程能訪問這段代碼
// 加鎖 保證同一時間只有一個協(xié)程能訪問這段代碼
b.Lock()
defer b.Unlock()
if _, ok := b.balance[account]; !ok {
b.balance[account] = 0.0
}
b.balance[account] += value
}
// Out 支出
func (b *Bank) Out(account string, value float64) error {
// 加鎖 保證同一時間只有一個協(xié)程能訪問這段代碼
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
sync.RWMutex
sync.RWMutex稱為讀寫鎖,是sync.Mutex的一種變種,RWMutex來自于計算機操作系統(tǒng)非常有名的讀者寫者問題。
sync.RWMutex目的是為了能夠支持多個并發(fā)協(xié)程同時讀取某一個資源,但只有一個并發(fā)協(xié)程能夠更新資源。也就是說讀和寫是互斥的,寫和寫也是互斥的,讀和讀是不互斥的。
總結(jié)起來如下
當有一個協(xié)程在讀的時候,所有寫的協(xié)程必須等到所有讀的協(xié)程結(jié)束才可以獲得鎖進行寫操作。
當有一個協(xié)程在讀的時候,所有讀的協(xié)程不受影響都可以進行讀操作。
當有一個協(xié)程在寫的時候,所有讀、寫的協(xié)程必須等到寫的協(xié)程結(jié)束才可以獲得鎖進行讀、寫操作。
RWMutex有5個函數(shù),分別為讀和寫提供鎖操作
寫操作
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
讀操作
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
RLocker()能獲取讀鎖,然后傳遞給其他協(xié)程使用。
func (rw *RWMutex) RLocker() Locker
舉個例子,sync.Mutex一節(jié)例子里面我們沒有提供查詢操作,如果用Mutex互斥鎖就沒有辦法支持多人同時查詢,所以我們使用sync.RWMutex來改寫這個代碼
type Bank struct {
sync.RWMutex
balance map[string]float64
}
func (b *Bank) In(account string, value float64) {
b.Lock()
defer b.Unlock()
if _, ok := b.balance[account]; !ok {
b.balance[account] = 0.0
}
b.balance[account] += value
}
func (b *Bank) Out(account string, value float64) error {
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
func (b *Bank) Query(account string) float64 {
b.RLock()
defer b.RUnlock()
if _,ok := b.balance[account]; !ok {
return 0.0
}
return b.balance[account]
}
sync.WaitGroup
sync.WaitGroup指的是等待組,在Golang并發(fā)編程里面非常常見,指的是等待一組工作完成后,再進行下一組工作。
sync.WaitGroup有3個函數(shù)
func (wg *WaitGroup) Add(delta int) Add添加n個并發(fā)協(xié)程
func (wg *WaitGroup) Done() Done完成一個并發(fā)協(xié)程
func (wg *WaitGroup) Wait() Wait等待其它并發(fā)協(xié)程結(jié)束
sync.WaitGroup在Golang編程里面最常用于協(xié)程池,下面這個例子會同時啟動1000個并發(fā)協(xié)程。
func main() {
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
// 等待所有協(xié)程結(jié)束
wg.Wait()
fmt.Println("WaitGroup all process done ~")
}
sync.WaitGroup沒有辦法指定最大并發(fā)協(xié)程數(shù),在一些場景下會有問題。例如操作數(shù)據(jù)庫場景下,我們不希望某一些時刻出現(xiàn)大量連接數(shù)據(jù)庫導致數(shù)據(jù)庫不可訪問。所以,為了能夠控制最大的并發(fā)數(shù),推薦使用github.com/remeh/sizedwaitgroup,用法和sync.WaitGroup非常類似。
下面這個例子最多只有10個并發(fā)協(xié)程,如果已經(jīng)達到10個并發(fā)協(xié)程,只有某一個協(xié)程執(zhí)行了Done才能啟動一個新的協(xié)程。
import "github.com/remeh/sizedwaitgroup"
func main() {
# 最大10個并發(fā)
wg := sizedwaitgroup.New(10)
for i = 0; i < 1000; i++ {
wg.Add()
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
// 等待所有協(xié)程結(jié)束
wg.Wait()
fmt.Println("WaitGroup all process done ~")
}
sync.Once
sync.Once指的是只執(zhí)行一次的對象實現(xiàn),常用來控制某些函數(shù)只能被調(diào)用一次。sync.Once的使用場景例如單例模式、系統(tǒng)初始化。
例如并發(fā)情況下多次調(diào)用channel的close會導致panic,解決這個問題我們可以使用sync.Once來保證close只會被執(zhí)行一次。
sync.Once的結(jié)構(gòu)如下所示,只有一個函數(shù)。使用變量done來記錄函數(shù)的執(zhí)行狀態(tài),使用sync.Mutex和sync.atomic來保證線程安全的讀取done。
type Once struct {
m Mutex #互斥鎖
done uint32 #執(zhí)行狀態(tài)
}
func (o *Once) Do(f func())
舉個例子,1000個并發(fā)協(xié)程情況下只有一個協(xié)程會執(zhí)行到fmt.Printf,多次執(zhí)行的情況下輸出的內(nèi)容還不一樣,因為這取決于哪個協(xié)程先調(diào)用到該匿名函數(shù)。
func main() {
once := &sync.Once{}
for i := 0; i < 1000; i++ {
go func(idx int) {
once.Do(func() {
time.Sleep(1 * time.Second)
fmt.Printf("hello world index: %d", idx)
})
}(i)
}
time.Sleep(5 * time.Second)
}
sync.Cond
sync.Cond指的是同步條件變量,一般需要與互斥鎖組合使用,本質(zhì)上是一些正在等待某個條件的協(xié)程的同步機制。
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
sync.Cond有3個函數(shù)Wait、Signal、Broadcast
// Wait 等待通知
func (c *Cond) Wait()
// Signal 單播通知
func (c *Cond) Signal()
// Broadcast 廣播通知
func (c *Cond) Broadcast()
舉個例子,sync.Cond用于并發(fā)協(xié)程條件變量。
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
sync.Pool
sync.Pool指的是臨時對象池,Golang和Java具有GC機制,因此很多開發(fā)者基本上都不會考慮內(nèi)存回收問題,不像C++很多時候開發(fā)需要自己回收對象。
Gc是一把雙刃劍,帶來了編程的方便但同時也增加了運行時開銷,使用不當可能會嚴重影響程序的性能,因此性能要求高的場景不能任意產(chǎn)生太多的垃圾。
sync.Pool正是用來解決這類問題的,Pool可以作為臨時對象池來使用,不再自己單獨創(chuàng)建對象,而是從臨時對象池中獲取出一個對象。
sync.Pool有2個函數(shù)Get和Put,Get負責從臨時對象池中取出一個對象,Put用于結(jié)束的時候把對象放回臨時對象池中。
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
看一個官方例子
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// 獲取臨時對象,沒有的話會自動創(chuàng)建
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// 將臨時對象放回到 Pool 中
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
從上面的例子我們可以看到創(chuàng)建一個Pool對象并不能指定大小,所以sync.Pool的緩存對象數(shù)量是沒有限制的(只受限于內(nèi)存),那sync.Pool是如何控制緩存臨時對象數(shù)的呢?
sync.Pool在init的時候注冊了一個poolCleanup函數(shù),它會清除所有的pool里面的所有緩存的對象,該函數(shù)注冊進去之后會在每次Gc之前都會調(diào)用,因此sync.Pool緩存的期限只是兩次Gc之間這段時間。正因Gc的時候會清掉緩存對象,所以不用擔心pool會無限增大的問題。
正因為如此sync.Pool適合用于緩存臨時對象,而不適合用來做持久保存的對象池(連接池等)。
sync.Map
Go在1.9版本之前自帶的map對象是不具有并發(fā)安全的,很多時候我們都得自己封裝支持并發(fā)安全的Map結(jié)構(gòu),如下所示給map加個讀寫鎖sync.RWMutex。
type MapWithLock struct {
sync.RWMutex
M map[string]Kline
}
Go1.9版本新增了sync.Map它是原生支持并發(fā)安全的map,sync.Map封裝了更為復雜的數(shù)據(jù)結(jié)構(gòu)實現(xiàn)了比之前加讀寫鎖鎖map更優(yōu)秀的性能。
sync.Map總共5個函數(shù),用法和原生的map差點很多
// 查詢一個key
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// 設置key value
func (m *Map) Store(key, value interface{})
// 如果key存在則返回key對應的value,否則設置key value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// 刪除一個key
func (m *Map) Delete(key interface{})
// 遍歷map,仍然是無序的
func (m *Map) Range(f func(key, value interface{}) bool)