goroutine的暫停和恢復(fù)源碼剖析

上一節(jié)《GC 對(duì)根對(duì)象掃描實(shí)現(xiàn)的源碼分析》中,我們提到過在GC的時(shí)候,會(huì)對(duì)一些goroutine 棧掃描時(shí)會(huì)掃描前臺(tái)觸發(fā) G 的暫停(suspendG)和恢復(fù)(resumeG)。

// markroot scans the i'th root.
//
// Preemption must be disabled (because this uses a gcWork).
//
// nowritebarrier is only advisory here.
//
//go:nowritebarrier
func markroot(gcw *gcWork, i uint32) {
    baseFlushCache := uint32(fixedRootCount)
    baseData := baseFlushCache + uint32(work.nFlushCacheRoots)
    baseBSS := baseData + uint32(work.nDataRoots)
    baseSpans := baseBSS + uint32(work.nBSSRoots)
    baseStacks := baseSpans + uint32(work.nSpanRoots)
    end := baseStacks + uint32(work.nStackRoots)

    // Note: if you add a case here, please also update heapdump.go:dumproots.
    switch {
        ......

    default:
        var gp *g
        if baseStacks <= i && i < end {
            gp = allgs[i-baseStacks]
        } else {
            throw("markroot: bad index")
        }

        status := readgstatus(gp) // We are not in a scan state
        if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
            gp.waitsince = work.tstart
        }

        // scanstack must be done on the system stack in case
        // we're trying to scan our own stack.
        systemstack(func() {
            userG := getg().m.curg
            selfScan := gp == userG && readgstatus(userG) == _Grunning
            if selfScan {
                casgstatus(userG, _Grunning, _Gwaiting)
                userG.waitreason = waitReasonGarbageCollectionScan
            }

            // TODO: suspendG blocks (and spins) until gp
            // stops, which may take a while for
            // running goroutines. Consider doing this in
            // two phases where the first is non-blocking:
            // we scan the stacks we can and ask running
            // goroutines to scan themselves; and the
            // second blocks.
            stopped := suspendG(gp)
            if stopped.dead {
                gp.gcscandone = true
                return
            }
            if gp.gcscandone {
                throw("g already scanned")
            }
            scanstack(gp, gcw)
            gp.gcscandone = true
            resumeG(stopped)

            if selfScan {
                casgstatus(userG, _Gwaiting, _Grunning)
            }
        })
    }
}

那么它在暫停和恢復(fù)一個(gè)goroutine時(shí)都做了些什么工作呢,今天我們通過源碼來詳細(xì)看一下。 go version 1.16.2

G的搶占

一個(gè)G可以在任何 安全點(diǎn)(safe-point) 被搶占,目前安全點(diǎn)可以分為以下幾類:

  1. 阻塞安全點(diǎn)出現(xiàn)在 goroutine 被取消調(diào)度、同步阻塞或系統(tǒng)調(diào)用期間;
  2. 同步安全點(diǎn)出現(xiàn)在運(yùn)行g(shù)oroutine檢查搶占請(qǐng)求時(shí);
  3. 異步安全點(diǎn)出現(xiàn)在用戶代碼中的任何指令上,其中G可以安全的暫停且可以保證堆棧和寄存器掃描找到 stack root(這個(gè)很重要,GC掃描開始的地方)。runtime 可以通過一個(gè)信號(hào)在一個(gè)異步安全點(diǎn)暫停一個(gè)G。

這里將安全點(diǎn)分為 阻塞安全點(diǎn)、同步安全點(diǎn)異步安全點(diǎn),每種安全點(diǎn)都出現(xiàn)在不同的場(chǎng)景。

阻塞安全點(diǎn)和同步安全點(diǎn),一個(gè)G的CPU狀態(tài)是最小的(無法理解這里最小的意思)。垃圾回收器擁有整個(gè)stack的完整信息。這樣就有可能使用最小的空間重新調(diào)度G,并精確的掃描G的 棧。

同步安全點(diǎn)是通過在重載函數(shù)序言中stack bound check(棧邊界檢查)實(shí)現(xiàn)的。在下一個(gè)同步安全點(diǎn)搶占G,runtime 在G的 stack綁定一個(gè)值,該值將導(dǎo)致下一個(gè) stack bound check 失敗,從而進(jìn)入棧的增漲實(shí)現(xiàn),此實(shí)現(xiàn)將檢測(cè)到它實(shí)際上是搶占并重寫向到搶占處理邏輯。

異步安全點(diǎn)搶占是通過操作系統(tǒng)(如:信號(hào))掛起一個(gè)線程并檢查它的狀態(tài)以確定G是否處于一個(gè)異步安全點(diǎn)。由于掛起線程本身是異步的,它將檢查運(yùn)行的G是否需要被搶占,這將引起一些改變。如果所有條件都滿足,它將調(diào)整信號(hào)上下文,使其看起來像剛剛發(fā)起調(diào)用的asyncPreempt(異步搶占)信號(hào)線程并恢復(fù)此線程。asyncPreempt溢出所有寄存器并進(jìn)入調(diào)度程序。

(另一種方法是搶占信號(hào)處理程序本身。這將允許操作系統(tǒng)保存和恢復(fù)寄存器狀態(tài),運(yùn)行時(shí)只需要知道如何從信號(hào)上下文中提取可能包含指針的寄存器。但是,這將為每個(gè)搶占的G消耗一個(gè)M,并且調(diào)度器本身并不是設(shè)計(jì)為從信號(hào)處理程序運(yùn)行的,因?yàn)樗鼉A向于在搶占路徑中分配內(nèi)存和啟動(dòng)線程)

暫停狀態(tài)

在G的暫停狀態(tài)沒有使用一個(gè)單獨(dú)的變量來表示,而是通過一個(gè) suspendGState 的結(jié)構(gòu)體來表示。

type suspendGState struct {
    g *g
    dead bool
    stopped bool
}

字段意義:

  • g 表示當(dāng)前暫停的G,將其放在狀態(tài)結(jié)構(gòu)體中,這樣直需要什么一個(gè)結(jié)構(gòu)體就可以了,不需要再單獨(dú)占用一個(gè)參數(shù)來表示暫停的哪個(gè)G;
  • dead 表示當(dāng)前G并沒有暫停,而是處于 _Gdead 狀態(tài)。這個(gè) G 可以以后被復(fù)用,因?yàn)檎{(diào)用者不能一直認(rèn)為它是 _Gdead 狀態(tài),見G的狀態(tài)流轉(zhuǎn)圖;
  • stopped 表示通過 g.preemptStop 將G轉(zhuǎn)換為 _Gwaiting 狀態(tài),因此負(fù)責(zé)在完成時(shí)做好準(zhǔn)備

暫停G (suspendG)

在安全點(diǎn)暫停G將返回一個(gè) suspendGState 結(jié)構(gòu)體的狀態(tài)值,調(diào)用者在此期間將一直擁有此G的讀權(quán)限,直到恢復(fù) resumeG 為止。

多個(gè)調(diào)用者在同一時(shí)間試圖suspend同一個(gè)G時(shí),它是安全的。goroutine 可以在后續(xù)成功掛起操作之間執(zhí)行。當(dāng)前實(shí)現(xiàn)授予對(duì)G的獨(dú)占訪問權(quán)限,所以多個(gè)調(diào)用者將會(huì)序列化。但是,其目的是授予共享read權(quán)限,所以不要依賴獨(dú)占訪問。

suspend操作必須在系統(tǒng)棧執(zhí)行,并且在M(如果有的話)上的用戶goroutine必須處于一個(gè)可搶占的狀態(tài)。這樣可以防止兩個(gè)goroutine試圖互相掛起并且都處于非搶占狀態(tài)時(shí)出現(xiàn)死鎖。有其它的方式來解決這個(gè)死鎖,但看起來非常的簡(jiǎn)單。

// go:systemstack
func suspendG(gp *g) suspendGState {
    // 當(dāng)前暫停的G正是自己,且自己還處于_Grunning,直接拋出異常
    if mp := getg().m; mp.curg != nil && readgstatus(mp.curg) == _Grunning {
        throw("suspendG from non-preemptible goroutine")
    }

    const yieldDelay = 10 * 1000
    var nextYield int64

    stopped := false
    var asyncM *m
    var asyncGen uint32
    var nextPreemptM int64
    for i := 0; ; i++ {
        switch s := readgstatus(gp); s {
        default:
            if s&_Gscan != 0 {
                break
            }

            dumpgstatus(gp)
            throw("invalid g status")
        case _Gdead:
            return suspendGState{dead: true}
        case _Gcopystack:

        case _Gpreempted:
            if !casGFromPreempted(gp, _Gpreempted, _Gwaiting) {
                break
            }

            stopped = true

            s = _Gwaiting
            fallthrough
        case _Grunnable, _Gsyscall, _Gwaiting:
            if !castogscanstatus(gp, s, s|_Gscan) {
                break
            }

            gp.preemptStop = false
            gp.preempt = false
            gp.stackguard0 = gp.stack.lo + _StackGuard
            return suspendGState{g: gp, stopped: stopped}
        case _Grunning:
            if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
                break
            }

            // Temporarily block state transitions.
            if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
                break
            }

            // Request synchronous preemption.
            gp.preemptStop = true
            gp.preempt = true
            gp.stackguard0 = stackPreempt

            // Prepare for asynchronous preemption.
            asyncM2 := gp.m
            asyncGen2 := atomic.Load(&asyncM2.preemptGen)
            needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
            asyncM = asyncM2
            asyncGen = asyncGen2

            casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)

            if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
                now := nanotime()
                if now >= nextPreemptM {
                    nextPreemptM = now + yieldDelay/2
                    preemptM(asyncM)
                }
            }
        }

        if i == 0 {
            nextYield = nanotime() + yieldDelay
        }
        if nanotime() < nextYield {
            procyield(10)
        } else {
            osyield()
            nextYield = nanotime() + yieldDelay/2
        }
    }

整體流程是通過一個(gè) for 方法,不斷的檢查G的狀態(tài)并在合適的機(jī)會(huì)返回suspendGState。

  • _Gdead 已處于 dead狀態(tài),直接返回 suspendGState{dead: true},注意這時(shí)沒有g(shù);
  • _Gcopystack 處于復(fù)制stack狀態(tài),當(dāng)前處于棧的擴(kuò)容或縮減,繼續(xù)等待直到完成;
  • _Gpreempted 可搶占狀態(tài);將G變?yōu)?_Gwaiting 狀態(tài),同時(shí)設(shè)置變量 stopped=true。繼續(xù)等待;
  • _Grunnable, _Gsyscall, _Gwaiting : 標(biāo)記為掃描狀態(tài);取消搶占請(qǐng)求等,返回 suspendGState{g: gp, stopped: true};
  • _Grunning 這里指非當(dāng)前G的運(yùn)行狀態(tài); 先將 _Grunning 變?yōu)?_Gscanrunning;設(shè)置同步搶占標(biāo)記并做一些搶占準(zhǔn)備,再恢復(fù) _Grunning 狀態(tài);最后再發(fā)送異步搶占

這里提到過幾個(gè)與轉(zhuǎn)換G狀態(tài)的函數(shù),如casfrom_Gscanstatus()、castogscanstatus()casGFromPreempted()、

恢復(fù)G (resumeG)

所謂恢復(fù)G就是指暫停的撤銷,允許暫停的G從當(dāng)前 安全點(diǎn)(safe-point) 繼續(xù)執(zhí)行。

func resumeG(state suspendGState) {
    if state.dead {
        // We didn't actually stop anything.
        return
    }
    gp := state.g
    switch s := readgstatus(gp); s {
    default:
        dumpgstatus(gp)
        throw("unexpected g status")

    case _Grunnable | _Gscan,
        _Gwaiting | _Gscan,
        _Gsyscall | _Gscan:
        casfrom_Gscanstatus(gp, s, s&^_Gscan)
    }

    if state.stopped {
        // We stopped it, so we need to re-schedule it.
        ready(gp, 0, true)
    }
}

主要是最后一句,調(diào)用 ready() ,將其G設(shè)置為運(yùn)行_Grunnable 狀態(tài),這樣就可以立即在下次被執(zhí)行。

總結(jié)

可以看到對(duì)G的暫停和恢復(fù),其實(shí)是對(duì)G 的狀態(tài)進(jìn)行改變。對(duì)于suspend操作只會(huì)在安全點(diǎn)才會(huì)發(fā)生,它會(huì)一直重試嘗試著修改G的狀態(tài),同時(shí)會(huì)對(duì)一些搶占標(biāo)記做一些修改直到修改成功為止。

參考資料

由于個(gè)人能力有限,文章中若有錯(cuò)誤,可以聯(lián)系本人。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容