Go scheduler source code analysis

1. Basic concepts: processes, threads, and coroutines

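A process can contain multiple threads. Each thread usually gets a fixed-size memory block as its stack (on the order of megabytes, e.g. 2 MB), which holds the local variables of the functions currently being called or suspended. What the OS scheduler switches between is threads: if the next thread belongs to the same process, only a thread switch is needed, which is relatively fast; if it belongs to a different process, a process switch is needed, which takes noticeably longer.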

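Threads come in two kinds: kernel-space threads and user-space threads. A user-space thread must be bound to a kernel-space thread. The CPU is not aware that user-space threads exist; it only knows it is running a thread, and that thread is actually a kernel thread.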

User-space threads are also known as coroutines. To keep the two apart, this article uses "coroutine" for user-space threads and "thread" for kernel-space threads.

Coroutines differ from threads: threads are scheduled preemptively by the OS kernel, while coroutines are scheduled cooperatively in user space, so the next coroutine only runs after the current one gives up the CPU.
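To make "cooperative" concrete, here is a toy N:1 scheduler: a single loop (one "thread") runs coroutines round-robin, and a coroutine only gives up control by returning from its step function. This is an illustrative sketch, not how any real coroutine library is built; `task`, `runCooperative`, and `counter` are hypothetical names.

```go
package main

import "fmt"

// task models a coroutine as a step function. Each call runs until the
// coroutine voluntarily yields; it returns true if the task wants to run
// again and false once it has finished.
type task struct {
	name string
	step func() bool
}

// runCooperative is a toy N:1 scheduler: one loop runs every task in
// round-robin order, and a task loses the CPU only by returning.
func runCooperative(tasks []task) []string {
	var trace []string
	queue := append([]task(nil), tasks...)
	for len(queue) > 0 {
		t := queue[0]
		queue = queue[1:]
		trace = append(trace, t.name)
		if t.step() { // yielded but not finished: back to the end of the queue
			queue = append(queue, t)
		}
	}
	return trace
}

// counter returns a step function that runs n steps: it yields n-1 times
// and then reports that it is done.
func counter(n int) func() bool {
	i := 0
	return func() bool {
		i++
		return i < n
	}
}

func main() {
	fmt.Println(runCooperative([]task{{"a", counter(2)}, {"b", counter(3)}})) // [a b a b b]
}
```

If one step function never returned, the whole loop would stall, which is exactly the N:1 weakness described next.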

Coroutines can be bound to threads in three ways:

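N:1: N coroutines are bound to 1 thread. The advantage is that coroutine switches complete entirely in user space without trapping into the kernel, so they are very lightweight and fast. The drawbacks are serious, though. Because all coroutines of a process share one thread, first, the program cannot use multiple CPU cores; second, as soon as one coroutine blocks, the thread blocks, none of the other coroutines in the process can run, and there is no concurrency at all.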

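1:1: each coroutine is bound to its own thread. This is the easiest model to implement. Scheduling is done entirely by the kernel, so the N:1 drawbacks disappear, but creating, destroying, and switching coroutines all go through the kernel, which is comparatively expensive.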

M:N: M coroutines are bound to N threads. This combines the N:1 and 1:1 models and overcomes the drawbacks of both, but it is the most complex to implement.

2. Introduction to Go

2.1 The goroutine concept

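Because a thread switch involves a large context and consumes a lot of CPU time, Go's unit of concurrency is not the traditional thread but a lighter-weight coroutine, the goroutine, which greatly increases the achievable degree of concurrency. This is why Go is sometimes called "the most parallel language".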

2.2 Comparison with other concurrency models

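Interpreted languages such as Python often rely on a multi-process concurrency model. A process has the largest context of all, so switching is very expensive; and because processes can only talk to each other through IPC mechanisms such as sockets or explicitly set-up shared memory, programming becomes considerably more cumbersome.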

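Languages such as C++ usually use a multi-threaded model. A thread's context is much smaller than a process's, and threads already share memory, so programming is much easier. However, starting, destroying, and switching threads still costs a lot of CPU time. Thread pools were introduced to address this: a fixed number of threads is kept alive to avoid repeatedly creating and destroying them. This basic technique has its own problems. For example, if a thread stays blocked on I/O, it keeps occupying its slot in the pool, so later tasks wait in the queue without getting a thread to run on.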

Go's concurrency is more involved. Go replaces threads with a lighter-weight data structure that has its own stack and is much faster to switch. Threads, however, are still what actually executes. Go's scheduler runs goroutines on threads, releases and creates threads as needed, and when a running goroutine blocks (most commonly while waiting for I/O), detaches it from its thread and runs other ready goroutines on that thread instead. Through this fairly elaborate scheduling, the whole system achieves a very high degree of concurrency without consuming a lot of CPU.
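A small illustration of how cheap goroutines are: the sketch below starts a large number of goroutines and waits for all of them. The runtime multiplexes them onto a handful of OS threads, so the count is not limited by how many threads the OS would allow. `spawn` is a hypothetical helper written for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawn starts n goroutines that each increment a shared counter, waits for
// all of them to finish, and returns the final count.
func spawn(n int) int64 {
	var wg sync.WaitGroup
	var done int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&done, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(spawn(100000)) // 100000
}
```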

2.3 Characteristics of goroutines

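Non-blocking. Goroutines were introduced to make highly concurrent programs easier to write. When a goroutine performs a blocking operation (such as a system call), the other goroutines queued on the current thread are handed over to another thread and keep running, so the program as a whole does not block.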

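Scheduler. Go has garbage collection, and parts of a GC cycle require goroutines to be stopped; because Go implements its own scheduler, it can do this conveniently. Writing concurrent programs with many goroutines gives the advantages of asynchronous I/O together with the convenience of writing multi-threaded or multi-process code.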

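Self-managed stacks. Of course, goroutines also bring a lot of complexity. A goroutine must carry both the code to execute and the state needed to execute it: its stack, the PC (program counter, the address of the next instruction to execute), and the SP (stack pointer). These saved values must stay consistent so that the goroutine can resume correctly no matter what state it was switched out in.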

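Since each goroutine has its own stack, a stack must be created together with each goroutine, and that stack grows as the goroutine runs. A stack is normally one contiguous region. Because all threads of a process share one virtual address space, each thread's stack needs a different starting address when there are multiple threads, which means the size of each thread's stack must be estimated before it is allocated. With very many threads, stack overflows become likely.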

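Split stacks were invented to solve this: a stack starts as a fairly small block of memory, and when a function call finds the stack too small, a new stack segment is allocated elsewhere. The new segment does not need to be contiguous with the old one. The call's arguments are copied to the new segment, and subsequent execution continues there. Go's stack management is similar, but for better efficiency it uses contiguous stacks: it also starts with a fixed-size stack, but when space runs out it allocates a larger stack and copies the entire old stack into it. This avoids the frequent allocation and freeing that split stacks can cause.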
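The contiguous-stack strategy can be modeled with a byte buffer that doubles and copies when a new frame does not fit. This is a toy, not the runtime's `copystack`: `toyStack` is hypothetical, and a real implementation must also adjust pointers that point into the old stack, which this model has none of. The 2048-byte starting size matches `_StackMin` in the newproc1 code shown later.

```go
package main

import "fmt"

// toyStack models a contiguous stack: one buffer that, when it runs out of
// room, is replaced by a buffer twice as large with the old contents copied.
type toyStack struct {
	buf    []byte
	used   int
	copies int // number of grow-and-copy operations so far
}

func newToyStack(size int) *toyStack { return &toyStack{buf: make([]byte, size)} }

// push reserves a frame of n bytes, growing the stack first if needed.
func (s *toyStack) push(n int) {
	for s.used+n > len(s.buf) {
		bigger := make([]byte, 2*len(s.buf))
		copy(bigger, s.buf[:s.used]) // move the whole old stack
		s.buf = bigger
		s.copies++
	}
	s.used += n
}

func main() {
	s := newToyStack(2048)
	for i := 0; i < 10; i++ {
		s.push(512) // 10 frames of 512 bytes = 5120 bytes in total
	}
	fmt.Println(len(s.buf), s.copies) // 8192 2
}
```

Doubling keeps the number of copies logarithmic in the final stack size, which is why repeated calls near a boundary do not cause repeated allocations the way split stacks can.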

Goroutine execution can be preempted. If a goroutine keeps the CPU for a long time without passing through the scheduler, the runtime preempts it and gives the CPU to other goroutines. The mechanism (sysmon and retake) is covered in 2.5.7 and 2.5.8.

2.4 Structures

M: a worker thread in Go, the unit that actually executes code;

P: a context for scheduling goroutines. Goroutines depend on a P to be scheduled, and the P is the real unit of parallelism;

G: a goroutine, a piece of Go code (in the form of a function), the smallest unit of concurrency;

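A P only runs while it is attached to an M, and an M must hold a P to run Go code. Normally there are at most GOMAXPROCS Ps (by default equal to the number of CPU cores), but there may be many more Ms. Only Ps attached to an M are actually running, which is why the P is the real unit of parallelism.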

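Each P has its own runnable G queue from which it takes Gs to run, and there is also a global runnable G queue; a G runs on an M through a P. The reason for not relying on a single global queue alone is that distributed queues shrink the critical section: imagine many threads asking for a runnable G at the same time with only a global resource available, and how many of them would sit waiting on that one global lock.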

If a running G blocks, typically while waiting for I/O, the G and its M wait together, while the P is handed over to another available M. This way the blocking does not reduce the program's parallelism.

G structure

```go
type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack       stack   // offset known to runtime/cgo; describes the actual stack memory, including its bounds
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic         *_panic        // innermost panic - offset known to liblink
	_defer         *_defer        // innermost defer
	m              *m             // current m; offset known to arm liblink
	sched          gobuf          // saves the g's context when goroutines are switched
	syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp       uintptr        // expected sp at top of stack, to check in traceback
	param          unsafe.Pointer // passed parameter on wakeup: another goroutine may set it while this g sleeps, and this g reads it when woken
	atomicstatus   uint32
	stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid           int64  // goroutine ID
	waitsince      int64  // approx time when the g become blocked
	waitreason     string // if status==Gwaiting
	schedlink      guintptr
	preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
	paniconfault   bool     // panic (instead of crash) on unexpected fault address
	preemptscan    bool     // preempted g does scan for gc
	gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
	gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
	throwsplit     bool     // must not split stack
	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        muintptr // the g is locked to this M and may only run on it
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr        // pc of go statement that created this goroutine
	startpc        uintptr        // pc of goroutine function
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep
	selectDone     uint32         // are we participating in a select and did someone win the race?

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}
```

Gobuf structure

```go
type gobuf struct {
	sp   uintptr
	pc   uintptr
	g    guintptr
	ctxt unsafe.Pointer
	ret  sys.Uintreg
	lr   uintptr
	bp   uintptr // for GOEXPERIMENT=framepointer
}
```

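The most important field of g is, of course, sched, which holds the goroutine's context. Unlike a thread switch, where the OS is responsible for saving this data, a goroutine switch saves it in a gobuf object (the gobuf structure shown above), which keeps switching lightweight.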

M structure

```go
type m struct {
	g0          *g         // goroutine with the scheduling stack
	gsignal     *g         // goroutine that handles signals
	tls         [6]uintptr // thread-local storage
	mstartfn    func()
	curg        *g // goroutine currently running
	caughtsig   guintptr
	p           puintptr // attached p for executing go code
	nextp       puintptr
	id          int32
	mallocing   int32 // non-zero while this m is allocating
	spinning    bool  // m is out of work and is actively looking for work
	blocked     bool  // m is blocked
	inwb        bool  // m is executing a write barrier
	printlock   int8
	incgo       bool   // m is executing a cgo call
	fastrand    uint32
	ncgocall    uint64 // total number of cgo calls
	ncgo        int32  // number of cgo calls currently in progress
	park        note
	alllink     *m // on allm
	schedlink   muintptr
	mcache      *mcache     // memory cache of this m
	lockedg     *g          // g locked to run on this m, never switched to another m
	createstack [32]uintptr // stack that created this thread
}
```

Two of the G pointers in the M structure deserve attention:

One is curg, the G currently bound to (running on) this M.

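The other is g0, the goroutine that owns the scheduling stack, a rather special goroutine. An ordinary goroutine's stack is a growable stack allocated on the heap, whereas g0's stack is the stack of the M's OS thread. All scheduling-related code first switches to this goroutine's stack and runs there. In other words, the thread's own stack is also represented by a g rather than being used directly as a bare OS stack.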

P structure

```go
type p struct {
	lock mutex

	id          int32
	status      uint32 // status: pidle/prunning/...
	link        puintptr
	schedtick   uint32 // incremented on every scheduler call
	syscalltick uint32 // incremented on every system call
	sysmontick  sysmontick
	m           muintptr // back-link to the associated m
	mcache      *mcache
	racectx     uintptr

	goidcache    uint64 // cache of goroutine IDs
	goidcacheend uint64

	// Queue of runnable goroutines.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	runnext  guintptr // the next g to run

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	palloc persistentAlloc // per-P to avoid mutex

	pad [sys.CacheLineSize]byte
}
```

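A P's status is one of _Pidle, _Prunning, _Psyscall, _Pgcstop, and _Pdead. Its local queue runq (a 256-entry ring buffer indexed by runqhead and runqtail) holds runnable goroutines, and the P takes Gs from this local queue first, which is more efficient.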
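The local queue can be sketched as the same kind of ring buffer: head and tail counters that only ever increase, indexed modulo 256, plus a runnext slot that is served first. This is a single-threaded model under stated simplifications: the real `runqput`/`runqget` use atomics so other Ps can steal from the head, and on overflow the runtime moves half of the ring to the global queue instead of handing the G back as this sketch does.

```go
package main

import "fmt"

// G is a stand-in for a goroutine; only an id is kept.
type G struct{ id int }

// P mimics the local-queue fields of the runtime's p.
type P struct {
	runqhead, runqtail uint32
	runq               [256]*G
	runnext            *G
}

// runqput queues g. With next=true, g becomes runnext and the previous
// runnext (if any) is moved to the tail of the ring. It returns the G that
// did not fit, or nil on success.
func (p *P) runqput(g *G, next bool) *G {
	if next {
		old := p.runnext
		p.runnext = g
		if old == nil {
			return nil
		}
		g = old
	}
	if p.runqtail-p.runqhead >= uint32(len(p.runq)) {
		return g // ring is full
	}
	p.runq[p.runqtail%uint32(len(p.runq))] = g
	p.runqtail++
	return nil
}

// runqget prefers runnext, then pops from the head of the ring.
func (p *P) runqget() *G {
	if g := p.runnext; g != nil {
		p.runnext = nil
		return g
	}
	if p.runqhead == p.runqtail {
		return nil
	}
	g := p.runq[p.runqhead%uint32(len(p.runq))]
	p.runqhead++
	return g
}

func main() {
	var p P
	p.runqput(&G{1}, false)
	p.runqput(&G{2}, false)
	p.runqput(&G{3}, true) // like newproc: the newest G goes into runnext
	for g := p.runqget(); g != nil; g = p.runqget() {
		fmt.Print(g.id, " ")
	}
	fmt.Println() // output: 3 1 2
}
```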

Schedt structure

```go
type schedt struct {
	goidgen  uint64
	lastpoll uint64

	lock mutex

	midle        muintptr // idle m's
	nmidle       int32    // number of idle m's
	nmidlelocked int32    // number of locked m's
	mcount       int32    // total number of m's created
	maxmcount    int32    // maximum number of m's allowed
	ngsys        uint32   // number of system goroutines; updated atomically

	pidle      puintptr // idle p's
	npidle     uint32
	nmspinning uint32

	// Global runnable queue.
	runqhead guintptr
	runqtail guintptr
	runqsize int32

	// Global cache of dead G's.
	gflock       mutex
	gfreeStack   *g
	gfreeNoStack *g
	ngfree       int32

	// Central cache of sudog structs.
	sudoglock  mutex
	sudogcache *sudog
}
```

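Most of the information needed is already in the M, G, and P structures; schedt is largely a shell. It contains the idle M list, the idle P list, and a global queue of runnable Gs. The lock in schedt is essential: when an M or a P performs a non-local operation, it generally has to lock the scheduler first.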
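Because the global queue sits behind sched.lock, a P does not take Gs from it one at a time. The sketch below reproduces the batch-size rule of `globrunqget` as I read it in the Go 1.10 runtime (a fair share of the queue plus one, capped by `max` when it is positive and by half of a 256-slot local queue); treat the exact rule as something to verify against proc.go. `globalQueue`, `grabCount`, and `get` are hypothetical names, and a `sync.Mutex` stands in for `sched.lock`.

```go
package main

import (
	"fmt"
	"sync"
)

// globalQueue models schedt's global run queue: one queue shared by every P,
// so every access goes through a single lock.
type globalQueue struct {
	lock sync.Mutex
	runq []int // goroutine ids
}

// grabCount computes how many Gs one P takes from a global queue of
// runqsize entries.
func grabCount(runqsize, gomaxprocs, max int) int {
	n := runqsize/gomaxprocs + 1 // fair share, at least one
	if n > runqsize {
		n = runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > 256/2 { // never more than half of a local run queue
		n = 256 / 2
	}
	return n
}

// get takes one batch for a P while holding the lock.
func (q *globalQueue) get(gomaxprocs, max int) []int {
	q.lock.Lock()
	defer q.lock.Unlock()
	n := grabCount(len(q.runq), gomaxprocs, max)
	batch := q.runq[:n:n]
	q.runq = q.runq[n:]
	return batch
}

func main() {
	q := &globalQueue{runq: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}}
	batch := q.get(4, 0)
	fmt.Println(batch, len(q.runq)) // [0 1 2] 7
}
```

In the code below, schedule calls `globrunqget(_p_, 1)` (take a single G for fairness) while findrunnable calls `globrunqget(_p_, 0)` (take a full batch).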

2.5 Key functions

The goroutine scheduler code lives in /src/runtime/proc.go. Some of the key functions are analyzed below.

2.5.1 The schedule function

schedule runs whenever the runtime needs to reschedule. It finds a runnable G for the current P and executes it, searching in this order:

1) Call runqget to take a runnable G from the P's own runnable G queue;

2) If 1) fails, call findrunnable to look for a runnable G;

3) findrunnable blocks until it finds work, so a G is always obtained; execute then switches to it, resuming it from the context saved in its gobuf.

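4) Note: every 61st scheduling round (schedtick % 61 == 0), the global run queue is checked before the local one to ensure fairness; otherwise two goroutines that keep respawning each other could monopolize the local run queue. (Before all of this, schedule also gives the trace reader and GC mark workers priority, as the code shows.)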

The code:

```go
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}

	if _g_.m.lockedg != 0 {
		stoplockedm()
		execute(_g_.m.lockedg.ptr(), false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _g_.m.p.ptr().runSafePointFn != 0 {
		runSafePointFn()
	}

	var gp *g
	var inheritTime bool
	if trace.enabled || trace.shutdown {
		gp = traceReader()
		if gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			traceGoUnpark(gp, 0)
		}
	}
	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
	}
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	if _g_.m.spinning {
		resetspinning()
	}

	if gp.lockedm != 0 {
		// Hands off own p to the locked m,
		// then blocks waiting for a new p.
		startlockedm(gp)
		goto top
	}

	execute(gp, inheritTime)
}
```
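The fairness rule is easy to see in a toy with one P. The sketch below keeps only the three sources discussed above (the schedtick % 61 check of the global queue, the local queue, and a fallback standing in for findrunnable); `sched`, `pop`, and `next` are hypothetical names. As in the runtime, the tick is checked before the G runs and incremented when it is executed.

```go
package main

import "fmt"

// sched is a toy with a single P: local and global queues of goroutine names.
type sched struct {
	schedtick int
	local     []string
	global    []string
}

func pop(q *[]string) (string, bool) {
	if len(*q) == 0 {
		return "", false
	}
	g := (*q)[0]
	*q = (*q)[1:]
	return g, true
}

// next picks the goroutine to run in schedule()'s order.
func (s *sched) next() (g string, ok bool) {
	if s.schedtick%61 == 0 && len(s.global) > 0 {
		g, ok = pop(&s.global) // fairness: global queue first every 61 ticks
	} else if g, ok = pop(&s.local); !ok {
		g, ok = pop(&s.global) // stand-in for findrunnable
	}
	if ok {
		s.schedtick++ // execute() increments schedtick for every G it runs
	}
	return g, ok
}

func main() {
	s := &sched{schedtick: 1, global: []string{"global"}}
	for i := 0; i < 100; i++ {
		s.local = append(s.local, "local")
	}
	for i := 1; ; i++ {
		if g, _ := s.next(); g == "global" {
			fmt.Println("global G ran on pick", i) // pick 61
			break
		}
	}
}
```

Without the modulo check, the global G would wait until all 100 local Gs had run.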

2.5.2 The findrunnable function

findrunnable finds a runnable G for a P, searching in this order:

1) Call runqget to take a runnable G from the P's own runnable G queue;

2) If 1) fails, call globrunqget to take a runnable G from the global runnable G queue;

3) If 2) fails, call netpoll (non-blocking) to get a G whose network I/O has become ready;

4) If 3) fails, try to steal half of the Gs from another P;

5) If 4) fails, call globrunqget again to take a runnable G from the global runnable G queue;

6) If 5) fails, call netpoll (blocking) to wait for a G whose network I/O becomes ready;

7) If 6) still yields no G, call stopm to stop this M.

The code:

```go
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}
	if fingwait && fingwake {
		if gp := wakefing(); gp != nil {
			ready(gp, 0, true)
		}
	}
	if *cgo_yield != nil {
		asmcgocall(*cgo_yield, nil)
	}

	// local runq
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there are no waiters or a thread is blocked
	// in netpoll already. If there is any kind of logical race with that
	// blocked thread (e.g. it has already returned from netpoll, but does
	// not set lastpoll yet), this thread will do blocking netpoll below
	// anyway.
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if gp := netpoll(false); gp != nil { // non-blocking
			// netpoll returns list of goroutines linked by schedlink.
			injectglist(gp.schedlink.ptr())
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}

	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:

	// We have nothing to do. If we're in the GC mark phase, can
	// safely scan and blacken objects, and have work to do, run
	// idle-time marking rather than give up the P.
	if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
		_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
		gp := _p_.gcBgMarkWorker.ptr()
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false
	}

	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	// Check for idle-priority GC work again.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
		lock(&sched.lock)
		_p_ = pidleget()
		if _p_ != nil && _p_.gcBgMarkWorker == 0 {
			pidleput(_p_)
			_p_ = nil
		}
		unlock(&sched.lock)
		if _p_ != nil {
			acquirep(_p_)
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			// Go back to idle GC check.
			goto stop
		}
	}

	// poll network
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		if _g_.m.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if _g_.m.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		gp := netpoll(true) // block until new work is available
		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
		if gp != nil {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				injectglist(gp.schedlink.ptr())
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			injectglist(gp)
		}
	}
	stopm()
	goto top
}
```
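The core of the lookup order, including "steal half", fits in a few lines. This is a simplified sketch, not the runtime's logic: netpoll, GC work, spinning-M accounting, and the final re-checks are left out, and victims are visited in index order while the runtime uses a random `stealOrder`. As with `runqsteal`, the last stolen G is returned to run now and the rest go into the thief's local queue. `P` and `findRunnable` are hypothetical.

```go
package main

import "fmt"

// P holds a local run queue of goroutine ids.
type P struct{ runq []int }

// findRunnable looks for work for P number self: its own queue, then the
// global queue, then half of another P's queue (rounded up).
func findRunnable(self int, ps []*P, global *[]int) (gid int, from string, ok bool) {
	me := ps[self]
	if len(me.runq) > 0 {
		gid, me.runq = me.runq[0], me.runq[1:]
		return gid, "local", true
	}
	if len(*global) > 0 {
		gid, *global = (*global)[0], (*global)[1:]
		return gid, "global", true
	}
	for i, victim := range ps {
		if i == self || len(victim.runq) == 0 {
			continue
		}
		n := len(victim.runq) - len(victim.runq)/2 // half, rounded up
		stolen := append([]int(nil), victim.runq[:n]...)
		victim.runq = victim.runq[n:]
		me.runq = append(me.runq, stolen[:n-1]...) // keep the rest locally
		return stolen[n-1], "stolen", true         // run the last one now
	}
	return 0, "", false // nothing anywhere: the M would call stopm
}

func main() {
	ps := []*P{{}, {runq: []int{1, 2, 3, 4, 5}}}
	var global []int
	for {
		gid, from, ok := findRunnable(0, ps, &global)
		if !ok {
			break
		}
		fmt.Println(gid, from)
	}
}
```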

2.5.3 The newproc function

newproc creates a runnable G and puts it on the current P's runnable G queue. A statement such as "go func() { … }" is compiled into a call to it, and the core work is done in newproc1, which proceeds as follows:

1) Get the P of the current G and try to take a G from its free G list;

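2) If no free G is available, allocate a new one with a _StackMin-sized stack; in either case, copy the arguments onto the G's stack and initialize its scheduling context;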

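3) Put the G on the P's runnable G queue (runqput with next=true, so it becomes the P's runnext), and call wakep if there are idle Ps and no spinning Ms.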

The code:

```go
// In Go 1.10.8 the default stack size is 2 KB.
const _StackMin = 2048

// Create a g object and put it on the g queue to wait to be executed.
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			// We're in the prologue, so it's always stack map index 0.
			bv := stackmapdata(stkmap, 0)
			bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
		}
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	newg.gcscanvalid = false
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	runqput(_p_, newg, true)

	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}
```
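The bookkeeping parts of newproc1 (reuse from the free list, batched goid allocation, queueing) can be sketched without any of the stack setup. This is a single-threaded toy: the runtime increments `sched.goidgen` atomically, and the batch size of 16 for `_GoidCacheBatch` is the value as I recall it from the runtime source, so treat it as an assumption. All type and function names here are hypothetical.

```go
package main

import "fmt"

// goidCacheBatch plays the role of _GoidCacheBatch (assumed value).
const goidCacheBatch = 16

type G struct {
	goid   int64
	status string
	fn     func()
}

type schedt struct{ goidgen int64 } // last allocated goid

type P struct {
	gFree                   []*G // free list of dead Gs
	runq                    []*G
	goidcache, goidcacheend int64
}

// newproc follows the three steps listed above.
func newproc(s *schedt, p *P, fn func()) *G {
	var g *G
	if n := len(p.gFree); n > 0 { // 1) reuse a dead G if there is one
		g, p.gFree = p.gFree[n-1], p.gFree[:n-1]
	} else { // 2) otherwise allocate a new one
		g = &G{}
	}
	g.fn = fn
	// goids come from the global counter in batches, so most calls only
	// touch the per-P cache: batch k covers ids [16(k-1)+1, 16k].
	if p.goidcache == p.goidcacheend {
		s.goidgen += goidCacheBatch
		p.goidcache = s.goidgen - (goidCacheBatch - 1)
		p.goidcacheend = p.goidcache + goidCacheBatch
	}
	g.goid = p.goidcache
	p.goidcache++
	g.status = "runnable"
	p.runq = append(p.runq, g) // 3) queue it on the current P
	return g
}

func main() {
	s, p := &schedt{}, &P{}
	for i := 0; i < 17; i++ {
		newproc(s, p, func() {})
	}
	fmt.Println(p.runq[0].goid, p.runq[16].goid, s.goidgen) // 1 17 32
}
```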

2.5.4 The goexit0 function

goexit0 runs on g0 when a G exits. It resets the G's state, puts it on the P's free G list for later reuse, and then calls schedule to run something else.

```go
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()

	// Change the g's status from _Grunning to _Gdead.
	casgstatus(gp, _Grunning, _Gdead)
	if isSystemGoroutine(gp) {
		atomic.Xadd(&sched.ngsys, -1)
	}
	// Reset the g's fields, mostly to nil/0.
	gp.m = nil
	locked := gp.lockedm != 0
	gp.lockedm = 0
	_g_.m.lockedg = 0
	gp.paniconfault = false
	gp._defer = nil // should be true already but just in case.
	gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
	gp.writebuf = nil
	gp.waitreason = ""
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
		// Flush assist credit to the global pool. This gives
		// better information to pacing if the application is
		// rapidly creating an exiting goroutines.
		scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
		atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
		gp.gcAssistBytes = 0
	}

	// Note that gp's stack scan is now "valid" because it has no
	// stack.
	gp.gcscanvalid = true
	dropg()

	if _g_.m.lockedInt != 0 {
		print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
		throw("internal lockOSThread error")
	}
	_g_.m.lockedExt = 0
	// Push this g onto the free G list.
	gfput(_g_.m.p.ptr(), gp)
	if locked {
		// The goroutine may have locked this thread because
		// it put it in an unusual kernel state. Kill it
		// rather than returning it to the thread pool.

		// Return to mstart, which will release the P and exit
		// the thread.
		if GOOS != "plan9" { // See golang.org/issue/22227.
			gogo(&_g_.m.g0.sched)
		}
	}
	schedule()
}
```
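The cleanup path can be modeled as "reset, recycle, and decide whether the thread survives". This toy keeps only a few representative fields; `G`, `P`, and the `lockedM` flag are hypothetical stand-ins, and the final call to schedule() is omitted. It mirrors the order in the code above: the G goes onto the free list in both cases, and only a thread that was locked to the goroutine is torn down.

```go
package main

import "fmt"

type G struct {
	goid    int64
	status  string
	param   interface{}
	labels  map[string]string
	lockedM bool // goroutine had locked its thread (LockOSThread)
}

type P struct{ gFree []*G }

// goexit0 marks g dead, clears fields that could keep other objects alive,
// and pushes g onto the P's free list (gfput). It reports whether the
// thread should exit rather than be reused.
func goexit0(p *P, g *G) (exitThread bool) {
	g.status = "dead"
	locked := g.lockedM
	g.lockedM = false
	g.param = nil
	g.labels = nil
	p.gFree = append(p.gFree, g)
	return locked
}

func main() {
	p := &P{}
	g := &G{goid: 7, status: "running", param: "x"}
	exit := goexit0(p, g)
	fmt.Println(exit, g.status, len(p.gFree)) // false dead 1
}
```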

2.5.5 The handoffp function

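handoffp hands a P off from an M that is in a system call or is locked to a goroutine. If the P's local queue or the global queue still has runnable Gs, it calls startm to run the P on another M, with spinning set to false. If there is no work, but there are neither spinning Ms nor idle Ps, it starts a spinning M; otherwise (after handling GC stops and the last-P netpoll case) it puts the P on the idle list.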

```go
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// If this P's queue or the scheduler's global queue is non-empty,
	// call startm without spinning.
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// Same if GC work is in progress.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// No work to do: check whether there are spinning/idle Ms;
	// if there are, we don't need to spin ourselves.
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	// Lock the scheduler and take this P out of service.
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	pidleput(_p_)
	unlock(&sched.lock)
}
```
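handoffp is essentially an ordered decision table. The sketch below evaluates the same conditions in the same order against a plain snapshot struct, which makes the cases easy to test in isolation. It is a simplification: locks and atomics are gone, and the safe-point handling and the second check of the global queue under the lock are omitted. `handoffState` and `handoffDecision` are hypothetical names; the field names mirror runtime fields.

```go
package main

import "fmt"

// handoffState is a snapshot of the values handoffp reads.
type handoffState struct {
	localWork  bool // !runqempty(p)
	globalRunq int  // sched.runqsize
	gcWork     bool // GC mark work available for this P
	nmspinning int
	npidle     int
	gcwaiting  bool
	gomaxprocs int
	lastpoll   int64 // non-zero: no M is currently blocked in netpoll
}

// handoffDecision returns the action handoffp would take.
func handoffDecision(s handoffState) string {
	switch {
	case s.localWork || s.globalRunq != 0:
		return "startm(p, false)"
	case s.gcWork:
		return "startm(p, false)"
	case s.nmspinning+s.npidle == 0:
		return "startm(p, true)" // nobody is looking for work: start a spinning M
	case s.gcwaiting:
		return "stop p for GC"
	case s.npidle == s.gomaxprocs-1 && s.lastpoll != 0:
		return "startm(p, false)" // last running P: someone must poll the network
	default:
		return "pidleput(p)"
	}
}

func main() {
	fmt.Println(handoffDecision(handoffState{globalRunq: 3, gomaxprocs: 4})) // startm(p, false)
	fmt.Println(handoffDecision(handoffState{npidle: 1, gomaxprocs: 4}))     // pidleput(p)
}
```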

2.5.6 The startm function

startm schedules an M to run the given P, creating a new M if necessary.

```go
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
	// Take the scheduler lock.
	lock(&sched.lock)
	if _p_ == nil {
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	mp := mget()
	unlock(&sched.lock)
	if mp == nil {
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_)
	notewakeup(&mp.park)
}
```
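The shape of startm is "get a P if none was given, then reuse an idle M or create one". The sketch below models that with plain slices in place of `sched.midle`/`sched.pidle`; creating an M just increments a counter instead of starting an OS thread, and the nmspinning bookkeeping is omitted. All names are hypothetical.

```go
package main

import "fmt"

type P struct{ id int }

type M struct {
	id       int
	spinning bool
	nextp    *P // the P this M will acquire when it wakes up
}

type schedt struct {
	midle  []*M // idle Ms
	pidle  []*P // idle Ps
	mcount int  // total Ms created
}

// startm returns the M that will run p, or nil if p was nil and no idle P
// was available.
func (s *schedt) startm(p *P, spinning bool) *M {
	if p == nil {
		if len(s.pidle) == 0 {
			return nil
		}
		p = s.pidle[len(s.pidle)-1] // pidleget
		s.pidle = s.pidle[:len(s.pidle)-1]
	}
	var m *M
	if n := len(s.midle); n > 0 {
		m = s.midle[n-1] // mget: reuse a parked thread
		s.midle = s.midle[:n-1]
	} else {
		s.mcount++ // newm: the runtime would create a new OS thread here
		m = &M{id: s.mcount}
	}
	m.spinning = spinning
	m.nextp = p // then notewakeup(&mp.park) in the runtime
	return m
}

func main() {
	s := &schedt{midle: []*M{{id: 1}}, mcount: 1}
	m1 := s.startm(&P{id: 1}, false) // reuses idle M 1
	m2 := s.startm(&P{id: 2}, true)  // no idle M left: creates M 2
	fmt.Println(m1.id, m2.id, s.mcount) // 1 2 2
}
```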

2.5.7 The sysmon function

sysmon is started when the Go runtime starts and runs on its own M without a P. It monitors the state of all goroutines, decides whether a GC needs to be forced, polls the network, and so on. It calls retake to perform preemptive scheduling.

```go
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait, 1)
				unlock(&sched.lock)
				// Make wake-up period small enough
				// for the sampling to be correct.
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				shouldRelax := true
				if osRelaxMinNS > 0 {
					next := timeSleepUntil()
					now := nanotime()
					if next-now < osRelaxMinNS {
						shouldRelax = false
					}
				}
				if shouldRelax {
					osRelax(true)
				}
				notetsleep(&sched.sysmonnote, maxsleep)
				if shouldRelax {
					osRelax(false)
				}
				lock(&sched.lock)
				atomic.Store(&sched.sysmonwait, 0)
				noteclear(&sched.sysmonnote)
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			forcegc.g.schedlink = 0
			injectglist(forcegc.g)
			unlock(&forcegc.lock)
		}
		// scavenge heap once in a while
		if lastscavenge+scavengelimit/2 < now {
			mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
			lastscavenge = now
			nscavenge++
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}
```
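The sleep schedule at the top of the sysmon loop is worth seeing as numbers. The function below replays only that part of the loop, assuming retake never finds anything to do (so idle keeps growing); `sysmonDelays` is a hypothetical name. With 20 µs per round, the first 51 rounds add up to roughly the 1 ms mentioned in the comment, after which the delay doubles until it is capped at 10 ms.

```go
package main

import "fmt"

// sysmonDelays returns the usleep durations (in microseconds) for the first
// n rounds of a sysmon loop in which retake always returns 0.
func sysmonDelays(n int) []uint32 {
	idle := 0
	delay := uint32(0)
	out := make([]uint32, 0, n)
	for i := 0; i < n; i++ {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		out = append(out, delay)
		idle++ // retake(now) == 0: nobody was woken up this round
	}
	return out
}

func main() {
	d := sysmonDelays(61)
	fmt.Println(d[0], d[50], d[51], d[59]) // 20 20 40 10000
}
```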

2.5.8 The retake function

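retake iterates over all Ps. If a P is in a system call (_Psyscall) and has stayed there for at least one sysmon round (20 µs to 10 ms), it is retaken: handoffp is called to detach the P from its M (unless the P's queue is empty, there are spinning Ms or idle Ps to pick up new work, and the syscall has lasted less than 10 ms). If a P is running (_Prunning) and its current G has been running for at least one sysmon round and longer than forcePreemptNS (10 ms), the G is preempted by setting g.preempt = true and g.stackguard0 = stackPreempt.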
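Both conditions rely on the same trick: sysmon remembers the last tick value it saw for each P, and only acts if the tick has not changed since then. The sketch below isolates that per-P decision; `pState`, `sysmontick`, and `retakeOne` are hypothetical simplifications of the runtime's fields, with status given as strings and `otherHelpers` standing for "nmspinning + npidle > 0".

```go
package main

import "fmt"

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

type pState struct {
	status      string // "syscall" or "running"
	syscalltick uint32
	schedtick   uint32
	runqEmpty   bool
}

// sysmontick is what sysmon remembers about one P between rounds.
type sysmontick struct {
	syscalltick, schedtick uint32
	syscallwhen, schedwhen int64
}

// retakeOne returns "observe" (tick changed, just record it), "leave",
// "handoffp" or "preemptone" for one P at time now (in ns).
func retakeOne(p pState, pd *sysmontick, now int64, otherHelpers bool) string {
	switch p.status {
	case "syscall":
		if pd.syscalltick != p.syscalltick {
			pd.syscalltick, pd.syscallwhen = p.syscalltick, now
			return "observe"
		}
		if p.runqEmpty && otherHelpers && pd.syscallwhen+10*1000*1000 > now {
			return "leave"
		}
		return "handoffp"
	case "running":
		if pd.schedtick != p.schedtick {
			pd.schedtick, pd.schedwhen = p.schedtick, now
			return "observe"
		}
		if pd.schedwhen+forcePreemptNS > now {
			return "leave"
		}
		return "preemptone"
	}
	return "leave"
}

func main() {
	pd := &sysmontick{}
	g := pState{status: "running", schedtick: 5}
	for _, now := range []int64{0, 5 * 1000 * 1000, 10 * 1000 * 1000} {
		fmt.Println(now, retakeOne(g, pd, now, false)) // observe, leave, preemptone
	}
}
```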

Why does setting stackguard0 make preemption possible?

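Because this value is used to check whether the current stack has enough space: the prologue of most Go functions compares the stack pointer against stackguard0 to decide whether the stack must grow. stackPreempt is larger than any real stack address, so the check always fails and the function calls morestack, which in turn calls newstack.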

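When newstack sees that g.stackguard0 equals stackPreempt, it knows the call was triggered by a preemption request rather than a real stack shortage. It then checks again whether preemption is allowed at this point, and if so, yields the G back to the scheduler.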

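The preemption mechanism keeps a single long-running G from preventing other Gs from running. Note that it only takes effect when the G calls a function: in the Go version analyzed here, a tight loop without function calls cannot be preempted this way (Go 1.14 added signal-based asynchronous preemption for that case).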
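The stackguard0 trick can be simulated directly. In this toy, `call` plays the role of a function prologue and `newstack` tells a preemption request apart from a real stack shortage. `stackPreempt` mirrors the runtime's sentinel (-1314 as an unsigned pointer); the guard size of 880 is only illustrative, since the real `_StackGuard` depends on platform and build flags; and the `unsafePoint` flag is a hypothetical stand-in for the runtime's "not now" checks (locks held, allocating, and so on).

```go
package main

import "fmt"

// stackPreempt: a value larger than any real stack address.
const stackPreempt = ^uintptr(0) - 1313

// stackGuard is an illustrative guard size, not the runtime's exact value.
const stackGuard = 880

type G struct {
	stackLo     uintptr
	stackguard0 uintptr
	preempt     bool // preemption requested
	unsafePoint bool // stand-in for "preemption not allowed right now"
}

// call models a function prologue: if sp <= stackguard0, go to newstack
// instead of running the function body.
func call(g *G, sp uintptr) string {
	if sp <= g.stackguard0 {
		return newstack(g)
	}
	return "run"
}

// newstack distinguishes a preemption request from a real stack overflow.
func newstack(g *G) string {
	if g.stackguard0 == stackPreempt {
		g.stackguard0 = g.stackLo + stackGuard // restore the normal guard
		if g.unsafePoint {
			return "run" // keep going; g.preempt stays set for a later attempt
		}
		g.preempt = false
		return "preempted" // like Gosched: back to the scheduler
	}
	return "grow" // genuine shortage: copy to a larger stack
}

func main() {
	g := &G{stackLo: 0x1000, stackguard0: 0x1000 + stackGuard}
	fmt.Println(call(g, 0x8000)) // run: plenty of stack left
	fmt.Println(call(g, 0x1100)) // grow: sp is inside the guard area
	g.preempt, g.stackguard0 = true, stackPreempt // what preemptone sets
	fmt.Println(call(g, 0x8000)) // preempted: the check fails for any sp
}
```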

```go
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		} else if s == _Prunning {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
				continue
			}
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			preemptone(_p_)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
```

3. Scheduler summary

3.1 The scheduler's two main ideas

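Reusing threads: coroutines run on top of a set of threads, so instead of frequently creating and destroying threads, the scheduler reuses them. This shows up in two more places: 1) work stealing: when a thread has no runnable G, it tries to steal Gs from a P bound to another thread instead of destroying itself; 2) handoff: when a thread blocks because its G is in a system call, the thread releases its P and the P is transferred to another idle thread.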

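Exploiting parallelism: GOMAXPROCS sets the number of Ps. When GOMAXPROCS is greater than 1, at most GOMAXPROCS threads run Go code at the same time, and they may run simultaneously on multiple CPU cores, turning concurrency into parallelism. GOMAXPROCS also caps the degree of parallelism: with GOMAXPROCS = number of cores / 2, for example, at most half of the cores are used in parallel.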

3.2 The scheduler's two smaller strategies

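Preemption: with classic coroutines, the next coroutine only runs after the current one voluntarily gives up the CPU. In Go, a goroutine can occupy the CPU for only about 10 ms before the runtime asks it to yield, which keeps other goroutines from starving. This is one way goroutines differ from coroutines.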

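Global G queue: the new scheduler still has a global G queue, but its role is reduced. An M consults it when its own P's local queue is empty (before trying to steal from other Ps, and once more before giving up its P), and every 61st scheduling round for fairness.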

