深入分析go調度(四)

以下文章均為拜讀公眾號 源碼游記 的筆記 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect

// Excerpt of a function body (its header is not shown here): walks every P
// and requests preemption of a G that has been running for too long.
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
    _p_ := allp[i]
    if _p_ == nil {
        // This can happen if procresize has grown
        // allp but not yet created new Ps.
        continue
    }
    pd := &_p_.sysmontick // last tick observed by sysmon
    s := _p_.status
    sysretake := false
    if s == _Prunning || s == _Psyscall {
        // Preempt G if it's running for too long.
        t := int64(_p_.schedtick) // schedtick: number of scheduling rounds; incremented each time a round completes
        if int64(pd.schedtick) != t { // the P has rescheduled since the last observation: record the new tick and time
            pd.schedtick = uint32(t)
            pd.schedwhen = now
        } else if pd.schedwhen+forcePreemptNS <= now { // same tick for at least forcePreemptNS: running too long
            // Request preemption of the G running on this P.
            preemptone(_p_)
            // In case of syscall, preemptone() doesn't
            // work, because there is no M wired to P.
            sysretake = true
        }
    }
    if s == _Psyscall {
    .... // syscall retake code (elided in this excerpt)
}
}
unlock(&allpLock)
return uint32(n)

}


由上可見,如果當前G運行時間過長(通過p上的pd判斷),那么就發起搶占 `preemptone(_p_)`

而神奇的preemptone宛如一個彬彬有禮的紳士,沒做任何暴力行為

```go
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    // Set the preemption-request flag on the goroutine.
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    gp.stackguard0 = stackPreempt // per the article, stackPreempt is a very large value

    ... // signal-based preemption, ignored for now

    return true
}

```

到目前為止,負責搶占調度的retake已經完事收工,但是我們沒有看到任何暴力行為,只是告訴g你需要讓位了。(ps.這也就是我們稱為協作式調度的原因)

響應搶占

這個是由newstack完成的

具體調度流:morestack_noctxt() -> morestack() -> newstack()

// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the GC is trying to stop this g then it will set preemptscan to true.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
    thisg := getg() // thisg = g0
    ......
    // Fetch g0.m.curg, i.e. the goroutine that needs its stack grown or
    // that must respond to a preemption request.
    // In the article's example, gp is the main goroutine.
    gp := thisg.m.curg
    ......
    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below.
    // Check whether g.stackguard0 has been set to stackPreempt.
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    // Be conservative about where we preempt.
    // We are interested in preempting user Go code, not runtime code.
    // If we're holding locks, mallocing, or preemption is disabled, don't
    // preempt.
    // This check is very early in newstack so that even the status change
    // from Grunning to Gwaiting and back doesn't happen in this case.
    // That status change by itself can be viewed as a small preemption,
    // because the GC might change Gwaiting to Gscanwaiting, and then
    // this goroutine has to wait for the GC to finish before continuing.
    // If the GC is in some way dependent on this goroutine (for example,
    // it needs a lock held by the goroutine), that small preemption turns
    // into a real deadlock.
    if preempt {
        // Check the state of the M/P running the goroutine being preempted.
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" ||  thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            // Restore stackguard0 to its normal value to mark this
            // preemption request as handled.
            gp.stackguard0 = gp.stack.lo + _StackGuard
           
            // Do not preempt: gogo resumes this same g directly, so
            // schedule is not called to pick another goroutine.
            gogo(&gp.sched) // never return
        }
    }

    // The code elided here performs other checks, which is why the same
    // `if preempt` test appears twice.

    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }
        ......
        // From here on the preemption request is honored.
        // Act like goroutine called runtime.Gosched.
        // Set gp's status back: the elided GC-handling code had changed
        // gp's status to _Gwaiting.
        casgstatus(gp, _Gwaiting, _Grunning)
       
        // Call gopreempt_m to switch gp out.
        gopreempt_m(gp) // never return
    }
    ......
}


// gopreempt_m emits a preemption trace event when tracing is enabled and
// then hands gp to goschedImpl.
func gopreempt_m(gp *g) {
    if trace.enabled {
        traceGoPreempt()
    }
    goschedImpl(gp)
}


可見這里我們判斷如果需要搶占,那么就把gp的狀態切換回 _Grunning,然后調用 goschedImpl 完成調度。

響應搶占的細節之棧擴容

我們上面看到,響應搶占是在檢查棧的時候才判斷調度,那么我們就會有個疑問:什么時候檢查棧?

go設計:在函數調用的時候會插入棧擴容的檢查代碼

以以下代碼為例

package main

import "fmt"

func sum(a, b int) int { // computes a*a + b*b, prints it, and returns it
    a2 := a * a
    b2 := b * b
    c := a2 + b2

    fmt.Println(c)

    return c
}

func main() { // line 15 of this program; keep the line layout so `b q3.go:15` still lands here
    sum(1, 2)
}

運行以下命令進入gdb,結果如下

go build -o q3 q3.go
gdb q3
b q3.go:15
r
disass

<img src="/Users/gaoke/Library/Application Support/typora-user-images/image-20200607190207907.png" alt="image-20200607190207907" style="zoom:90%;" />

可見函數尾部對morestack_noctxt進行了調用,而跳轉到該函數的邏輯是

=> 0x0000000000490eb0 <+0>: mov    %fs:0xfffffffffffffff8,%rcx #main函數第一條指令,rcx = g
   0x0000000000490eb9 <+9>: cmp    0x10(%rcx),%rsp  #0x10(%rcx) 為 g.stackguard0
   0x0000000000490ebd <+13>:    jbe    0x490eed <main.main+61>

我們知道g結構偏移16個字節為stackguard0。那么第二行的意思就是在比較棧頂寄存器rsp的值是否小于等于stackguard0的值(jbe),如果是,說明當前g的棧要用完了,有溢出風險,需要擴棧。

回憶下棧結構

<img src="http://picgo.vipkk.work/20200607190850.png" alt="image-20200607190850026" style="zoom:50%;" />

其中正常情況下 stackguard0 = stack.lo + _StackGuard,而lo是棧的低地址邊界(棧向低地址增長),所以這里如果rsp比 stackguard0小的話,那就需要進行擴容。在設置了搶占調度的時候,stackguard0 是一個特別大的值(stackPreempt),rsp必小,所以就會調用morestack_noctxt。

當執行完之后,那么就繼續調用main恢復執行!!!

那么在協作式調度的時候,如果一個函數里沒有函數調用,那么它也就永遠不會被搶占,因為我們上面分析了,`morestack_noctxt`是插在函數序言里的,如果沒有的話,那就永遠不會檢查搶占了。

如果沒有函數調用,那就永遠阻塞了

對系統調用過長發起搶占

回到retake

// retake checks Ps for preemption. This excerpt shows only the branch that
// retakes a P whose M has been in a system call for too long; the rest of
// the function is elided.
func retake(now int64) uint32 {
        ...
  
 if s == _Psyscall { // the P is in a system call
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                /**
                Skip retaking this P when all of the following hold:
                1. its local run queue is empty
                2. there is a spinning M or an idle P (someone else can pick
                   up work, so retaking is unnecessary)
                3. fewer than 10ms have passed since syscallwhen
                 */
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_) // hand the P off so another M can run it
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    
  ...
}

根據retake函數的代碼,只要滿足下面三個條件中的任意一個就需要對處于_Psyscall 狀態的p進行搶占:

  1. p的運行隊列里面有等待運行的goroutine。這用來保證當前p的本地運行隊列中的goroutine得到及時的調度,因為該p對應的工作線程正處于系統調用之中,無法調度隊列中goroutine,所以需要尋找另外一個工作線程來接管這個p從而達到調度這些goroutine的目的;
  2. 沒有空閑的p。表示其它所有的p都已經與工作線程綁定且正忙于執行go代碼,這說明系統比較繁忙,所以需要搶占當前正處于系統調用之中而實際上系統調用并不需要的這個p并把它分配給其它工作線程去調度其它goroutine。
  3. 從上一次監控線程觀察到p對應的m處于系統調用之中到現在已經超過了10毫秒。這表示只要系統調用超時,就對其搶占,而不管是否真的有goroutine需要調度,這樣保證sysmon線程不至于覺得無事可做(sysmon線程會判斷retake函數的返回值,如果為0,表示retake并未做任何搶占,所以會覺得沒啥事情做)而休眠太長時間最終會降低sysmon監控的實時性。

綜上,系統調用因為深陷在os中,go runtime鞭長莫及,所以在設置了搶占位之后,sysmon還做了另外一件事,就是通過handoffp尋找下一個新的m來和p合作,讓p脫離之前的m。

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    // If the P's local run queue or the global run queue is non-empty,
    // there is still work to do, so start an M to run it.
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
    // If GC marking is enabled and mark work is available, start an M too
    // (the article's author marks this as a TODO: not yet understood).
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // I.e. with no spinning M and no idle P, everyone else is busy, so this
    // P must not sit idle either: start a spinning M for it.
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock) // the operations below touch global scheduler state
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    // The global run queue is non-empty (re-checked under sched.lock).
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    // Not everyone may rest: someone has to wait on the network, and if
    // nobody else does it, this P takes the job.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    if when := nobarrierWakeTime(_p_); when != 0 {
        wakeNetPoller(when)
    }
    // No reason found to start an M: put the P on the idle list.
    pidleput(_p_)
    unlock(&sched.lock)
}

handoffp函數流程比較簡單,它的主要任務是通過各種條件判斷是否需要啟動工作線程來接管p,如果不需要則把p放入P的全局空閑隊列。

從handoffp的代碼可以看出,在如下幾種情況下則需要調用我們已經分析過的startm函數啟動新的工作線程出來接管p:

  1. p的本地運行隊列或全局運行隊列里面有待運行的goroutine;
  2. 需要幫助gc完成標記工作;
  3. 系統比較忙,所有其它p都在運行goroutine,需要幫忙;
  4. 所有其它P都已經處于空閑狀態,如果需要監控網絡連接讀寫事件,則需要啟動新的m來poll網絡連接。

到此,sysmon監控線程對處于系統調用之中的p的搶占就已經完成。

系統調用

TODO

簡單說

  1. 解綁
  2. 恢復的時候嘗試重新綁定,綁定不上就去找新的。

系統調用匯編實現

// func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, err uintptr)
// Brackets the SYSCALL instruction with runtime·entersyscall and
// runtime·exitsyscall, and stores r1, r2 and err into the result slots.
TEXT ·Syscall6(SB),NOSPLIT,$0-80
  // Pre-processing: notify the runtime before entering the system call.
    CALL    runtime·entersyscall(SB)
    // Load the arguments and issue the system call.
    MOVQ    a1+8(FP), DI
    MOVQ    a2+16(FP), SI
    MOVQ    a3+24(FP), DX
    MOVQ    a4+32(FP), R10
    MOVQ    a5+40(FP), R8
    MOVQ    a6+48(FP), R9
    MOVQ    trap+0(FP), AX  // syscall entry
    SYSCALL
    // AX unsigned <= 0xfffffffffffff001 jumps to ok6; otherwise fall
    // through, set r1 = -1 and r2 = 0, and store the negated AX as err.
    CMPQ    AX, $0xfffffffffffff001
    JLS ok6
    MOVQ    $-1, r1+56(FP)
    MOVQ    $0, r2+64(FP)
    NEGQ    AX
    MOVQ    AX, err+72(FP)
    // Exit processing: notify the runtime that the system call returned.
    CALL    runtime·exitsyscall(SB)
    RET
ok6:
    MOVQ    AX, r1+56(FP)
    MOVQ    DX, r2+64(FP)
    MOVQ    $0, err+72(FP)
    // Exit processing: notify the runtime that the system call returned.
    CALL    runtime·exitsyscall(SB)
    RET

系統調用做了三件事

  1. 預處理: entersyscall
  2. 處理 syscall
  3. 退出處理: exitsyscall

entersyscall

// reentersyscall prepares the current goroutine for a system call: it saves
// pc/sp, moves the g to _Gsyscall, detaches the M from its P and marks the
// P as _Psyscall. Part of the body is elided in this excerpt.
func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    // The g must not be preempted here.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    // entersyscall must not call any function that grows or splits the stack.
    _g_.stackguard0 = stackPreempt
    // Set throwsplit; per the article, newstack crashes immediately when it
    // finds throwsplit set:
    /**
    if thisg.m.curg.throwsplit{
        throw("runtime: stack split at bad time")
    }
    */
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    // Save the execution context; it is used to resume after the syscall.
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
  // Change the g's status.
    casgstatus(_g_, _Grunning, _Gsyscall)
    
  ...
    
  // Unbind the M from its P.
    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    _g_.m.mcache = nil
    pp := _g_.m.p.ptr()
    pp.m = 0
    _g_.m.oldp.set(pp) // remember this P; per the article it is the first candidate when rebinding
    _g_.m.p = 0
    // Change the P's status.
    atomic.Store(&pp.status, _Psyscall)
    if sched.gcwaiting != 0 {
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    _g_.m.locks--
}

exitsyscall

源碼先不補充,這塊還需要再復習

大概工作如下

  1. 嘗試綁定之前的p
  2. 失敗之后,從全局找一個p來執行
  3. 如果找不到,那就把g放到全局隊(duì)列,自身stop

信號搶占

go在1.14中引入了信號搶占,這里也分析下

發起搶占

// preemptone (excerpt): shows only the asynchronous-preemption part; the
// rest of the function is elided.
func preemptone(_p_ *p) bool {
  ...
  
  // Request an async preemption of this P.
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        // Preempt the M by sending it a signal.
        preemptM(mp)
    }
  
  ...
}


// preemptM requests preemption of mp by sending it sigPreempt via signalM.
func preemptM(mp *m) {
    signalM(mp, sigPreempt)
}

// signalM sends a signal to mp.
// It calls tgkill with the current process ID, mp.procid, and sig.
func signalM(mp *m, sig int) {
    tgkill(getpid(), int(mp.procid), sig)
}

go在設置搶占標志位的時候,同時發送了一個搶占信號。

信號接收

// sighandler handles a signal delivered to the runtime. This excerpt keeps
// only the preemption branch (added in Go 1.14): when sig is sigPreempt it
// passes gp and the signal context to doSigPreempt.
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    // Wrap the raw signal info and context. doSigPreempt reads the
    // interrupted pc/sp/lr through this value, so it must be defined before
    // the call below.
    c := &sigctxt{info, ctxt}

    // (other signal handling elided)

    if sig == sigPreempt { // preemption signal
        // Might be a preemption signal.
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we
        // still let it through to the application.
    }

    // (other signal handling elided)
}

信號處理

// doSigPreempt handles a preemption signal on gp.
// ctxt is the signal context; when gp wants preemption and is at an async
// safe point, a call to asyncPreempt is pushed onto that context.
func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
        // Inject a call to asyncPreempt.
        // This is the key step that carries out the preemption.
        ctxt.pushCall(funcPC(asyncPreempt))
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
}
        

pushCall 直接修改了g被信號中斷時保存在信號上下文里的pc,從而從信號處理返回、恢復執行的時候會先進入asyncPreempt。

func asyncPreempt() // declaration only; the article notes the body is provided by the compiler

// asyncPreempt2 marks gp as being at an async safe point and switches away
// from it with mcall: to preemptPark when gp.preemptStop is set, otherwise
// to gopreempt_m. The flag is cleared once the goroutine resumes.
//go:nosplit
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    // The actual preemption happens here.
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
    gp.asyncSafePoint = false
}

https://changkun.de/golang/zh-cn/part2runtime/ch06sched/preemption/

  1. https://go-review.googlesource.com/c/go/+/201762
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容