今天來講一下調(diào)度器,我本來寫了兩個(gè)版本,后面發(fā)現(xiàn)都好像不太好,其實(shí)核心差不太多,就是層次不夠清晰,然后在度娘上又啃了幾篇相關(guān)的文章,又進(jìn)行了綜合一下,文章末尾有引用的文章鏈接。不得不說,大佬們畫圖還是非常厲害的。其實(shí)突然在這個(gè)期間發(fā)現(xiàn)一些問題:就是markdown模式下的話容易讓人看不清重點(diǎn),最近在找替代簡書的地方,有推薦的可以留言推薦
(一)調(diào)度器的核心點(diǎn):
- 1.復(fù)用線程
避免頻繁的創(chuàng)建銷毀線程,我們知道線程的啟停銷毀是很耗費(fèi)性能的一件事情,我們就要reuse thread(線程復(fù)用) ,那么具體該怎么來處理。golang :”create threads when needed ;keep them around for reuse“,當(dāng)我們需要?jiǎng)?chuàng)建的時(shí)候就創(chuàng)建,然后保留它來復(fù)用,看過M結(jié)構(gòu)的人就知道M有個(gè)allM的字段就是保存所有的M的鏈表。 - 2.利用并行
設(shè)置GOMAXPROCS 來進(jìn)行設(shè)置go program的核心數(shù),程序的并行 - 3.stealing working
利用小偷算法,當(dāng)本地的隊(duì)列沒有g(shù)了,去別的地方偷一半的g進(jìn)行運(yùn)行,保證任務(wù)的公平性 - 4.handoff
利用移交算法,當(dāng)本線程因?yàn)橄到y(tǒng)調(diào)用進(jìn)行阻塞的時(shí)候,線程釋放綁定的P,把P給其他的M執(zhí)行
值得一說的是:Go1.1之前只有G-M模型,沒有P,Dmitry Vyukov在Scalable Go Scheduler Design Doc提出該模型在并發(fā)伸縮性方面的問題,并通過加入P(Processors)來改進(jìn)該問題。
(二)重要結(jié)構(gòu)體
G:goroutine
每次go調(diào)用的時(shí)候,都會(huì)創(chuàng)建一個(gè)G對(duì)象,它包括棧、指令指針以及對(duì)于調(diào)用goroutines很重要的其它信息,比如阻塞它的任何channel,其主要數(shù)據(jù)結(jié)構(gòu)
// Go1.11版本默認(rèn)stack大小為2KB
_StackMin = 2048
// 創(chuàng)建一個(gè)g對(duì)象,然后放到g隊(duì)列
// 等待被執(zhí)行
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg()
_g_.m.locks++
siz := narg
siz = (siz + 7) &^ 7
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
// 初始化g stack大小
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
// 以下省略}
M:mechine
代表一個(gè)線程,每次創(chuàng)建一個(gè)M的時(shí)候,都會(huì)有一個(gè)底層線程創(chuàng)建;所有的G任務(wù),最終還是在M上執(zhí)行,其主要數(shù)據(jù)結(jié)構(gòu)
type m struct {
/*
1. 所有調(diào)用棧的Goroutine,這是一個(gè)比較特殊的Goroutine。
2. 普通的Goroutine棧是在Heap分配的可增長的stack,而g0的stack是M對(duì)應(yīng)的線程棧。
3. 所有調(diào)度相關(guān)代碼,會(huì)先切換到該Goroutine的棧再執(zhí)行。
*/
g0 *g
curg *g // M當(dāng)前綁定的結(jié)構(gòu)體G
// SP、PC寄存器用于現(xiàn)場(chǎng)保護(hù)和現(xiàn)場(chǎng)恢復(fù)
vdsoSP uintptr
vdsoPC uintptr
// 省略…}
P:Processor
代表一個(gè)處理器,每一個(gè)運(yùn)行的M都必須綁定一個(gè)P,就像線程必須在么一個(gè)CPU核上執(zhí)行一樣,由P來調(diào)度G在M上的運(yùn)行,P的個(gè)數(shù)就是GOMAXPROCS(最大256),啟動(dòng)時(shí)固定的,一般不修改;M的個(gè)數(shù)和P的個(gè)數(shù)不一定一樣多(會(huì)有休眠的M或者不需要太多的M)(最大10000);每一個(gè)P保存著本地G任務(wù)隊(duì)列,也有一個(gè)全局G任務(wù)隊(duì)列。P的數(shù)據(jù)結(jié)構(gòu)
// 自定義設(shè)置GOMAXPROCS數(shù)量
func GOMAXPROCS(n int) int {
/*
1. GOMAXPROCS設(shè)置可執(zhí)行的CPU的最大數(shù)量,同時(shí)返回之前的設(shè)置。
2. 如果n < 1,則不更改當(dāng)前的值。
*/
ret := int(gomaxprocs)
stopTheWorld("GOMAXPROCS")
// startTheWorld啟動(dòng)時(shí),使用newprocs。
newprocs = int32(n)
startTheWorld()
return ret
}
// 默認(rèn)P被綁定到所有CPU核上
// P == cpu.cores
func getproccount() int32 {
const maxCPUs = 64 * 1024
var buf [maxCPUs / 8]byte
// 獲取CPU Core
r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])
n := int32(0)
for _, v := range buf[:r] {
for v != 0 {
n += int32(v & 1)
v >>= 1
}
}
if n == 0 {
n = 1
}
return n
}
// 一個(gè)進(jìn)程默認(rèn)被綁定在所有CPU核上,返回所有CPU core。
// 獲取進(jìn)程的CPU親和性掩碼系統(tǒng)調(diào)用
// rax 204 ; 系統(tǒng)調(diào)用碼
// system_call sys_sched_getaffinity; 系統(tǒng)調(diào)用名稱
// rid pid ; 進(jìn)程號(hào)
// rsi unsigned int len
// rdx unsigned long *user_mask_ptr
sys_linux_amd64.s:
TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0
MOVQ pid+0(FP), DI
MOVQ len+8(FP), SI
MOVQ buf+16(FP), DX
MOVL $SYS_sched_getaffinity, AX
SYSCALL
MOVL AX, ret+24(FP)
RET
(三)調(diào)度過程

我們通過 go func()來創(chuàng)建一個(gè)goroutine;g 的結(jié)構(gòu)是可復(fù)用的,對(duì)于可復(fù)用的g也是有l(wèi)ocal隊(duì)列和global隊(duì)列的,用:p.freeg 這個(gè)參數(shù),全局隊(duì)列就是sched.pfree,獲取參數(shù)都是差不多的,優(yōu)先從p.gfree中獲取,這一步是無鎖的,否者就從sched.pfree中獲取一部分過來這是有鎖的一個(gè)操作
有兩個(gè)存儲(chǔ)G的隊(duì)列,一個(gè)是局部調(diào)度器P的本地隊(duì)列、一個(gè)是全局G隊(duì)列。新創(chuàng)建的G會(huì)優(yōu)先嘗試放到p的runnext中,作為下一個(gè)執(zhí)行G,如果不行就得放到我們的本地隊(duì)列中,如果P的本地隊(duì)列已經(jīng)滿了就會(huì)保存在全局的隊(duì)列中;
G只能運(yùn)行在M中,一個(gè)M必須持有一個(gè)P,M與P是1:1的關(guān)系。M會(huì)從P的本地隊(duì)列彈出一個(gè)可執(zhí)行狀態(tài)的G來
執(zhí)行,如果P的本地隊(duì)列為空,就會(huì)想其他的MP組合偷取一個(gè)可執(zhí)行的G來執(zhí)行;一個(gè)M調(diào)度G執(zhí)行的過程是一個(gè)循環(huán)機(jī)制;
當(dāng)M執(zhí)行某一個(gè)G時(shí)候如果發(fā)生了syscall或則其余阻塞操作,M會(huì)阻塞,如果當(dāng)前有一些G在執(zhí)行,runtime會(huì)把
這個(gè)線程M從P中摘除(detach),然后再創(chuàng)建一個(gè)新的操作系統(tǒng)的線程(如果有空閑的線程可用就復(fù)用空閑線程)來
服務(wù)于這個(gè)P;當(dāng)M系統(tǒng)調(diào)用結(jié)束時(shí)候,這個(gè)G會(huì)嘗試獲取一個(gè)空閑的P執(zhí)行,并放入到這個(gè)P的本地隊(duì)列。如果獲取不到P,
那么這個(gè)線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個(gè)G會(huì)被放入全局隊(duì)列中
3.1G的幾種暫停方式
- gosched: 將當(dāng)前的G暫停,保存堆棧狀態(tài),以_GRunnable狀態(tài)放入Global隊(duì)列中,讓當(dāng)前M繼續(xù)執(zhí)行其它任務(wù)。無需對(duì)G進(jìn)行喚醒操作,因?yàn)榭倳?huì)有M從Global隊(duì)列取得并執(zhí)行該G。搶占調(diào)度即使用該方式
- 2.gopark: 與goched的最大區(qū)別在于gopark沒有將G放回執(zhí)行隊(duì)列,而是位于某個(gè)等待隊(duì)列中(如channel的waitq,此時(shí)G狀態(tài)為_Gwaitting),因此G必須被手動(dòng)喚醒(通過goready),否則會(huì)丟失任務(wù)。應(yīng)用層阻塞通常使用這種方式。
- 3.notesleep: 既不讓出M,也不讓G和P重新調(diào)度,直接讓線程休眠直到被喚醒(notewakeup),該方式更快,通常用于gcMark,stopm這類自旋場(chǎng)景
- 4.notesleepg: 阻塞G和M,放飛P,P可以和其它M綁定繼續(xù)執(zhí)行,比如可能阻塞的系統(tǒng)調(diào)用會(huì)主動(dòng)調(diào)用entersyscallblock,則會(huì)觸發(fā) notesleepg
- 5.goexit: 立即終止G任務(wù),不管其處于調(diào)用堆棧的哪個(gè)層次,在終止前,確保所有defer正確執(zhí)行。
(四)調(diào)度源碼
// go1.9.1 src/runtime/proc.go
// 省略了GC檢查等其它細(xì)節(jié),只保留了主要流程
// g: G結(jié)構(gòu)體定義
// sched: Global隊(duì)列
// 獲取一個(gè)待執(zhí)行的G
func findrunnable() (gp *g, inheritTime bool) {
// 獲取當(dāng)前的G對(duì)象
_g_ := getg()
top:
// 獲取當(dāng)前P對(duì)象
_p_ := _g_.m.p.ptr()
// 1. 嘗試從P的Local隊(duì)列中取得G 優(yōu)先_p_.runnext 然后再從Local隊(duì)列中取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 2. 嘗試從Global隊(duì)列中取得G
if sched.runqsize != 0 {
lock(&sched.lock)
// globrunqget從Global隊(duì)列中獲取G 并轉(zhuǎn)移一批G到_p_的Local隊(duì)列
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 3. 檢查netpoll任務(wù):檢測(cè)是否存在M阻塞
if netpollinited() && sched.lastpoll != 0 {
if gp := netpoll(false); gp != nil { // non-blocking
// netpoll返回的是G鏈表,將其它G放回Global隊(duì)列
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// 4. 嘗試從其它P竊取任務(wù)
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ {
// 隨機(jī)P的遍歷順序
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// runqsteal執(zhí)行實(shí)際的steal工作,從目標(biāo)P的Local隊(duì)列轉(zhuǎn)移一般的G過來
// stealRunNextG指是否steal目標(biāo)P的p.runnext G
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
...
}
4.1用戶態(tài)的阻塞
當(dāng)Goroutine因?yàn)镃hannel操作而阻塞(通過gopark)時(shí),對(duì)應(yīng)的G會(huì)被放置到某個(gè)wait隊(duì)列(如channel的waitq),該G的狀態(tài)由_Gruning變?yōu)開Gwaitting,而M會(huì)跳過該G嘗試獲取并執(zhí)行下一個(gè)G。
當(dāng)阻塞的G被G2喚醒(通過goready)時(shí)(比如channel可讀/寫),G會(huì)嘗試加入G2所在P的runnext,然后再是P Local隊(duì)列和Global隊(duì)列。簡單解釋一下:當(dāng)G是chan<-的接收消息,被阻塞了,如果G2是Chan的寫消息,當(dāng)G阻塞,G2寫入了一個(gè)數(shù)據(jù),那么G就被G2喚醒了,G就被放到了G2的P的runnext,如果放成功了,就是跳過了排隊(duì),然后執(zhí)行,如果失敗了就丟入local隊(duì)列
4.2系統(tǒng)調(diào)用阻塞:syscall
當(dāng)G被阻塞在某個(gè)系統(tǒng)調(diào)用上時(shí),此時(shí)G會(huì)阻塞在_Gsyscall狀態(tài),M也處于block on syscall狀態(tài),此時(shí)仍然可被搶占調(diào)度: 執(zhí)行該G的M會(huì)與P解綁,而P則嘗試與其它idle的M綁定,繼續(xù)執(zhí)行其它G。如果沒有其它idle的M,但隊(duì)列中仍然有G需要執(zhí)行,則創(chuàng)建一個(gè)新的M。
當(dāng)系統(tǒng)調(diào)用完成后,G會(huì)重新嘗試獲取一個(gè)idle的P,并恢復(fù)執(zhí)行,如果沒有idle的P,G將加入到Global隊(duì)列。
系統(tǒng)調(diào)用能被調(diào)度的關(guān)鍵有兩點(diǎn):
runtime/syscall包中,將系統(tǒng)調(diào)用分為SysCall和RawSysCall,前者和后者的區(qū)別是前者會(huì)在系統(tǒng)調(diào)用前后分別調(diào)用entersyscall和exitsyscall(位于src/runtime/proc.go),做一些現(xiàn)場(chǎng)保存和恢復(fù)操作,這樣才能使P安全地與M解綁,并在其它M上繼續(xù)執(zhí)行其它G。某些系統(tǒng)調(diào)用本身可以確定會(huì)長時(shí)間阻塞(比如鎖),會(huì)調(diào)用entersyscallblock在發(fā)起系統(tǒng)調(diào)用前直接讓P和M解綁(handoffp)。
4.3GMP的幾個(gè)狀態(tài)
- P的幾個(gè)狀態(tài):
(五)sysmon
sysmon是一個(gè)由runtime啟動(dòng)的M,也叫監(jiān)控線程,它無需P也可以運(yùn)行,它每20us~10ms喚醒一次,主要執(zhí)行:
釋放閑置超過5分鐘的span物理內(nèi)存;
如果超過2分鐘沒有垃圾回收,強(qiáng)制執(zhí)行;
將長時(shí)間未處理的netpoll結(jié)果添加到任務(wù)隊(duì)列;
向長時(shí)間運(yùn)行的G任務(wù)發(fā)出搶占調(diào)度;
收回因syscall長時(shí)間阻塞的P;
入口在src/runtime/proc.go:sysmon函數(shù),它通過retake實(shí)現(xiàn)對(duì)syscall和長時(shí)間運(yùn)行的G進(jìn)行調(diào)度:
func retake(now int64) uint32 {
n := 0
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// 如果當(dāng)前P Local隊(duì)列沒有其它G,當(dāng)前有其它P處于Idle狀態(tài),并且syscall執(zhí)行事件不超過10ms,則不用解綁當(dāng)前P(handoffp)
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// handoffp
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
} else if s == _Prunning {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
// 如果當(dāng)前G執(zhí)行時(shí)間超過10ms,則搶占(preemptone)
if pd.schedwhen+forcePreemptNS > now {
continue
}
// 執(zhí)行搶占
preemptone(_p_)
}
}
return uint32(n)
}
搶占式調(diào)度
當(dāng)某個(gè)goroutine執(zhí)行超過10ms,sysmon會(huì)向其發(fā)起搶占調(diào)度請(qǐng)求,由于Go調(diào)度不像OS調(diào)度那樣有時(shí)間片的概念,因此實(shí)際搶占機(jī)制要弱很多: Go中的搶占實(shí)際上是為G設(shè)置搶占標(biāo)記(g.stackguard0),當(dāng)G調(diào)用某函數(shù)時(shí)(更確切說,在通過newstack分配函數(shù)棧時(shí)),被編譯器安插的指令會(huì)檢查這個(gè)標(biāo)記,并且將當(dāng)前G以runtime.Goched的方式暫停,并加入到全局隊(duì)列。源代碼如下:
// src/runtime/stack.go
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
func newstack(ctxt unsafe.Pointer) {
...
// gp為當(dāng)前G
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
...
// Act like goroutine called runtime.Gosched.
// G狀態(tài)由_Gwaiting變?yōu)?_Grunning 這是為了能以Gosched的方式暫停Go
casgstatus(gp, _Gwaiting, _Grunning)
gopreempt_m(gp) // never return
}
}
// 以goched的方式將G重新放入
func goschedImpl(gp *g) {
status := readgstatus(gp)
// 由Running變?yōu)镽unnable
casgstatus(gp, _Grunning, _Grunnable)
// 與M解除綁定
dropg()
lock(&sched.lock)
// 將G放入Global隊(duì)列
globrunqput(gp)
unlock(&sched.lock)
// 重新調(diào)度
schedule()
}
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
netpoll
前面的findrunnable,G的獲取除了p.runnext,p.runq和sched.runq外,還有一中G從netpoll中獲取,netpoll是Go針對(duì)網(wǎng)絡(luò)IO的一種優(yōu)化,本質(zhì)上為了避免網(wǎng)絡(luò)IO陷入系統(tǒng)調(diào)用之中,這樣使得即便G發(fā)起網(wǎng)絡(luò)I/O操作也不會(huì)導(dǎo)致M被阻塞(僅阻塞G),從而不會(huì)導(dǎo)致大量M被創(chuàng)建出來。