Golang Sync包

原文鏈接:https://blog.csdn.net/chenguolinblog/article/details/90691127

前言

Golang sync包提供了基礎的異步操作方法,包括互斥鎖Mutex,執(zhí)行一次Once和并發(fā)等待組WaitGroup。
本文主要介紹sync包提供的這些功能的基本使用方法。

  • Mutex: 互斥鎖
  • RWMutex:讀寫鎖
  • WaitGroup:并發(fā)等待組
  • Once:執(zhí)行一次
  • Cond:信號量
  • Pool:臨時對象池
  • Map:自帶鎖的map
sync.Mutex

sync.Mutex稱為互斥鎖,常用在并發(fā)編程里面。互斥鎖需要保證的是同一個時間段內(nèi)不能有多個并發(fā)協(xié)程同時訪問某一個資源(臨界區(qū))
sync.Mutex有2個函數(shù)Lock和UnLock分別表示獲得鎖和釋放鎖。

func (m *Mutex) Lock()      //獲取鎖
func (m *Mutex) UnLock()    //釋放鎖

sync.Mutex初始值為UnLock狀態(tài),并且sync.Mutex常做為其它結(jié)構(gòu)體的匿名變量使用。

舉個例子: 我們經(jīng)常使用網(wǎng)上支付購物東西,就會出現(xiàn)同一個銀行賬戶在某一個時間既有支出也有收入,那銀行就得保證我們余額準確,保證數(shù)據(jù)無誤。
我們可以簡單的實現(xiàn)銀行的支出和收入來說明Mutex的使用

type Bank struct {
    sync.Mutex
    balance map[string]float64
}

// In 收入
func (b *Bank) In(account string, value float64) {
        // 加鎖 保證同一時間只有一個協(xié)程能訪問這段代碼
        // 加鎖 保證同一時間只有一個協(xié)程能訪問這段代碼
    b.Lock()
    defer b.Unlock()

    if _, ok := b.balance[account]; !ok {
        b.balance[account] = 0.0
    }
    
    b.balance[account] += value
}

// Out 支出
func (b *Bank) Out(account string, value float64) error {
        // 加鎖 保證同一時間只有一個協(xié)程能訪問這段代碼
    b.Lock()
    defer b.Unlock()

    v, ok := b.balance[account]
    if !ok || v < value {
        return errors.New("account not enough balance")
    }

    b.balance[account] -= value
    return nil
}
sync.RWMutex

sync.RWMutex稱為讀寫鎖,是sync.Mutex的一種變種,RWMutex來自于計算機操作系統(tǒng)非常有名的讀者寫者問題。
sync.RWMutex目的是為了能夠支持多個并發(fā)協(xié)程同時讀取某一個資源,但只有一個并發(fā)協(xié)程能夠更新資源。也就是說讀和寫是互斥的,寫和寫也是互斥的,讀和讀是不互斥的。

總結(jié)起來如下

當有一個協(xié)程在讀的時候,所有寫的協(xié)程必須等到所有讀的協(xié)程結(jié)束才可以獲得鎖進行寫操作。
當有一個協(xié)程在讀的時候,所有讀的協(xié)程不受影響都可以進行讀操作。
當有一個協(xié)程在寫的時候,所有讀、寫的協(xié)程必須等到寫的協(xié)程結(jié)束才可以獲得鎖進行讀、寫操作。

RWMutex有5個函數(shù),分別為讀和寫提供鎖操作

寫操作
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()

讀操作
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()

RLocker()能獲取讀鎖,然后傳遞給其他協(xié)程使用。
func (rw *RWMutex) RLocker() Locker

舉個例子,sync.Mutex一節(jié)例子里面我們沒有提供查詢操作,如果用Mutex互斥鎖就沒有辦法支持多人同時查詢,所以我們使用sync.RWMutex來改寫這個代碼

type Bank struct {
    sync.RWMutex
    balance map[string]float64
}

func (b *Bank) In(account string, value float64) {
    b.Lock()
    defer b.Unlock()

    if _, ok := b.balance[account]; !ok {
        b.balance[account] = 0.0
    }

    b.balance[account] += value
}

func (b *Bank) Out(account string, value float64) error {
    b.Lock()
    defer b.Unlock()

    v, ok := b.balance[account]
    if !ok || v < value {
        return errors.New("account not enough balance")
    }

    b.balance[account] -= value
    return nil
}

func (b *Bank) Query(account string) float64 {
    b.RLock()
    defer b.RUnlock()

    if _,ok := b.balance[account]; !ok {
        return 0.0
    }

    return b.balance[account]
}
sync.WaitGroup

sync.WaitGroup指的是等待組,在Golang并發(fā)編程里面非常常見,指的是等待一組工作完成后,再進行下一組工作。

sync.WaitGroup有3個函數(shù)

func (wg *WaitGroup) Add(delta int)  Add添加n個并發(fā)協(xié)程
func (wg *WaitGroup) Done()  Done完成一個并發(fā)協(xié)程
func (wg *WaitGroup) Wait()  Wait等待其它并發(fā)協(xié)程結(jié)束

sync.WaitGroup在Golang編程里面最常用于協(xié)程池,下面這個例子會同時啟動1000個并發(fā)協(xié)程。

func main() {
     wg := &sync.WaitGroup{}
     for i := 0; i < 1000; i++ {
         wg.Add(1)
         go func() {
         defer func() {
        wg.Done()
         }()
         time.Sleep(1 * time.Second)
         fmt.Println("hello world ~")
     }()
     }
     // 等待所有協(xié)程結(jié)束
     wg.Wait()
     fmt.Println("WaitGroup all process done ~")
}

sync.WaitGroup沒有辦法指定最大并發(fā)協(xié)程數(shù),在一些場景下會有問題。例如操作數(shù)據(jù)庫場景下,我們不希望某一些時刻出現(xiàn)大量連接數(shù)據(jù)庫導致數(shù)據(jù)庫不可訪問。所以,為了能夠控制最大的并發(fā)數(shù),推薦使用github.com/remeh/sizedwaitgroup,用法和sync.WaitGroup非常類似。

下面這個例子最多只有10個并發(fā)協(xié)程,如果已經(jīng)達到10個并發(fā)協(xié)程,只有某一個協(xié)程執(zhí)行了Done才能啟動一個新的協(xié)程。

import  "github.com/remeh/sizedwaitgroup"

func main() {
     # 最大10個并發(fā)
     wg := sizedwaitgroup.New(10)
     for i = 0; i < 1000; i++ {
         wg.Add()
         go func() {
         defer func() {
        wg.Done()
         }()
         time.Sleep(1 * time.Second)
         fmt.Println("hello world ~")
     }()
     }
     // 等待所有協(xié)程結(jié)束
     wg.Wait()
     fmt.Println("WaitGroup all process done ~")
}
sync.Once

sync.Once指的是只執(zhí)行一次的對象實現(xiàn),常用來控制某些函數(shù)只能被調(diào)用一次。sync.Once的使用場景例如單例模式、系統(tǒng)初始化。
例如并發(fā)情況下多次調(diào)用channel的close會導致panic,解決這個問題我們可以使用sync.Once來保證close只會被執(zhí)行一次。

sync.Once的結(jié)構(gòu)如下所示,只有一個函數(shù)。使用變量done來記錄函數(shù)的執(zhí)行狀態(tài),使用sync.Mutex和sync.atomic來保證線程安全的讀取done。

type Once struct {
    m    Mutex     #互斥鎖
    done uint32    #執(zhí)行狀態(tài)
}

func (o *Once) Do(f func())

舉個例子,1000個并發(fā)協(xié)程情況下只有一個協(xié)程會執(zhí)行到fmt.Printf,多次執(zhí)行的情況下輸出的內(nèi)容還不一樣,因為這取決于哪個協(xié)程先調(diào)用到該匿名函數(shù)。

func main() {
     once := &sync.Once{}
     for i := 0; i < 1000; i++ {
     go func(idx int) {
        once.Do(func() {
        time.Sleep(1 * time.Second)
        fmt.Printf("hello world index: %d", idx)
        })
     }(i)
     }

     time.Sleep(5 * time.Second)
}
sync.Cond

sync.Cond指的是同步條件變量,一般需要與互斥鎖組合使用,本質(zhì)上是一些正在等待某個條件的協(xié)程的同步機制。

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

sync.Cond有3個函數(shù)Wait、Signal、Broadcast

// Wait 等待通知
func (c *Cond) Wait()
// Signal 單播通知
func (c *Cond) Signal()
// Broadcast 廣播通知
func (c *Cond) Broadcast()

舉個例子,sync.Cond用于并發(fā)協(xié)程條件變量。

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}
sync.Pool

sync.Pool指的是臨時對象池,Golang和Java具有GC機制,因此很多開發(fā)者基本上都不會考慮內(nèi)存回收問題,不像C++很多時候開發(fā)需要自己回收對象。
Gc是一把雙刃劍,帶來了編程的方便但同時也增加了運行時開銷,使用不當可能會嚴重影響程序的性能,因此性能要求高的場景不能任意產(chǎn)生太多的垃圾。
sync.Pool正是用來解決這類問題的,Pool可以作為臨時對象池來使用,不再自己單獨創(chuàng)建對象,而是從臨時對象池中獲取出一個對象。

sync.Pool有2個函數(shù)Get和Put,Get負責從臨時對象池中取出一個對象,Put用于結(jié)束的時候把對象放回臨時對象池中。

func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})

看一個官方例子

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func timeNow() time.Time {
    return time.Unix(1136214245, 0)
}

func Log(w io.Writer, key, val string) {
    // 獲取臨時對象,沒有的話會自動創(chuàng)建
    b := bufPool.Get().(*bytes.Buffer)
    b.Reset()
    b.WriteString(timeNow().UTC().Format(time.RFC3339))
    b.WriteByte(' ')
    b.WriteString(key)
    b.WriteByte('=')
    b.WriteString(val)
    w.Write(b.Bytes())
    // 將臨時對象放回到 Pool 中
    bufPool.Put(b)
}

func main() {
    Log(os.Stdout, "path", "/search?q=flowers")
}

從上面的例子我們可以看到創(chuàng)建一個Pool對象并不能指定大小,所以sync.Pool的緩存對象數(shù)量是沒有限制的(只受限于內(nèi)存),那sync.Pool是如何控制緩存臨時對象數(shù)的呢?
sync.Pool在init的時候注冊了一個poolCleanup函數(shù),它會清除所有的pool里面的所有緩存的對象,該函數(shù)注冊進去之后會在每次Gc之前都會調(diào)用,因此sync.Pool緩存的期限只是兩次Gc之間這段時間。正因Gc的時候會清掉緩存對象,所以不用擔心pool會無限增大的問題。

正因為如此sync.Pool適合用于緩存臨時對象,而不適合用來做持久保存的對象池(連接池等)。

sync.Map

Go在1.9版本之前自帶的map對象是不具有并發(fā)安全的,很多時候我們都得自己封裝支持并發(fā)安全的Map結(jié)構(gòu),如下所示給map加個讀寫鎖sync.RWMutex。

type MapWithLock struct {
    sync.RWMutex
    M map[string]Kline
}

Go1.9版本新增了sync.Map它是原生支持并發(fā)安全的map,sync.Map封裝了更為復雜的數(shù)據(jù)結(jié)構(gòu)實現(xiàn)了比之前加讀寫鎖鎖map更優(yōu)秀的性能。

sync.Map總共5個函數(shù),用法和原生的map差點很多

// 查詢一個key
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// 設置key value
func (m *Map) Store(key, value interface{})
// 如果key存在則返回key對應的value,否則設置key value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// 刪除一個key
func (m *Map) Delete(key interface{})
// 遍歷map,仍然是無序的
func (m *Map) Range(f func(key, value interface{}) bool)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容