以下文章均為拜讀公眾號 源碼游記 的筆記 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect
3. Goroutine調(diào)度策略
前面提過,所謂的goroutine調(diào)度,是指程序代碼按照一定的算法找到合適的g并放到CPU上去運(yùn)行的過程。
所以有以下三個(gè)問題需要思考解決
- 調(diào)度時(shí)機(jī):什么時(shí)候發(fā)生調(diào)度
- 調(diào)度策略:選取哪個(gè)
g去調(diào)度 - 切換機(jī)制:怎么把
g放到CPU中(這就是我們說的gogo)
下面我們主要未然調(diào)度策略展開,看下我們是怎么選擇g的。
1. schedule的操作
之前我們看到schedule在調(diào)度循環(huán)中將一個(gè)g放到CPU上去運(yùn)行,但是在之前它還負(fù)責(zé)找到一個(gè)合適的g去調(diào)度。
這次我們主要關(guān)注那些調(diào)度算法
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg() //_g_ = m.g0
......
var gp *g
......
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
//為了保證調(diào)度的公平性,每個(gè)工作線程每進(jìn)行61次調(diào)度就需要優(yōu)先從全局運(yùn)行隊(duì)列中獲取goroutine出來運(yùn)行,
//因?yàn)槿绻徽{(diào)度本地運(yùn)行隊(duì)列中的goroutine,則全局運(yùn)行隊(duì)列中的goroutine有可能得不到運(yùn)行
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock) //所有工作線程都能訪問全局運(yùn)行隊(duì)列,所以需要加鎖
gp = globrunqget(_g_.m.p.ptr(), 1) //從全局運(yùn)行隊(duì)列中獲取1個(gè)goroutine
unlock(&sched.lock)
}
}
if gp == nil {
//從與m關(guān)聯(lián)的p的本地運(yùn)行隊(duì)列中獲取goroutine
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
//如果從本地運(yùn)行隊(duì)列和全局運(yùn)行隊(duì)列都沒有找到需要運(yùn)行的goroutine,
//則調(diào)用findrunnable函數(shù)從其它工作線程的運(yùn)行隊(duì)列中偷取,如果偷取不到,則當(dāng)前工作線程進(jìn)入睡眠,
//直到獲取到需要運(yùn)行的goroutine之后findrunnable函數(shù)才會返回。
gp, inheritTime = findrunnable() // blocks until work is available
}
......
//當(dāng)前運(yùn)行的是runtime的代碼,函數(shù)調(diào)用棧使用的是g0的棧空間
//調(diào)用execte切換到gp的代碼和棧空間去運(yùn)行
execute(gp, inheritTime)
}
由代碼可見,schedule函數(shù)大概分為了三步去尋找可運(yùn)行的g
三步查找
第一步
從全局隊(duì)列中尋找g。為了保證調(diào)度的公平性,每個(gè)工作線程每經(jīng)過61次調(diào)度就需要優(yōu)先嘗試從全局隊(duì)列中找出一個(gè)g來運(yùn)行,這樣保證每個(gè)g都有機(jī)會得到調(diào)用。
之前提過曲劇隊(duì)列是所有工作線程都可以訪問的,所以在訪問它之前需要加鎖。
第二步
從本地運(yùn)行隊(duì)列尋找g,這個(gè)是常態(tài),基本上都是從這里獲取。這個(gè)的前提是在上一步?jīng)]有取到g
第三步
從其他工作線程的運(yùn)行隊(duì)列中偷取g。
下面我們詳細(xì)看下這三步
2.全局運(yùn)行隊(duì)列獲取goroutine
下面我們來看下獲取g的函數(shù)globrunqget
// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 { // 全局運(yùn)行隊(duì)列為空,前面已經(jīng)加鎖了
return nil
}
// 以下為一些數(shù)量限制
n := sched.runqsize/gomaxprocs + 1 // 按照p數(shù)量均分
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max { // 不超過最大值
n = max
}
if n > int32(len(_p_.runq))/2 { // 不超過一半,如果runq為1個(gè)是不是永遠(yuǎn)取不走???
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n // 更新個(gè)數(shù)
gp := sched.runq.pop() // 至少取走一個(gè),
n--
for ; n > 0; n-- { // 循環(huán)取走,并放入本地運(yùn)行隊(duì)列
gp1 := sched.runq.pop() // 取
runqput(_p_, gp1, false) // 放
}
return gp // 把第一個(gè)return ,用做運(yùn)行
}
該函數(shù)的第一個(gè)參數(shù)當(dāng)前工作線程綁定的p,第二個(gè)參數(shù)是最多可以取多少個(gè)。
并且如果多于1個(gè),那么其他取出放入本地運(yùn)行隊(duì)列。
3. 本地運(yùn)行隊(duì)列中獲取goroutine
函數(shù)runqget
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
for {
next := _p_.runnext // runnext 高優(yōu)插隊(duì)者,如果有的話 優(yōu)先runnext TODO 查詢r(jià)unnext在什么條件下出現(xiàn)
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
for {
// 獲取首尾
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
// 原子+1
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
這里需要注意的是取gp時(shí)候是加鎖的。
因?yàn)橛衅渌墓ぷ骶€程有可能也在訪問這2個(gè)成員,比如有人正在偷你的
g
對于語義
<img src="http://picgo.vipkk.work/20200523210047.png" alt="image-20200523210047055" style="zoom:50%;" />
CAS操作和ABA問題
CAS 操作會造成[ABA問題][ABA]
那么在runqget中會不會呢,答案是不會的,分析如下
對于runnext
- 只有runqget才會將其改為0,其他地方都只是將他改為非0值
- 任何時(shí)刻只有一個(gè)當(dāng)前工作線程可以執(zhí)行runqget,不會并發(fā),也就不存在其他人將其改為0
對于runq
- 只有當(dāng)前線程才會去往自己的隊(duì)列添加g,知會去偷g,從而修改runqhead
- 所以,當(dāng)我們這個(gè)工作線程從runqhead讀取到值A(chǔ)之后,其它工作線程也就不可能修改runqhead的值為B之后再第二次把它修改為值A(chǔ)(因?yàn)閞unqtail在這段時(shí)間之內(nèi)不可能被修改,runqhead的值也就無法越過runqtail再回繞到A值),也就是說,代碼從邏輯上已經(jīng)杜絕了引發(fā)ABA的條件。
4. 從其他線程偷取G
//如果從本地運(yùn)行隊(duì)列和全局運(yùn)行隊(duì)列都沒有找到需要運(yùn)行的goroutine, //則調(diào)用findrunnable函數(shù)從其它工作線程的運(yùn)行隊(duì)列中偷取,如果偷取不到,則當(dāng)前工作線程進(jìn)入睡眠, //直到獲取到需要運(yùn)行的goroutine之后findrunnable函數(shù)才會返回。 gp, inheritTime = findrunnable() // blocks until work is available
findrunnable函數(shù)負(fù)責(zé)處理與盜取相關(guān)的邏輯,另外該函數(shù)還負(fù)責(zé)gc和netpoll等相關(guān)的事情。目前我們先關(guān)注偷取算法。因?yàn)榇a太長我們分步驟來看
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
}
在偷取之前再看下本地和全局有沒有可運(yùn)行的隊(duì)列,有的話取出返回收工
// local runq 檢查本地隊(duì)列是否有可以運(yùn)行的
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq 檢查全局隊(duì)列是否有可以運(yùn)行的
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
接下來是一些netpoll相關(guān),跳過。
再然后是fastpath,快速失敗
需要注意fastpath的2個(gè)點(diǎn)
- m 沒有在自旋
- 自旋的個(gè)數(shù) 大于 work的個(gè)數(shù)。
2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle)
// Steal work from other P's.
procs := uint32(gomaxprocs)
ranTimer := false
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
// 這里的意思是 m還沒有開始尋找,但是已經(jīng)有超多一半的m在尋找了,那么它就不找了,這樣省CPU。
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
接著開始偷取,一共進(jìn)行4次嘗試。如果成功,則偷取一半并返回。當(dāng)進(jìn)行第3、4次嘗試的時(shí)候,也會嘗試偷取runnext。
這里的偷取為了公平是隨機(jī)選p的。不論位置還是下一個(gè)都是隨機(jī)的詳見[隨機(jī)偷取算法](# 隨機(jī)偷取算法)
for i := 0; i < 4; i++ {
// 隨機(jī)開始,隨機(jī)下一個(gè)
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 如果成功取出一半
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
}
}
當(dāng)找到的時(shí)候,通過runqsteal來偷取,
這個(gè)邏輯比較簡單,grab之后獲取到根據(jù)大小來跟新 runqhead & runqtail
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
接下來就是runqgrad這里負(fù)責(zé)了真正的偷取
grad 根據(jù)_p_的大小判斷偷取的一半,然后放進(jìn)batch里,再更新head&tail。 至此整個(gè)偷取算法完成。
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n == 0 {
...
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
continue
}
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
再回到findrunnable,如果四次都沒有成功則進(jìn)入stop模式(ps 忽略runtime,暫時(shí)沒看懂)
在這里首先將m與p解除綁定,然后準(zhǔn)備去休眠,然后把p放到空閑隊(duì)列,不過在結(jié)束之前進(jìn)行了最后一次判斷。
最后stopm
stop:
......
// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
allpSnapshot := allp
// return P and block
lock(&sched.lock)
......
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
// 當(dāng)前工作線程解除與p之間的綁定,準(zhǔn)備去休眠
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
//把p放入空閑隊(duì)列
pidleput(_p_)
unlock(&sched.lock)
// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
//m即將睡眠,狀態(tài)不再是spinning
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
// 休眠之前再看一下是否有工作要做
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
......
//休眠
stopm()
goto top
}
最后萬般無奈進(jìn)入stopm
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
lock(&sched.lock)
mput(_g_.m) //把m結(jié)構(gòu)體對象放入sched.midle空閑隊(duì)列
unlock(&sched.lock)
notesleep(&_g_.m.park) //進(jìn)入睡眠狀態(tài)
//被其它工作線程喚醒
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
stopm的主要操作就是將m放到 midle隊(duì)列里,然后notesleep 進(jìn)入睡眠狀態(tài)。note的是go的一次性睡眠和喚醒機(jī)制,具體參考note細(xì)節(jié)。
繼續(xù)當(dāng)notesleep返回后,繼續(xù)綁定p 然后開始新的循環(huán)調(diào)度。
至于notesleep,它是通過futex完成的sleep。
從前面的代碼我們已經(jīng)看到,stopm調(diào)用notesleep時(shí)給它傳遞的參數(shù)是m結(jié)構(gòu)體的park成員,而m又早已通過mput放入了全局的milde空閑隊(duì)列,這樣其它運(yùn)行著的線程一旦發(fā)現(xiàn)有更多的goroutine需要運(yùn)行時(shí)就可以通過全局的m空閑隊(duì)列找到處于睡眠狀態(tài)的m,然后調(diào)用notewakeup(&m.park)將其喚醒,至于怎么喚醒,我們在其它章節(jié)繼續(xù)討論.
補(bǔ)充
隨機(jī)偷取算法
type randomEnum struct {
i uint32
count uint32
pos uint32
inc uint32
}
func (ord *randomOrder) reset(count uint32) {
ord.count = count
ord.coprimes = ord.coprimes[:0]
for i := uint32(1); i <= count; i++ {
if gcd(i, count) == 1 {
ord.coprimes = append(ord.coprimes, i) // 找比count小的質(zhì)數(shù)作為incr
}
}
}
func (ord *randomOrder) start(i uint32) randomEnum {
return randomEnum{
count: ord.count,
pos: i % ord.count,
inc: ord.coprimes[i%uint32(len(ord.coprimes))], // 隨機(jī)選一個(gè)質(zhì)數(shù)
}
}
func (enum *randomEnum) done() bool {
return enum.i == enum.count
}
func (enum *randomEnum) next() {
enum.i++
enum.pos = (enum.pos + enum.inc) % enum.count // 隨機(jī)下一個(gè)
}
下面舉例說明一下上述算法過程,現(xiàn)假設(shè)nprocs為8,也就是一共有8個(gè)p。
如果第一次隨機(jī)選擇的offset = 6,coprime = 3(3與8互質(zhì),滿足算法要求)的話,則從allp切片中偷取的下標(biāo)順序?yàn)?, 1, 4, 7, 2, 5, 0, 3,計(jì)算過程:
6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3
如果第二次隨機(jī)選擇的offset = 4,coprime = 5的話,則從allp切片中偷取的下標(biāo)順序?yàn)?, 6, 3, 0, 5, 2, 7, 4,計(jì)算過程:
1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4
可以看到只要隨機(jī)數(shù)不一樣,偷取p的順序也不一樣,但可以保證經(jīng)過8次循環(huán),每個(gè)p都會被訪問到??梢杂脭?shù)論知識證明,不管nprocs是多少,這個(gè)算法都可以保證經(jīng)過nprocs次循環(huán),每個(gè)p都可以得到訪問。
公平公正
note細(xì)節(jié)
note是go runtime實(shí)現(xiàn)的一次性睡眠和喚醒機(jī)制,一個(gè)線程可以通過調(diào)用notesleep(*note)進(jìn)入睡眠狀態(tài),而另外一個(gè)線程則可以通過notewakeup(*note)把其喚醒。note的底層實(shí)現(xiàn)機(jī)制跟操作系統(tǒng)相關(guān),不同系統(tǒng)使用不同的機(jī)制,比如linux下使用的futex系統(tǒng)調(diào)用,而mac下則是使用的pthread_cond_t條件變量,note對這些底層機(jī)制做了一個(gè)抽象和封裝,這種封裝給擴(kuò)展性帶來了很大的好處,比如當(dāng)睡眠和喚醒功能需要支持新平臺時(shí),只需要在note層增加對特定平臺的支持即可,不需要修改上層的任何代碼。