golang 調(diào)度器

今天來講一下調(diào)度器,我本來寫了兩個(gè)版本,后面發(fā)現(xiàn)都好像不太好,其實(shí)核心差不太多,就是層次不夠清晰,然后在度娘上又啃了幾篇相關(guān)的文章,又進(jìn)行了綜合一下,文章末尾有引用的文章鏈接。不得不說,大佬們畫圖還是非常厲害的。其實(shí)突然在這個(gè)期間發(fā)現(xiàn)一些問題:就是markdown模式下的話容易讓人看不清重點(diǎn),最近在找替代簡書的地方,有推薦的可以留言推薦

(一)調(diào)度器的核心點(diǎn):

  • 1.復(fù)用線程
    避免頻繁的創(chuàng)建銷毀線程,我們知道線程的啟停銷毀是很耗費(fèi)性能的一件事情,我們就要reuse thread(線程復(fù)用) ,那么具體該怎么來處理。golang :”create threads when needed ;keep them around for reuse“,當(dāng)我們需要?jiǎng)?chuàng)建的時(shí)候就創(chuàng)建,然后保留它來復(fù)用,看過M結(jié)構(gòu)的人就知道M有個(gè)allM的字段就是保存所有的M的鏈表。
  • 2.利用并行
    設(shè)置GOMAXPROCS 來進(jìn)行設(shè)置go program的核心數(shù),程序的并行
  • 3.stealing working
    利用小偷算法,當(dāng)本地的隊(duì)列沒有g(shù)了,去別的地方偷一半的g進(jìn)行運(yùn)行,保證任務(wù)的公平性
  • 4.handoff
    利用移交算法,當(dāng)本線程因?yàn)橄到y(tǒng)調(diào)用進(jìn)行阻塞的時(shí)候,線程釋放綁定的P,把P給其他的M執(zhí)行

值得一說的是:Go1.1之前只有G-M模型,沒有P,Dmitry Vyukov在Scalable Go Scheduler Design Doc提出該模型在并發(fā)伸縮性方面的問題,并通過加入P(Processors)來改進(jìn)該問題。

(二)重要結(jié)構(gòu)體

G:goroutine

每次go調(diào)用的時(shí)候,都會(huì)創(chuàng)建一個(gè)G對(duì)象,它包括棧、指令指針以及對(duì)于調(diào)用goroutines很重要的其它信息,比如阻塞它的任何channel,其主要數(shù)據(jù)結(jié)構(gòu)

// Go1.11版本默認(rèn)stack大小為2KB

_StackMin = 2048
 
// 創(chuàng)建一個(gè)g對(duì)象,然后放到g隊(duì)列
// 等待被執(zhí)行
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
    _g_ := getg()

    _g_.m.locks++
    siz := narg
    siz = (siz + 7) &^ 7

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)    
    if newg == nil {        
       // 初始化g stack大小
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg)
    }    
    // 以下省略}
M:mechine

代表一個(gè)線程,每次創(chuàng)建一個(gè)M的時(shí)候,都會(huì)有一個(gè)底層線程創(chuàng)建;所有的G任務(wù),最終還是在M上執(zhí)行,其主要數(shù)據(jù)結(jié)構(gòu)

type m struct {    
    /*
        1.  所有調(diào)用棧的Goroutine,這是一個(gè)比較特殊的Goroutine。
        2.  普通的Goroutine棧是在Heap分配的可增長的stack,而g0的stack是M對(duì)應(yīng)的線程棧。
        3.  所有調(diào)度相關(guān)代碼,會(huì)先切換到該Goroutine的棧再執(zhí)行。
    */
    g0       *g
    curg     *g         // M當(dāng)前綁定的結(jié)構(gòu)體G

    // SP、PC寄存器用于現(xiàn)場(chǎng)保護(hù)和現(xiàn)場(chǎng)恢復(fù)
    vdsoSP uintptr
    vdsoPC uintptr

    // 省略…}
P:Processor

代表一個(gè)處理器,每一個(gè)運(yùn)行的M都必須綁定一個(gè)P,就像線程必須在么一個(gè)CPU核上執(zhí)行一樣,由P來調(diào)度G在M上的運(yùn)行,P的個(gè)數(shù)就是GOMAXPROCS(最大256),啟動(dòng)時(shí)固定的,一般不修改;M的個(gè)數(shù)和P的個(gè)數(shù)不一定一樣多(會(huì)有休眠的M或者不需要太多的M)(最大10000);每一個(gè)P保存著本地G任務(wù)隊(duì)列,也有一個(gè)全局G任務(wù)隊(duì)列。P的數(shù)據(jù)結(jié)構(gòu)

// 自定義設(shè)置GOMAXPROCS數(shù)量
func GOMAXPROCS(n int) int {    
    /*
        1.  GOMAXPROCS設(shè)置可執(zhí)行的CPU的最大數(shù)量,同時(shí)返回之前的設(shè)置。
        2.  如果n < 1,則不更改當(dāng)前的值。
    */
    ret := int(gomaxprocs)

    stopTheWorld("GOMAXPROCS")    
    // startTheWorld啟動(dòng)時(shí),使用newprocs。
    newprocs = int32(n)
    startTheWorld()    
    return ret
}

// 默認(rèn)P被綁定到所有CPU核上
// P == cpu.cores

func getproccount() int32 {    
    const maxCPUs = 64 * 1024
    var buf [maxCPUs / 8]byte


    // 獲取CPU Core
    r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])

    n := int32(0)    
    for _, v := range buf[:r] {        
       for v != 0 {
            n += int32(v & 1)
            v >>= 1
        }
    }    
    if n == 0 {
       n = 1
    }    
    return n
}
// 一個(gè)進(jìn)程默認(rèn)被綁定在所有CPU核上,返回所有CPU core。
// 獲取進(jìn)程的CPU親和性掩碼系統(tǒng)調(diào)用
// rax 204                          ; 系統(tǒng)調(diào)用碼
// system_call sys_sched_getaffinity; 系統(tǒng)調(diào)用名稱
// rid  pid                         ; 進(jìn)程號(hào)
// rsi unsigned int len             
// rdx unsigned long *user_mask_ptr
sys_linux_amd64.s:
TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0
    MOVQ    pid+0(FP), DI
    MOVQ    len+8(FP), SI
    MOVQ    buf+16(FP), DX
    MOVL    $SYS_sched_getaffinity, AX
    SYSCALL
    MOVL    AX, ret+24(FP)
    RET

(三)調(diào)度過程

image
  • 我們通過 go func()來創(chuàng)建一個(gè)goroutine;g 的結(jié)構(gòu)是可復(fù)用的,對(duì)于可復(fù)用的g也是有l(wèi)ocal隊(duì)列和global隊(duì)列的,用:p.freeg 這個(gè)參數(shù),全局隊(duì)列就是sched.pfree,獲取參數(shù)都是差不多的,優(yōu)先從p.gfree中獲取,這一步是無鎖的,否者就從sched.pfree中獲取一部分過來這是有鎖的一個(gè)操作

  • 有兩個(gè)存儲(chǔ)G的隊(duì)列,一個(gè)是局部調(diào)度器P的本地隊(duì)列、一個(gè)是全局G隊(duì)列。新創(chuàng)建的G會(huì)優(yōu)先嘗試放到p的runnext中,作為下一個(gè)執(zhí)行G,如果不行就得放到我們的本地隊(duì)列中,如果P的本地隊(duì)列已經(jīng)滿了就會(huì)保存在全局的隊(duì)列中;

  • G只能運(yùn)行在M中,一個(gè)M必須持有一個(gè)P,M與P是1:1的關(guān)系。M會(huì)從P的本地隊(duì)列彈出一個(gè)可執(zhí)行狀態(tài)的G來
    執(zhí)行,如果P的本地隊(duì)列為空,就會(huì)想其他的MP組合偷取一個(gè)可執(zhí)行的G來執(zhí)行;

  • 一個(gè)M調(diào)度G執(zhí)行的過程是一個(gè)循環(huán)機(jī)制;

  • 當(dāng)M執(zhí)行某一個(gè)G時(shí)候如果發(fā)生了syscall或則其余阻塞操作,M會(huì)阻塞,如果當(dāng)前有一些G在執(zhí)行,runtime會(huì)把
    這個(gè)線程M從P中摘除(detach),然后再創(chuàng)建一個(gè)新的操作系統(tǒng)的線程(如果有空閑的線程可用就復(fù)用空閑線程)來
    服務(wù)于這個(gè)P;

  • 當(dāng)M系統(tǒng)調(diào)用結(jié)束時(shí)候,這個(gè)G會(huì)嘗試獲取一個(gè)空閑的P執(zhí)行,并放入到這個(gè)P的本地隊(duì)列。如果獲取不到P,
    那么這個(gè)線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個(gè)G會(huì)被放入全局隊(duì)列中

3.1G的幾種暫停方式
    1. gosched: 將當(dāng)前的G暫停,保存堆棧狀態(tài),以_GRunnable狀態(tài)放入Global隊(duì)列中,讓當(dāng)前M繼續(xù)執(zhí)行其它任務(wù)。無需對(duì)G進(jìn)行喚醒操作,因?yàn)榭倳?huì)有M從Global隊(duì)列取得并執(zhí)行該G。搶占調(diào)度即使用該方式
  • 2.gopark: 與goched的最大區(qū)別在于gopark沒有將G放回執(zhí)行隊(duì)列,而是位于某個(gè)等待隊(duì)列中(如channel的waitq,此時(shí)G狀態(tài)為_Gwaitting),因此G必須被手動(dòng)喚醒(通過goready),否則會(huì)丟失任務(wù)。應(yīng)用層阻塞通常使用這種方式。
  • 3.notesleep: 既不讓出M,也不讓G和P重新調(diào)度,直接讓線程休眠直到被喚醒(notewakeup),該方式更快,通常用于gcMark,stopm這類自旋場(chǎng)景
  • 4.notesleepg: 阻塞G和M,放飛P,P可以和其它M綁定繼續(xù)執(zhí)行,比如可能阻塞的系統(tǒng)調(diào)用會(huì)主動(dòng)調(diào)用entersyscallblock,則會(huì)觸發(fā) notesleepg
  • 5.goexit: 立即終止G任務(wù),不管其處于調(diào)用堆棧的哪個(gè)層次,在終止前,確保所有defer正確執(zhí)行。

(四)調(diào)度源碼


// go1.9.1  src/runtime/proc.go
// 省略了GC檢查等其它細(xì)節(jié),只保留了主要流程
// g:       G結(jié)構(gòu)體定義
// sched:   Global隊(duì)列
// 獲取一個(gè)待執(zhí)行的G
func findrunnable() (gp *g, inheritTime bool) {
    // 獲取當(dāng)前的G對(duì)象
    _g_ := getg()

top:
    // 獲取當(dāng)前P對(duì)象
    _p_ := _g_.m.p.ptr()

    // 1. 嘗試從P的Local隊(duì)列中取得G 優(yōu)先_p_.runnext 然后再從Local隊(duì)列中取
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // 2. 嘗試從Global隊(duì)列中取得G
    if sched.runqsize != 0 {
        lock(&sched.lock)
        // globrunqget從Global隊(duì)列中獲取G 并轉(zhuǎn)移一批G到_p_的Local隊(duì)列
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // 3. 檢查netpoll任務(wù):檢測(cè)是否存在M阻塞
    if netpollinited() && sched.lastpoll != 0 {
        if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll返回的是G鏈表,將其它G放回Global隊(duì)列
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // 4. 嘗試從其它P竊取任務(wù)
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ {
        // 隨機(jī)P的遍歷順序
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // runqsteal執(zhí)行實(shí)際的steal工作,從目標(biāo)P的Local隊(duì)列轉(zhuǎn)移一般的G過來
            // stealRunNextG指是否steal目標(biāo)P的p.runnext G
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
    ...
}

4.1用戶態(tài)的阻塞

當(dāng)Goroutine因?yàn)镃hannel操作而阻塞(通過gopark)時(shí),對(duì)應(yīng)的G會(huì)被放置到某個(gè)wait隊(duì)列(如channel的waitq),該G的狀態(tài)由_Gruning變?yōu)開Gwaitting,而M會(huì)跳過該G嘗試獲取并執(zhí)行下一個(gè)G。

當(dāng)阻塞的G被G2喚醒(通過goready)時(shí)(比如channel可讀/寫),G會(huì)嘗試加入G2所在P的runnext,然后再是P Local隊(duì)列和Global隊(duì)列。簡單解釋一下:當(dāng)G是chan<-的接收消息,被阻塞了,如果G2是Chan的寫消息,當(dāng)G阻塞,G2寫入了一個(gè)數(shù)據(jù),那么G就被G2喚醒了,G就被放到了G2的P的runnext,如果放成功了,就是跳過了排隊(duì),然后執(zhí)行,如果失敗了就丟入local隊(duì)列

4.2系統(tǒng)調(diào)用阻塞:syscall

當(dāng)G被阻塞在某個(gè)系統(tǒng)調(diào)用上時(shí),此時(shí)G會(huì)阻塞在_Gsyscall狀態(tài),M也處于block on syscall狀態(tài),此時(shí)仍然可被搶占調(diào)度: 執(zhí)行該G的M會(huì)與P解綁,而P則嘗試與其它idle的M綁定,繼續(xù)執(zhí)行其它G。如果沒有其它idle的M,但隊(duì)列中仍然有G需要執(zhí)行,則創(chuàng)建一個(gè)新的M。

當(dāng)系統(tǒng)調(diào)用完成后,G會(huì)重新嘗試獲取一個(gè)idle的P,并恢復(fù)執(zhí)行,如果沒有idle的P,G將加入到Global隊(duì)列。

系統(tǒng)調(diào)用能被調(diào)度的關(guān)鍵有兩點(diǎn):

runtime/syscall包中,將系統(tǒng)調(diào)用分為SysCall和RawSysCall,前者和后者的區(qū)別是前者會(huì)在系統(tǒng)調(diào)用前后分別調(diào)用entersyscall和exitsyscall(位于src/runtime/proc.go),做一些現(xiàn)場(chǎng)保存和恢復(fù)操作,這樣才能使P安全地與M解綁,并在其它M上繼續(xù)執(zhí)行其它G。某些系統(tǒng)調(diào)用本身可以確定會(huì)長時(shí)間阻塞(比如鎖),會(huì)調(diào)用entersyscallblock在發(fā)起系統(tǒng)調(diào)用前直接讓P和M解綁(handoffp)。

4.3GMP的幾個(gè)狀態(tài)
  • P的幾個(gè)狀態(tài):

(五)sysmon

sysmon是一個(gè)由runtime啟動(dòng)的M,也叫監(jiān)控線程,它無需P也可以運(yùn)行,它每20us~10ms喚醒一次,主要執(zhí)行:

釋放閑置超過5分鐘的span物理內(nèi)存;
如果超過2分鐘沒有垃圾回收,強(qiáng)制執(zhí)行;
將長時(shí)間未處理的netpoll結(jié)果添加到任務(wù)隊(duì)列;
向長時(shí)間運(yùn)行的G任務(wù)發(fā)出搶占調(diào)度;
收回因syscall長時(shí)間阻塞的P;

入口在src/runtime/proc.go:sysmon函數(shù),它通過retake實(shí)現(xiàn)對(duì)syscall和長時(shí)間運(yùn)行的G進(jìn)行調(diào)度:

func retake(now int64) uint32 {
    n := 0
    for i := int32(0); i < gomaxprocs; i++ {
        _p_ := allp[i]
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // 如果當(dāng)前P Local隊(duì)列沒有其它G,當(dāng)前有其它P處于Idle狀態(tài),并且syscall執(zhí)行事件不超過10ms,則不用解綁當(dāng)前P(handoffp)
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // handoffp
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_)
            }
            incidlelocked(1)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            // 如果當(dāng)前G執(zhí)行時(shí)間超過10ms,則搶占(preemptone)
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            // 執(zhí)行搶占
            preemptone(_p_)
        }
    }
    return uint32(n)
}

搶占式調(diào)度

當(dāng)某個(gè)goroutine執(zhí)行超過10ms,sysmon會(huì)向其發(fā)起搶占調(diào)度請(qǐng)求,由于Go調(diào)度不像OS調(diào)度那樣有時(shí)間片的概念,因此實(shí)際搶占機(jī)制要弱很多: Go中的搶占實(shí)際上是為G設(shè)置搶占標(biāo)記(g.stackguard0),當(dāng)G調(diào)用某函數(shù)時(shí)(更確切說,在通過newstack分配函數(shù)棧時(shí)),被編譯器安插的指令會(huì)檢查這個(gè)標(biāo)記,并且將當(dāng)前G以runtime.Goched的方式暫停,并加入到全局隊(duì)列。源代碼如下:

// src/runtime/stack.go
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
func newstack(ctxt unsafe.Pointer) {
    ...
    // gp為當(dāng)前G
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
    if preempt {
        ...

        // Act like goroutine called runtime.Gosched.
        // G狀態(tài)由_Gwaiting變?yōu)?_Grunning 這是為了能以Gosched的方式暫停Go
        casgstatus(gp, _Gwaiting, _Grunning)
        gopreempt_m(gp) // never return
    }
}

// 以goched的方式將G重新放入
func goschedImpl(gp *g) {
    status := readgstatus(gp)
    // 由Running變?yōu)镽unnable
    casgstatus(gp, _Grunning, _Grunnable)
    // 與M解除綁定
    dropg()
    lock(&sched.lock)
    // 將G放入Global隊(duì)列
    globrunqput(gp)
    unlock(&sched.lock)
    // 重新調(diào)度
    schedule()
}


func gopreempt_m(gp *g) {
    if trace.enabled {
        traceGoPreempt()
    }
    goschedImpl(gp)
}

netpoll

前面的findrunnable,G的獲取除了p.runnext,p.runq和sched.runq外,還有一中G從netpoll中獲取,netpoll是Go針對(duì)網(wǎng)絡(luò)IO的一種優(yōu)化,本質(zhì)上為了避免網(wǎng)絡(luò)IO陷入系統(tǒng)調(diào)用之中,這樣使得即便G發(fā)起網(wǎng)絡(luò)I/O操作也不會(huì)導(dǎo)致M被阻塞(僅阻塞G),從而不會(huì)導(dǎo)致大量M被創(chuàng)建出來。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 該文章主要詳細(xì)具體的介紹Goroutine調(diào)度器過程及原理,可以對(duì)Go調(diào)度器的詳細(xì)調(diào)度過程有一個(gè)清晰的理解,花 ...
    劉丹冰Aceld閱讀 11,933評(píng)論 5 44
  • 前言 隨著服務(wù)器硬件迭代升級(jí),配置也越來越高。為充分利用服務(wù)器資源,并發(fā)編程也變的越來越重要。在開始之前,需要了解...
    云爬蟲技術(shù)研究筆記閱讀 3,895評(píng)論 0 7
  • Go語言雖然使用一個(gè)Go關(guān)鍵字即可實(shí)現(xiàn)并發(fā)編程,但Goroutine被調(diào)度到后端之后,具體的實(shí)現(xiàn)比較復(fù)雜。先看看調(diào)...
    我愛張智容閱讀 898評(píng)論 0 0
  • 概要 本文從幾個(gè)角度入手,描述和學(xué)習(xí)調(diào)度器原理 講解調(diào)度器的基本概念 go語言的作者實(shí)現(xiàn)的C的協(xié)程庫 libtas...
    zengfan閱讀 6,721評(píng)論 0 21
  • 推薦指數(shù): 6.0 書籍主旨關(guān)鍵詞:特權(quán)、焦點(diǎn)、注意力、語言聯(lián)想、情景聯(lián)想 觀點(diǎn): 1.統(tǒng)計(jì)學(xué)現(xiàn)在叫數(shù)據(jù)分析,社會(huì)...
    Jenaral閱讀 6,037評(píng)論 0 5

友情鏈接更多精彩內(nèi)容