上一節(jié)《GC 對(duì)根對(duì)象掃描實(shí)現(xiàn)的源碼分析》中,我們提到過在GC的時(shí)候,會(huì)對(duì)一些goroutine 棧掃描時(shí)會(huì)掃描前臺(tái)觸發(fā) G 的暫停(suspendG)和恢復(fù)(resumeG)。
// markroot scans the i'th root.
//
// Preemption must be disabled (because this uses a gcWork).
//
// nowritebarrier is only advisory here.
//
//go:nowritebarrier
func markroot(gcw *gcWork, i uint32) {
baseFlushCache := uint32(fixedRootCount)
baseData := baseFlushCache + uint32(work.nFlushCacheRoots)
baseBSS := baseData + uint32(work.nDataRoots)
baseSpans := baseBSS + uint32(work.nBSSRoots)
baseStacks := baseSpans + uint32(work.nSpanRoots)
end := baseStacks + uint32(work.nStackRoots)
// Note: if you add a case here, please also update heapdump.go:dumproots.
switch {
......
default:
var gp *g
if baseStacks <= i && i < end {
gp = allgs[i-baseStacks]
} else {
throw("markroot: bad index")
}
status := readgstatus(gp) // We are not in a scan state
if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
gp.waitsince = work.tstart
}
// scanstack must be done on the system stack in case
// we're trying to scan our own stack.
systemstack(func() {
userG := getg().m.curg
selfScan := gp == userG && readgstatus(userG) == _Grunning
if selfScan {
casgstatus(userG, _Grunning, _Gwaiting)
userG.waitreason = waitReasonGarbageCollectionScan
}
// TODO: suspendG blocks (and spins) until gp
// stops, which may take a while for
// running goroutines. Consider doing this in
// two phases where the first is non-blocking:
// we scan the stacks we can and ask running
// goroutines to scan themselves; and the
// second blocks.
stopped := suspendG(gp)
if stopped.dead {
gp.gcscandone = true
return
}
if gp.gcscandone {
throw("g already scanned")
}
scanstack(gp, gcw)
gp.gcscandone = true
resumeG(stopped)
if selfScan {
casgstatus(userG, _Gwaiting, _Grunning)
}
})
}
}
那么它在暫停和恢復(fù)一個(gè)goroutine時(shí)都做了些什么工作呢,今天我們通過源碼來詳細(xì)看一下。 go version 1.16.2
G的搶占
一個(gè)G可以在任何 安全點(diǎn)(safe-point) 被搶占,目前安全點(diǎn)可以分為以下幾類:
- 阻塞安全點(diǎn)出現(xiàn)在 goroutine 被取消調(diào)度、同步阻塞或系統(tǒng)調(diào)用期間;
- 同步安全點(diǎn)出現(xiàn)在運(yùn)行g(shù)oroutine檢查搶占請(qǐng)求時(shí);
- 異步安全點(diǎn)出現(xiàn)在用戶代碼中的任何指令上,其中G可以安全的暫停且可以保證堆棧和寄存器掃描找到
stack root(這個(gè)很重要,GC掃描開始的地方)。runtime可以通過一個(gè)信號(hào)在一個(gè)異步安全點(diǎn)暫停一個(gè)G。
這里將安全點(diǎn)分為 阻塞安全點(diǎn)、同步安全點(diǎn) 和 異步安全點(diǎn),每種安全點(diǎn)都出現(xiàn)在不同的場(chǎng)景。
阻塞安全點(diǎn)和同步安全點(diǎn),一個(gè)G的CPU狀態(tài)是最小的(無法理解這里最小的意思)。垃圾回收器擁有整個(gè)stack的完整信息。這樣就有可能使用最小的空間重新調(diào)度G,并精確的掃描G的 棧。
同步安全點(diǎn)是通過在重載函數(shù)序言中stack bound check(棧邊界檢查)實(shí)現(xiàn)的。在下一個(gè)同步安全點(diǎn)搶占G,runtime 在G的 stack綁定一個(gè)值,該值將導(dǎo)致下一個(gè) stack bound check 失敗,從而進(jìn)入棧的增漲實(shí)現(xiàn),此實(shí)現(xiàn)將檢測(cè)到它實(shí)際上是搶占并重寫向到搶占處理邏輯。
異步安全點(diǎn)搶占是通過操作系統(tǒng)(如:信號(hào))掛起一個(gè)線程并檢查它的狀態(tài)以確定G是否處于一個(gè)異步安全點(diǎn)。由于掛起線程本身是異步的,它將檢查運(yùn)行的G是否需要被搶占,這將引起一些改變。如果所有條件都滿足,它將調(diào)整信號(hào)上下文,使其看起來像剛剛發(fā)起調(diào)用的asyncPreempt(異步搶占)信號(hào)線程并恢復(fù)此線程。asyncPreempt溢出所有寄存器并進(jìn)入調(diào)度程序。
(另一種方法是搶占信號(hào)處理程序本身。這將允許操作系統(tǒng)保存和恢復(fù)寄存器狀態(tài),運(yùn)行時(shí)只需要知道如何從信號(hào)上下文中提取可能包含指針的寄存器。但是,這將為每個(gè)搶占的G消耗一個(gè)M,并且調(diào)度器本身并不是設(shè)計(jì)為從信號(hào)處理程序運(yùn)行的,因?yàn)樗鼉A向于在搶占路徑中分配內(nèi)存和啟動(dòng)線程)
暫停狀態(tài)
在G的暫停狀態(tài)沒有使用一個(gè)單獨(dú)的變量來表示,而是通過一個(gè) suspendGState 的結(jié)構(gòu)體來表示。
type suspendGState struct {
g *g
dead bool
stopped bool
}
字段意義:
-
g表示當(dāng)前暫停的G,將其放在狀態(tài)結(jié)構(gòu)體中,這樣直需要什么一個(gè)結(jié)構(gòu)體就可以了,不需要再單獨(dú)占用一個(gè)參數(shù)來表示暫停的哪個(gè)G; -
dead表示當(dāng)前G并沒有暫停,而是處于_Gdead狀態(tài)。這個(gè) G 可以以后被復(fù)用,因?yàn)檎{(diào)用者不能一直認(rèn)為它是_Gdead狀態(tài),見G的狀態(tài)流轉(zhuǎn)圖; -
stopped表示通過g.preemptStop將G轉(zhuǎn)換為_Gwaiting狀態(tài),因此負(fù)責(zé)在完成時(shí)做好準(zhǔn)備
暫停G (suspendG)
在安全點(diǎn)暫停G將返回一個(gè) suspendGState 結(jié)構(gòu)體的狀態(tài)值,調(diào)用者在此期間將一直擁有此G的讀權(quán)限,直到恢復(fù) resumeG 為止。
多個(gè)調(diào)用者在同一時(shí)間試圖suspend同一個(gè)G時(shí),它是安全的。goroutine 可以在后續(xù)成功掛起操作之間執(zhí)行。當(dāng)前實(shí)現(xiàn)授予對(duì)G的獨(dú)占訪問權(quán)限,所以多個(gè)調(diào)用者將會(huì)序列化。但是,其目的是授予共享read權(quán)限,所以不要依賴獨(dú)占訪問。
suspend操作必須在系統(tǒng)棧執(zhí)行,并且在M(如果有的話)上的用戶goroutine必須處于一個(gè)可搶占的狀態(tài)。這樣可以防止兩個(gè)goroutine試圖互相掛起并且都處于非搶占狀態(tài)時(shí)出現(xiàn)死鎖。有其它的方式來解決這個(gè)死鎖,但看起來非常的簡(jiǎn)單。
// go:systemstack
func suspendG(gp *g) suspendGState {
// 當(dāng)前暫停的G正是自己,且自己還處于_Grunning,直接拋出異常
if mp := getg().m; mp.curg != nil && readgstatus(mp.curg) == _Grunning {
throw("suspendG from non-preemptible goroutine")
}
const yieldDelay = 10 * 1000
var nextYield int64
stopped := false
var asyncM *m
var asyncGen uint32
var nextPreemptM int64
for i := 0; ; i++ {
switch s := readgstatus(gp); s {
default:
if s&_Gscan != 0 {
break
}
dumpgstatus(gp)
throw("invalid g status")
case _Gdead:
return suspendGState{dead: true}
case _Gcopystack:
case _Gpreempted:
if !casGFromPreempted(gp, _Gpreempted, _Gwaiting) {
break
}
stopped = true
s = _Gwaiting
fallthrough
case _Grunnable, _Gsyscall, _Gwaiting:
if !castogscanstatus(gp, s, s|_Gscan) {
break
}
gp.preemptStop = false
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
return suspendGState{g: gp, stopped: stopped}
case _Grunning:
if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
break
}
// Temporarily block state transitions.
if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
break
}
// Request synchronous preemption.
gp.preemptStop = true
gp.preempt = true
gp.stackguard0 = stackPreempt
// Prepare for asynchronous preemption.
asyncM2 := gp.m
asyncGen2 := atomic.Load(&asyncM2.preemptGen)
needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
asyncM = asyncM2
asyncGen = asyncGen2
casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
now := nanotime()
if now >= nextPreemptM {
nextPreemptM = now + yieldDelay/2
preemptM(asyncM)
}
}
}
if i == 0 {
nextYield = nanotime() + yieldDelay
}
if nanotime() < nextYield {
procyield(10)
} else {
osyield()
nextYield = nanotime() + yieldDelay/2
}
}
整體流程是通過一個(gè) for 方法,不斷的檢查G的狀態(tài)并在合適的機(jī)會(huì)返回suspendGState。
-
_Gdead已處于 dead狀態(tài),直接返回suspendGState{dead: true},注意這時(shí)沒有g(shù); -
_Gcopystack處于復(fù)制stack狀態(tài),當(dāng)前處于棧的擴(kuò)容或縮減,繼續(xù)等待直到完成; -
_Gpreempted可搶占狀態(tài);將G變?yōu)?_Gwaiting狀態(tài),同時(shí)設(shè)置變量stopped=true。繼續(xù)等待; -
_Grunnable,_Gsyscall,_Gwaiting: 標(biāo)記為掃描狀態(tài);取消搶占請(qǐng)求等,返回suspendGState{g: gp, stopped: true}; -
_Grunning這里指非當(dāng)前G的運(yùn)行狀態(tài); 先將_Grunning變?yōu)?_Gscanrunning;設(shè)置同步搶占標(biāo)記并做一些搶占準(zhǔn)備,再恢復(fù)_Grunning狀態(tài);最后再發(fā)送異步搶占
這里提到過幾個(gè)與轉(zhuǎn)換G狀態(tài)的函數(shù),如casfrom_Gscanstatus()、castogscanstatus()、casGFromPreempted()、
恢復(fù)G (resumeG)
所謂恢復(fù)G就是指暫停的撤銷,允許暫停的G從當(dāng)前 安全點(diǎn)(safe-point) 繼續(xù)執(zhí)行。
func resumeG(state suspendGState) {
if state.dead {
// We didn't actually stop anything.
return
}
gp := state.g
switch s := readgstatus(gp); s {
default:
dumpgstatus(gp)
throw("unexpected g status")
case _Grunnable | _Gscan,
_Gwaiting | _Gscan,
_Gsyscall | _Gscan:
casfrom_Gscanstatus(gp, s, s&^_Gscan)
}
if state.stopped {
// We stopped it, so we need to re-schedule it.
ready(gp, 0, true)
}
}
主要是最后一句,調(diào)用 ready() ,將其G設(shè)置為運(yùn)行_Grunnable 狀態(tài),這樣就可以立即在下次被執(zhí)行。
總結(jié)
可以看到對(duì)G的暫停和恢復(fù),其實(shí)是對(duì)G 的狀態(tài)進(jìn)行改變。對(duì)于suspend操作只會(huì)在安全點(diǎn)才會(huì)發(fā)生,它會(huì)一直重試嘗試著修改G的狀態(tài),同時(shí)會(huì)對(duì)一些搶占標(biāo)記做一些修改直到修改成功為止。
參考資料
- https://github.com/golang/go/blob/go1.16.2/src/runtime/preempt.go
- https://blog.haohtml.com/archives/21010
由于個(gè)人能力有限,文章中若有錯(cuò)誤,可以聯(lián)系本人。