go Sync.Cond介紹

是什么

sync.Cond 條件變量用來協(xié)調(diào)想要訪問共享資源的那些 goroutine,當(dāng)共享資源的狀態(tài)發(fā)生變化的時(shí)候,它可以用來通知被互斥鎖阻塞的 goroutine。

sync.Cond 基于互斥鎖/讀寫鎖,經(jīng)常用在多個(gè) goroutine 等待,一個(gè) goroutine 通知(事件發(fā)生)的場(chǎng)景。

怎么用

sync.Cond 的結(jié)構(gòu)

// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
        noCopy noCopy

        // L is held while observing or changing the condition
        L Locker

        notify  notifyList
        checker copyChecker
}

sync.Cond 的四個(gè)方法

// NewCond 創(chuàng)建 Cond 實(shí)例時(shí),需要關(guān)聯(lián)一個(gè)鎖。
func NewCond(l Locker) *Cond{}

// Broadcast 喚醒所有等待條件變量 c 的 goroutine,無需鎖保護(hù)。
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast(){}

// Signal 喚醒一個(gè)協(xié)程
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal(){}

// Wait 等待
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait(){}

使用demo

package main

import (
    "log"
    "sync"
    "time"
)

var done = false

func read(name string, c *sync.Cond) {
    c.L.Lock()
    for !done {
        c.Wait()
    }
    log.Println(name, "starts reading")
    c.L.Unlock()
}

func write(name string, c *sync.Cond) {
    log.Println(name, "starts writing")
    time.Sleep(time.Second)
    c.L.Lock()
    done = true
    c.L.Unlock()
    log.Println(name, "wakes")
    c.Broadcast()
    //c.Signal()
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})

    go read("reader1", cond)
    go read("reader2", cond)
    go read("reader3", cond)
    write("writer", cond)

    time.Sleep(time.Second * 3)
}

應(yīng)用場(chǎng)景

經(jīng)常用在多個(gè) goroutine 等待,一個(gè) goroutine 通知(事件發(fā)生)的場(chǎng)景。

比如:有一個(gè)協(xié)程在異步地接收數(shù)據(jù),剩下的多個(gè)協(xié)程必須等待這個(gè)協(xié)程接收完數(shù)據(jù),才能讀取到正確的數(shù)據(jù)

實(shí)際使用

https://github.com/panjf2000/ants.
第三方包goroutine池,ants包中pool.go文件

// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

    w = p.workers.detach()
    if w != nil { // first try to fetch the worker from the queue
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        // if the worker queue is empty and we don't run out of the pool capacity,
        // then just spawn a new worker goroutine.
        p.lock.Unlock()
        spawnWorker()
    } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        p.blockingNum++
        p.cond.Wait() // block and wait for an available worker
        p.blockingNum--
        var nw int
        if nw = p.Running(); nw == 0 { // awakened by the scavenger
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }

        p.lock.Unlock()
    }
    return
}

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
    if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
        p.cond.Broadcast()
        return false
    }
    worker.recycleTime = time.Now()
    p.lock.Lock()

    // To avoid memory leaks, add a double check in the lock scope.
    // Issue: https://github.com/panjf2000/ants/issues/113
    if p.IsClosed() {
        p.lock.Unlock()
        return false
    }

    err := p.workers.insert(worker)
    if err != nil {
        p.lock.Unlock()
        return false
    }

    // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
    p.cond.Signal()
    p.lock.Unlock()
    return true
}

參考

1、sync.Cond 條件變量
2、一文讀懂 Go sync.Cond 設(shè)計(jì)
3、源碼剖析sync.Cond(條件變量的實(shí)現(xiàn)機(jī)制)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容