以下文章均為拜讀公眾號 源碼游記 的筆記 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick // last tick observed by sysmon
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick) // schedtick 調(diào)度循環(huán)次數(shù),每次調(diào)度循環(huán)完成+1
if int64(pd.schedtick) != t { // 如果已經(jīng)切換,那就更新上次調(diào)度的數(shù)
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now { // 如果超過規(guī)定時間
// 搶占調(diào)度
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
if s == _Psyscall {
.... // 系統(tǒng)調(diào)用搶占代碼
}
}
unlock(&allpLock)
return uint32(n)
}
由上可見,如果當(dāng)前G運(yùn)行時間過長(通過p上的pd判斷),那么就發(fā)起搶占 `preemptone(_p_)`
而神奇的preemptone宛如一個彬彬有禮的紳士,沒坐任何暴力行為
```go
// preemptone asks the goroutine running on processor P to stop.
// In this (cooperative) part it only sets flags on the goroutine; it does not
// interrupt it directly.
//
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if a preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by gp's status no longer being
// _Grunning.
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
// Nothing to do if no M is attached to this P, or if that M is the
// current one (we never try to preempt ourselves).
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
// No user goroutine to preempt (none running, or the M is on g0).
if gp == nil || gp == mp.g0 {
return false
}
// Mark gp as wanting to be preempted.
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt // a very large value, so the next prologue stack check always takes the morestack path
... // signal-based (asynchronous) preemption elided here
return true
}
```
到目前為止,負責搶占調度的retake已經完事收工,但是我們沒有看到任何暴力行為,只是告訴g你需要讓位了。(ps. 這也就是我們稱之為協作式調度的原因)
響應搶占
這個是由newstack完成的
具體調用鏈:
morestack_noctxt()->morestack()->newstack()
// newstack (excerpt; "......" marks code elided in these notes).
// This is where a pending preemption request is actually acted on; the call
// path is morestack_noctxt -> morestack -> newstack.
//
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the GC is trying to stop this g then it will set preemptscan to true.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
thisg := getg() // thisg = g0
......
// gp is g0.m.curg: the goroutine that needs a bigger stack or has to
// respond to a preemption request. In the article's example it is the
// main goroutine.
gp := thisg.m.curg
......
// NOTE: stackguard0 may change underfoot, if another thread
// is about to try to preempt gp. Read it just once and use that same
// value now and below.
// A preemption request is signalled by stackguard0 == stackPreempt.
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
// Be conservative about where we preempt.
// We are interested in preempting user Go code, not runtime code.
// If we're holding locks, mallocing, or preemption is disabled, don't
// preempt.
// This check is very early in newstack so that even the status change
// from Grunning to Gwaiting and back doesn't happen in this case.
// That status change by itself can be viewed as a small preemption,
// because the GC might change Gwaiting to Gscanwaiting, and then
// this goroutine has to wait for the GC to finish before continuing.
// If the GC is in some way dependent on this goroutine (for example,
// it needs a lock held by the goroutine), that small preemption turns
// into a real deadlock.
if preempt {
// Decline preemption for now if the M holds locks, is allocating,
// has preemption explicitly disabled, or its P is not _Prunning.
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
// Restore stackguard0 to its normal value: the request has been
// handled (deferred) for this round.
gp.stackguard0 = gp.stack.lo + _StackGuard
// Do not preempt: resume gp directly with gogo instead of calling
// schedule to pick another goroutine.
gogo(&gp.sched) // never return
}
}
// The elided code above performs further checks, which is why preempt is
// tested a second time here.
if preempt {
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}
......
// From here on we act on the preemption request.
// Act like goroutine called runtime.Gosched.
// Move gp back from _Gwaiting to _Grunning; per these notes, the elided
// GC-handling code had changed gp's status to _Gwaiting.
casgstatus(gp, _Gwaiting, _Grunning)
// Switch gp out via gopreempt_m.
gopreempt_m(gp) // never return
}
......
}
// gopreempt_m switches gp out in response to a preemption request.
// When tracing is enabled it first records a GoPreempt trace event, then it
// hands gp to goschedImpl to reschedule. Its callers in these notes
// (newstack, and asyncPreempt2 via mcall) do not expect it to return.
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
可見這里如果判斷需要搶占,那么就先把gp的狀態從_Gwaiting切換回_Grunning,然後通過gopreempt_m調用goschedImpl完成調度。
響應搶占的細節之棧擴容
我們上面看到,響應搶占是在檢查棧的時候才進行的,那么我們就會有個疑問:什么時候檢查棧?
go的設計:編譯器會在函數序言(prologue)里插入棧擴容的檢查代碼,所以函數被調用時都會先做一次檢查(標記為nosplit的函數除外)。
以以下代碼為例
package main
import "fmt"
// sum prints a*a + b*b and returns it. It is the example used below for
// inspecting the compiler-inserted stack check in gdb; the fmt.Println call
// keeps it a function that itself makes calls.
func sum(a, b int) int {
	squares := a*a + b*b
	fmt.Println(squares)
	return squares
}
// main runs the example once; the value returned by sum is intentionally
// discarded (sum already prints it).
func main() {
	_ = sum(1, 2)
}
運行以下命令進入gdb,結果如下
go build -o q3 q3.go
gdb q3
b q3.go:15
r
disass
<img src="/Users/gaoke/Library/Application Support/typora-user-images/image-20200607190207907.png" alt="image-20200607190207907" style="zoom:90%;" />
可見函數尾部對morestack_noctxt進行了調用,而跳轉到該調用的邏輯位於函數開頭:
=> 0x0000000000490eb0 <+0>: mov %fs:0xfffffffffffffff8,%rcx #main函數第一條指令,rcx = g
0x0000000000490eb9 <+9>: cmp 0x10(%rcx),%rsp #0x10(%rcx) 為 g.stackguard0
0x0000000000490ebd <+13>: jbe 0x490eed <main.main+61>
我們知道g結構體偏移16個字節處是stackguard0。那么第二、三行的意思就是比較棧頂寄存器rsp與stackguard0的值:如果rsp小於或等於stackguard0(jbe是無符號比較),說明當前g的棧要用完了,有溢出風險,需要擴棧。
回憶下棧結構
<img src="http://picgo.vipkk.work/20200607190850.png" alt="image-20200607190850026" style="zoom:50%;" />
其中正常情況下 stackguard0 = stack.lo + _StackGuard,而lo是棧的最低地址(棧從高地址向低地址增長),所以這里如果rsp比stackguard0小,就需要進行擴容。而在設置了搶占請求時,stackguard0 被設為stackPreempt,這是一個特別大的值,rsp必然比它小,所以一定會調用morestack_noctxt,進而在newstack中響應搶占。
morestack_noctxt執行完之後,就會回到main繼續恢復執行!
那么在協作式調度下,如果一個函數內部(例如一個死循環)沒有任何函數調用,它就永遠不會被搶占。因為我們上面分析了,`morestack_noctxt`的檢查是插在函數序言里的;循環體中沒有函數調用,就不會再觸發棧檢查,也就永遠不會檢查搶占了。
如果沒有函數調用,這樣的goroutine就會一直霸占P,其他goroutine可能永遠得不到調度(Go 1.14引入的信號搶占解決了這個問題,見下文)。
對系統調用過長發起搶占
回到retake
// retake (excerpt): sysmon's preemption check. Only the branch that retakes
// Ps stuck in system calls is shown; "..." marks elided code.
func retake(now int64) uint32 {
...
if s == _Psyscall { // the P is in a system call
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
// syscalltick changed since sysmon last looked (and sysretake is false):
// record when this syscall was first observed and skip the P this round.
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
/**
Skip (do not retake) when ALL of these hold:
1. the P's local run queue is empty;
2. there is at least one spinning M or idle P (nmspinning+npidle > 0),
   so other workers can pick up new work;
3. the syscall has lasted less than 10ms (10*1000*1000, assuming now
   is in nanoseconds).
*/
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_) // hand the P off so that another M can run it
}
incidlelocked(1)
lock(&allpLock)
}
...
}
根據retake函數的代碼,只要滿足下面三個條件中的任意一個,就需要對處于_Psyscall 狀態的p進行搶占:
- p的運行隊列里面有等待運行的goroutine。這用來保證當前p的本地運行隊列中的goroutine得到及時的調度:因為該p對應的工作線程正處于系統調用之中,無法調度隊列中的goroutine,所以需要尋找另外一個工作線程來接管這個p,從而達到調度這些goroutine的目的;
- 沒有空閑的p,也沒有自旋(spinning)的m(即nmspinning+npidle == 0)。這表示其它所有的p都已經與工作線程綁定且正忙于執行go代碼,說明系統比較繁忙,所以需要搶占當前正處于系統調用之中、而實際上系統調用並不需要的這個p,並把它分配給其它工作線程去調度其它goroutine;
- 從上一次監控線程觀察到p對應的m處于系統調用之中到現在,已經超過了10毫秒。這表示只要系統調用超時,就對其搶占,而不管是否真的有goroutine需要調度。這樣可以保證sysmon線程不至于覺得無事可做(sysmon線程會判斷retake函數的返回值,如果為0,表示retake並未做任何搶占,所以會覺得沒啥事情做)而休眠太長時間,最終降低sysmon監控的實時性。
綜上,處于系統調用中的goroutine深陷在os中,go runtime鞭長莫及,對它設置搶占位並不起作用(此時沒有m與p綁定,preemptone無法生效)。所以retake還做了另外一件事:把p的狀態改為_Pidle,並通過handoffp尋找下一個新的m來和p合作,使p脫離之前的m。
// handoffp decides what to do with a P taken from a syscall or a locked M.
// It starts an M to run the P if there is any work the P could do;
// otherwise it puts the P on the idle P list.
//
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_.
// if it has local work, start it straight away
// The P's local run queue (or the global run queue) still has work, so
// start an M to run it.
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// if it has GC work, start it straight away
// i.e. GC marking is enabled and there is mark work this P could do, so
// start an M that can help with marking.
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
// No spinning M and no idle P means everybody is busy, so this P should
// not sit idle either. The CAS moves nmspinning from 0 to 1 so that only one
// caller starts the extra M. startm's second argument is true here;
// presumably that marks the new M as spinning — confirm against startm.
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
startm(_p_, true)
return
}
lock(&sched.lock) // the checks below touch global scheduler state
// GC is waiting for Ps to stop: park this P in _Pgcstop and wake the
// waiter once the last expected P has stopped (stopwait reaches 0).
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
// Run this P's pending safe-point function, if any, and wake the waiter
// when it was the last one outstanding.
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
// The global run queue is not empty (re-checked under sched.lock).
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
// Somebody has to keep watching the network; if every other P is idle,
// this one takes on the job.
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// NOTE(review): nobarrierWakeTime presumably returns this P's earliest
// timer deadline (0 if none), and wakeNetPoller makes sure that deadline
// is not missed while the P is idle — confirm against the runtime source.
if when := nobarrierWakeTime(_p_); when != 0 {
wakeNetPoller(when)
}
// Nothing needs this P right now: put it on the idle P list.
pidleput(_p_)
unlock(&sched.lock)
}
handoffp函數流程比較簡單,它的主要任務是通過各種條件判斷是否需要啟動工作線程來接管p,如果不需要,則把p放入全局的空閒P隊列。
從handoffp的代碼可以看出,在如下幾種情況下,需要調用我們已經分析過的startm函數啟動新的工作線程來接管p:
- p的本地運行隊列或全局運行隊列里面有待運行的goroutine;
- 需要幫助gc完成標記工作;
- 系統比較忙:既沒有空閑的p,也沒有自旋的m,所有其它p都在運行goroutine,需要幫忙;
- 所有其它P都已經處于空閑狀態,且當前沒有人在poll網絡(lastpoll != 0),則需要啟動新的m來poll網絡連接的讀寫事件。
到此,sysmon監控線程對處于系統調用之中的p的搶占就已經完成。
系統調用
TODO
簡單說
- 進入系統調用時:m與p解綁(p被記錄為m.oldp);
- 退出系統調用時:嘗試重新綁定原來的p,綁定不上就去找新的p。
系統調用匯編實現(以Syscall6為例)
// func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, err uintptr)
// Syscall6 issues a raw system call with up to six arguments. The call is
// bracketed by runtime·entersyscall / runtime·exitsyscall, so the scheduler
// knows this goroutine is inside a system call.
TEXT ·Syscall6(SB),NOSPLIT,$0-80
// Prologue: tell the scheduler we are entering a system call.
CALL runtime·entersyscall(SB)
// The system call itself: load a1..a6 into DI, SI, DX, R10, R8, R9 and
// the syscall number into AX.
MOVQ a1+8(FP), DI
MOVQ a2+16(FP), SI
MOVQ a3+24(FP), DX
MOVQ a4+32(FP), R10
MOVQ a5+40(FP), R8
MOVQ a6+48(FP), R9
MOVQ trap+0(FP), AX // syscall entry
SYSCALL
// A result strictly above 0xfffffffffffff001 (unsigned compare, JLS) is a
// negated errno; anything at or below it is a successful result.
CMPQ AX, $0xfffffffffffff001
JLS ok6
// Error path: r1 = -1, r2 = 0, err = -AX.
MOVQ $-1, r1+56(FP)
MOVQ $0, r2+64(FP)
NEGQ AX
MOVQ AX, err+72(FP)
// Epilogue: tell the scheduler the system call has finished.
CALL runtime·exitsyscall(SB)
RET
ok6:
// Success path: r1 = AX, r2 = DX, err = 0.
MOVQ AX, r1+56(FP)
MOVQ DX, r2+64(FP)
MOVQ $0, err+72(FP)
// Epilogue: tell the scheduler the system call has finished.
CALL runtime·exitsyscall(SB)
RET
系統調用做了三件事
- 預處理: entersyscall
- 處理 syscall(執行SYSCALL指令)
- 退出處理: exitsyscall
entersyscall(核心邏輯在下面的reentersyscall中)
// reentersyscall (excerpt; "..." marks elided code) does the core work of
// entersyscall. It saves the caller's pc/sp, moves g to _Gsyscall, detaches
// the M from its P (remembering the P as oldp), and marks the P _Psyscall.
func reentersyscall(pc, sp uintptr) {
_g_ := getg()
// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
// (m.locks stays raised until the end of this function.)
_g_.m.locks++
// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
// Forcing stackguard0 to stackPreempt makes any stack check in this window
// land in newstack.
_g_.stackguard0 = stackPreempt
// Set throwsplit: if newstack sees it set, it throws immediately, i.e.:
/**
if thisg.m.curg.throwsplit{
throw("runtime: stack split at bad time")
}
*/
_g_.throwsplit = true
// Leave SP around for GC and traceback.
// Save pc/sp; this state is also used to restore things after the syscall.
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
// g: _Grunning -> _Gsyscall
casgstatus(_g_, _Grunning, _Gsyscall)
...
// Detach the M from its P.
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
_g_.m.mcache = nil
pp := _g_.m.p.ptr()
pp.m = 0
_g_.m.oldp.set(pp) // remembered so exitsyscall can try to reacquire this P first
_g_.m.p = 0
// Mark the P as _Psyscall; sysmon's retake may take it away if the
// syscall runs too long.
atomic.Store(&pp.status, _Psyscall)
// If GC is waiting (sched.gcwaiting), run entersyscall_gcwait on the
// system stack, then save pc/sp again.
if sched.gcwaiting != 0 {
systemstack(entersyscall_gcwait)
save(pc, sp)
}
// Re-enable preemption.
_g_.m.locks--
}
exitsyscall
源碼先不補充,這塊還需要再複習
大概工作如下
- 嘗試重新綁定之前的p(m.oldp)
- 失敗之後,從全局空閒P列表中找一個p來執行
- 如果找不到,那就把g放到全局運行隊列,m自身stop(休眠)
信號搶占
go在1.14中引入了基於信號的異步搶占,這里也分析下
發起搶占
// preemptone (Go 1.14+ excerpt; "..." marks elided code). Besides setting the
// cooperative preemption flags, it requests an asynchronous, signal-based
// preemption. It only does so when preemptMSupported is true and the
// preemption has not been disabled via debug.asyncpreemptoff.
func preemptone(_p_ *p) bool {
...
// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
// Preempt the M by sending it a signal.
preemptM(mp)
}
...
}
// preemptM asks mp to preempt itself by sending it the sigPreempt signal.
// NOTE(review): this may be a simplified excerpt of the runtime version —
// confirm against the Go source you are reading.
func preemptM(mp *m) {
signalM(mp, sigPreempt)
}
// signalM sends signal sig to mp. It uses tgkill, passing the current
// process id and mp.procid as the target thread id.
func signalM(mp *m, sig int) {
tgkill(getpid(), int(mp.procid), sig)
}
go在設置搶占標誌位的同時,(在支持異步搶占、且未通過debug.asyncpreemptoff關閉時)還會通過tgkill向目標m對應的線程發送一個搶占信號sigPreempt。
信號接收
// sighandler (excerpt): only the preemption-signal branch (added in Go 1.14)
// is shown.
// NOTE(review): `c` is not declared in this excerpt. In the full function it is
// presumably the *sigctxt built from info and ctxt — confirm against the
// runtime source.
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
if sig == sigPreempt { // preemption signal (added in go1.14)
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
}
信號處理
// doSigPreempt handles a preemption signal on gp.
// If gp wants to be preempted and is stopped at an async safe point, it
// injects a call to asyncPreempt into the signal context. The thread then runs
// asyncPreempt when the handler returns. In either case it bumps
// gp.m.preemptGen to acknowledge the signal.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
// Inject a call to asyncPreempt.
// This is the key step: asyncPreempt is what carries out the preemption.
ctxt.pushCall(funcPC(asyncPreempt))
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
}
pushCall 修改了信號上下文中保存的寄存器,相當於在被中斷的位置注入了一次對asyncPreempt的調用。所以信號處理返回後,線程會先執行asyncPreempt,而不是直接回到原來被打斷的地方。
// asyncPreempt is declared without a Go body; its implementation is provided
// outside Go source. The original note says it is implemented by the compiler.
// NOTE(review): in the Go runtime it appears to be assembly (preempt_*.s,
// generated by mkpreempt.go) rather than compiler output — confirm against
// the runtime version you are reading.
func asyncPreempt()
// asyncPreempt2 performs the actual asynchronous preemption of the current
// goroutine.
// NOTE(review): presumably called from asyncPreempt (the function injected by
// doSigPreempt) — confirm against the runtime source.
// gp.asyncSafePoint is set while gp is switched out through mcall, and is
// cleared once gp runs again.
//go:nosplit
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
// The real preemption. If gp has been asked to stop (preemptStop), park it.
// Otherwise reschedule it through gopreempt_m, the same routine newstack
// uses for cooperative preemption.
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
https://changkun.de/golang/zh-cn/part2runtime/ch06sched/preemption/