深入分析go調(diào)度(三)

以下文章均為拜讀公眾號 源碼游記 的筆記 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect

3. Goroutine調(diào)度策略

前面提過,所謂的goroutine調(diào)度,是指程序代碼按照一定的算法找到合適的g并放到CPU上去運(yùn)行的過程。

所以有以下三個(gè)問題需要思考解決

  1. 調(diào)度時(shí)機(jī):什么時(shí)候發(fā)生調(diào)度
  2. 調(diào)度策略:選取哪個(gè)g去調(diào)度
  3. 切換機(jī)制:怎么把g放到CPU中(這就是我們說的gogo

下面我們主要未然調(diào)度策略展開,看下我們是怎么選擇g的。

1. schedule的操作

之前我們看到schedule在調(diào)度循環(huán)中將一個(gè)g放到CPU上去運(yùn)行,但是在之前它還負(fù)責(zé)找到一個(gè)合適的g去調(diào)度。

這次我們主要關(guān)注那些調(diào)度算法

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()   //_g_ = m.g0

    ......

    var gp *g

    ......
   
    if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
       //為了保證調(diào)度的公平性,每個(gè)工作線程每進(jìn)行61次調(diào)度就需要優(yōu)先從全局運(yùn)行隊(duì)列中獲取goroutine出來運(yùn)行,
       //因?yàn)槿绻徽{(diào)度本地運(yùn)行隊(duì)列中的goroutine,則全局運(yùn)行隊(duì)列中的goroutine有可能得不到運(yùn)行
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock) //所有工作線程都能訪問全局運(yùn)行隊(duì)列,所以需要加鎖
            gp = globrunqget(_g_.m.p.ptr(), 1) //從全局運(yùn)行隊(duì)列中獲取1個(gè)goroutine
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        //從與m關(guān)聯(lián)的p的本地運(yùn)行隊(duì)列中獲取goroutine
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        //如果從本地運(yùn)行隊(duì)列和全局運(yùn)行隊(duì)列都沒有找到需要運(yùn)行的goroutine,
        //則調(diào)用findrunnable函數(shù)從其它工作線程的運(yùn)行隊(duì)列中偷取,如果偷取不到,則當(dāng)前工作線程進(jìn)入睡眠,
        //直到獲取到需要運(yùn)行的goroutine之后findrunnable函數(shù)才會返回。
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    ......

    //當(dāng)前運(yùn)行的是runtime的代碼,函數(shù)調(diào)用棧使用的是g0的棧空間
    //調(diào)用execte切換到gp的代碼和棧空間去運(yùn)行
    execute(gp, inheritTime)  
}

由代碼可見,schedule函數(shù)大概分為了三步去尋找可運(yùn)行的g

三步查找

第一步

從全局隊(duì)列中尋找g。為了保證調(diào)度的公平性,每個(gè)工作線程每經(jīng)過61次調(diào)度就需要優(yōu)先嘗試從全局隊(duì)列中找出一個(gè)g來運(yùn)行,這樣保證每個(gè)g都有機(jī)會得到調(diào)用。

之前提過曲劇隊(duì)列是所有工作線程都可以訪問的,所以在訪問它之前需要加鎖。

第二步

從本地運(yùn)行隊(duì)列尋找g,這個(gè)是常態(tài),基本上都是從這里獲取。這個(gè)的前提是在上一步?jīng)]有取到g

第三步

從其他工作線程的運(yùn)行隊(duì)列中偷取g。

下面我們詳細(xì)看下這三步

2.全局運(yùn)行隊(duì)列獲取goroutine

下面我們來看下獲取g的函數(shù)globrunqget

// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 { // 全局運(yùn)行隊(duì)列為空,前面已經(jīng)加鎖了
        return nil
    }

    // 以下為一些數(shù)量限制
    n := sched.runqsize/gomaxprocs + 1 // 按照p數(shù)量均分
    if n > sched.runqsize {
        n = sched.runqsize
    }
    if max > 0 && n > max { // 不超過最大值
        n = max
    }
    if n > int32(len(_p_.runq))/2 { // 不超過一半,如果runq為1個(gè)是不是永遠(yuǎn)取不走???
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n // 更新個(gè)數(shù)

    gp := sched.runq.pop() // 至少取走一個(gè),
    n--
    for ; n > 0; n-- { // 循環(huán)取走,并放入本地運(yùn)行隊(duì)列
        gp1 := sched.runq.pop()  // 取
        runqput(_p_, gp1, false) // 放
    }
    return gp // 把第一個(gè)return ,用做運(yùn)行
}

該函數(shù)的第一個(gè)參數(shù)當(dāng)前工作線程綁定的p,第二個(gè)參數(shù)是最多可以取多少個(gè)。

并且如果多于1個(gè),那么其他取出放入本地運(yùn)行隊(duì)列。

3. 本地運(yùn)行隊(duì)列中獲取goroutine

函數(shù)runqget

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    for {
        next := _p_.runnext // runnext 高優(yōu)插隊(duì)者,如果有的話 優(yōu)先runnext TODO 查詢r(jià)unnext在什么條件下出現(xiàn)
        if next == 0 {
            break
        }
        if _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
    }

    for {
        // 獲取首尾
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        // 原子+1
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

這里需要注意的是取gp時(shí)候是加鎖的。

因?yàn)橛衅渌墓ぷ骶€程有可能也在訪問這2個(gè)成員,比如有人正在偷你的g

對于語義

<img src="http://picgo.vipkk.work/20200523210047.png" alt="image-20200523210047055" style="zoom:50%;" />

CAS操作和ABA問題

CAS 操作會造成[ABA問題][ABA]

那么在runqget中會不會呢,答案是不會的,分析如下

對于runnext

  1. 只有runqget才會將其改為0,其他地方都只是將他改為非0值
  2. 任何時(shí)刻只有一個(gè)當(dāng)前工作線程可以執(zhí)行runqget,不會并發(fā),也就不存在其他人將其改為0

對于runq

  1. 只有當(dāng)前線程才會去往自己的隊(duì)列添加g,知會去偷g,從而修改runqhead
  2. 所以,當(dāng)我們這個(gè)工作線程從runqhead讀取到值A(chǔ)之后,其它工作線程也就不可能修改runqhead的值為B之后再第二次把它修改為值A(chǔ)(因?yàn)閞unqtail在這段時(shí)間之內(nèi)不可能被修改,runqhead的值也就無法越過runqtail再回繞到A值),也就是說,代碼從邏輯上已經(jīng)杜絕了引發(fā)ABA的條件。

4. 從其他線程偷取G

      //如果從本地運(yùn)行隊(duì)列和全局運(yùn)行隊(duì)列都沒有找到需要運(yùn)行的goroutine,
      //則調(diào)用findrunnable函數(shù)從其它工作線程的運(yùn)行隊(duì)列中偷取,如果偷取不到,則當(dāng)前工作線程進(jìn)入睡眠,
      //直到獲取到需要運(yùn)行的goroutine之后findrunnable函數(shù)才會返回。
      gp, inheritTime = findrunnable() // blocks until work is available

findrunnable函數(shù)負(fù)責(zé)處理與盜取相關(guān)的邏輯,另外該函數(shù)還負(fù)責(zé)gc和netpoll等相關(guān)的事情。目前我們先關(guān)注偷取算法。因?yàn)榇a太長我們分步驟來看

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
}

在偷取之前再看下本地和全局有沒有可運(yùn)行的隊(duì)列,有的話取出返回收工

    // local runq 檢查本地隊(duì)列是否有可以運(yùn)行的
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq 檢查全局隊(duì)列是否有可以運(yùn)行的
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

接下來是一些netpoll相關(guān),跳過。

再然后是fastpath,快速失敗

需要注意fastpath的2個(gè)點(diǎn)

  1. m 沒有在自旋
  2. 自旋的個(gè)數(shù) 大于 work的個(gè)數(shù)。2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle)
    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    ranTimer := false
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    // 這里的意思是 m還沒有開始尋找,但是已經(jīng)有超多一半的m在尋找了,那么它就不找了,這樣省CPU。
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }

接著開始偷取,一共進(jìn)行4次嘗試。如果成功,則偷取一半并返回。當(dāng)進(jìn)行第3、4次嘗試的時(shí)候,也會嘗試偷取runnext。

這里的偷取為了公平是隨機(jī)選p的。不論位置還是下一個(gè)都是隨機(jī)的詳見[隨機(jī)偷取算法](# 隨機(jī)偷取算法)

    for i := 0; i < 4; i++ {
        // 隨機(jī)開始,隨機(jī)下一個(gè)
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            p2 := allp[enum.position()]
            if _p_ == p2 {
                continue
            }
            // 如果成功取出一半
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

當(dāng)找到的時(shí)候,通過runqsteal來偷取,

這個(gè)邏輯比較簡單,grab之后獲取到根據(jù)大小來跟新 runqhead & runqtail

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    t := _p_.runqtail
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    if n == 0 {
        return nil
    }
    n--
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    if t-h+n >= uint32(len(_p_.runq)) {
        throw("runqsteal: runq overflow")
    }
    atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
    return gp
}

接下來就是runqgrad這里負(fù)責(zé)了真正的偷取

grad 根據(jù)_p_的大小判斷偷取的一半,然后放進(jìn)batch里,再更新head&tail。 至此整個(gè)偷取算法完成。

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        n = n - n/2
        if n == 0 {
            ...
            return 0
        }
        if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
            continue
        }
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return n
        }
    }
}

再回到findrunnable,如果四次都沒有成功則進(jìn)入stop模式(ps 忽略runtime,暫時(shí)沒看懂)

在這里首先將m與p解除綁定,然后準(zhǔn)備去休眠,然后把p放到空閑隊(duì)列,不過在結(jié)束之前進(jìn)行了最后一次判斷。

最后stopm

stop:

    ......

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
 
    ......
 
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
   
    // 當(dāng)前工作線程解除與p之間的綁定,準(zhǔn)備去休眠
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    //把p放入空閑隊(duì)列
    pidleput(_p_)
    unlock(&sched.lock)

// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        //m即將睡眠,狀態(tài)不再是spinning
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    // 休眠之前再看一下是否有工作要做
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    ......
    //休眠
    stopm()
    goto top
}

最后萬般無奈進(jìn)入stopm

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {
        throw("stopm holding p")
    }
    if _g_.m.spinning {
        throw("stopm spinning")
    }

    lock(&sched.lock)
    mput(_g_.m)   //把m結(jié)構(gòu)體對象放入sched.midle空閑隊(duì)列
    unlock(&sched.lock)
    notesleep(&_g_.m.park)  //進(jìn)入睡眠狀態(tài)
 
    //被其它工作線程喚醒
    noteclear(&_g_.m.park)
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

stopm的主要操作就是將m放到 midle隊(duì)列里,然后notesleep 進(jìn)入睡眠狀態(tài)。note的是go的一次性睡眠和喚醒機(jī)制,具體參考note細(xì)節(jié)。

繼續(xù)當(dāng)notesleep返回后,繼續(xù)綁定p 然后開始新的循環(huán)調(diào)度。

至于notesleep,它是通過futex完成的sleep。

從前面的代碼我們已經(jīng)看到,stopm調(diào)用notesleep時(shí)給它傳遞的參數(shù)是m結(jié)構(gòu)體的park成員,而m又早已通過mput放入了全局的milde空閑隊(duì)列,這樣其它運(yùn)行著的線程一旦發(fā)現(xiàn)有更多的goroutine需要運(yùn)行時(shí)就可以通過全局的m空閑隊(duì)列找到處于睡眠狀態(tài)的m,然后調(diào)用notewakeup(&m.park)將其喚醒,至于怎么喚醒,我們在其它章節(jié)繼續(xù)討論.

補(bǔ)充

隨機(jī)偷取算法


type randomEnum struct {
    i     uint32
    count uint32
    pos   uint32
    inc   uint32
}

func (ord *randomOrder) reset(count uint32) {
    ord.count = count
    ord.coprimes = ord.coprimes[:0]
    for i := uint32(1); i <= count; i++ {
        if gcd(i, count) == 1 {
            ord.coprimes = append(ord.coprimes, i) // 找比count小的質(zhì)數(shù)作為incr
        }
    }
}

func (ord *randomOrder) start(i uint32) randomEnum {
    return randomEnum{
        count: ord.count,
        pos:   i % ord.count,
        inc:   ord.coprimes[i%uint32(len(ord.coprimes))], // 隨機(jī)選一個(gè)質(zhì)數(shù)
    }
}

func (enum *randomEnum) done() bool {
    return enum.i == enum.count
}

func (enum *randomEnum) next() {
    enum.i++
    enum.pos = (enum.pos + enum.inc) % enum.count // 隨機(jī)下一個(gè)
}

下面舉例說明一下上述算法過程,現(xiàn)假設(shè)nprocs為8,也就是一共有8個(gè)p。

如果第一次隨機(jī)選擇的offset = 6,coprime = 3(3與8互質(zhì),滿足算法要求)的話,則從allp切片中偷取的下標(biāo)順序?yàn)?, 1, 4, 7, 2, 5, 0, 3,計(jì)算過程:

6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3

如果第二次隨機(jī)選擇的offset = 4,coprime = 5的話,則從allp切片中偷取的下標(biāo)順序?yàn)?, 6, 3, 0, 5, 2, 7, 4,計(jì)算過程:

1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

可以看到只要隨機(jī)數(shù)不一樣,偷取p的順序也不一樣,但可以保證經(jīng)過8次循環(huán),每個(gè)p都會被訪問到??梢杂脭?shù)論知識證明,不管nprocs是多少,這個(gè)算法都可以保證經(jīng)過nprocs次循環(huán),每個(gè)p都可以得到訪問。

公平公正

note細(xì)節(jié)

note是go runtime實(shí)現(xiàn)的一次性睡眠和喚醒機(jī)制,一個(gè)線程可以通過調(diào)用notesleep(*note)進(jìn)入睡眠狀態(tài),而另外一個(gè)線程則可以通過notewakeup(*note)把其喚醒。note的底層實(shí)現(xiàn)機(jī)制跟操作系統(tǒng)相關(guān),不同系統(tǒng)使用不同的機(jī)制,比如linux下使用的futex系統(tǒng)調(diào)用,而mac下則是使用的pthread_cond_t條件變量,note對這些底層機(jī)制做了一個(gè)抽象和封裝,這種封裝給擴(kuò)展性帶來了很大的好處,比如當(dāng)睡眠和喚醒功能需要支持新平臺時(shí),只需要在note層增加對特定平臺的支持即可,不需要修改上層的任何代碼。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容