深入分析go調(diào)度(二)

以下文章均為拜讀公眾號 源碼游記 的筆記 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect

1. Goroutine調(diào)度器

1. goroutine簡介

首先為什么Go語言為什么要引入?yún)f(xié)程Goroutine呢。

這主要是因?yàn)椴僮飨到y(tǒng)的線程太重了,具體表現(xiàn)在

  1. 創(chuàng)建和切換太重:所有的線程切換都需要系統(tǒng)調(diào)用進(jìn)入內(nèi)核,而進(jìn)入內(nèi)核的性能代價比較大
  2. 內(nèi)存使用太重:一方面,為了盡量避免極端情況下操作系統(tǒng)線程棧的溢出,內(nèi)核在創(chuàng)建操作系統(tǒng)線程時默認(rèn)會為其分配一個較大的棧內(nèi)存(虛擬地址空間,內(nèi)核并不會一開始就分配這么多的物理內(nèi)存),然而在絕大多數(shù)情況下,系統(tǒng)線程遠(yuǎn)遠(yuǎn)用不了這么多內(nèi)存,這導(dǎo)致了浪費(fèi);另一方面,棧內(nèi)存空間一旦創(chuàng)建和初始化完成之后其大小就不能再有變化,這決定了在某些特殊場景下系統(tǒng)線程棧還是有溢出的風(fēng)險(xiǎn)。

可以使用 ulimit -a命令查看本機(jī)的棧大小。如下圖,棧的代銷為8192kB=8MB。

image-20200512220534183

所有g(shù)o引進(jìn)了Goroutine,相比較舊特別輕量

  1. Goroutine是用戶態(tài)線程,創(chuàng)建和切換都可以再用戶態(tài)完成,無需進(jìn)入內(nèi)核,開銷比較小。
  2. Goroutine默認(rèn)大小是2k,相比操作系統(tǒng)線程的8M,簡直是太小了。不過雖然協(xié)程棧比較小,但是支持自動擴(kuò)容和收縮,所以也不用擔(dān)心內(nèi)存不夠用或者浪費(fèi)。

Goroutine的調(diào)度簡介

所謂對協(xié)程的調(diào)度,其實(shí)就是Go 按照一定的算法邏輯挑選出合適的 Goroutine并放到CPU上運(yùn)行的過程。

在go中 有專門負(fù)責(zé)調(diào)度的代碼,我們成為goroutine調(diào)度器。

大概邏輯如下:


for i:=0;i<N;i++{ // 創(chuàng)建N個操作系統(tǒng)執(zhí)行schedule函數(shù)
  create_os_thread(schedule)
}

func schedule(){
  for{
    g := find_a_runnable_g_from_M_gs()
    run_g(g)
    save_status_of_g(g)
  }
}

總結(jié):也就算程序開始創(chuàng)建N個線程去執(zhí)行schedule函數(shù)。

每個schedule函數(shù) 回去尋找一個可以執(zhí)行的g,然后執(zhí)行g(shù),在保存g的狀態(tài),循環(huán)往復(fù)。

調(diào)度器相關(guān)的數(shù)據(jù)結(jié)構(gòu)源碼概述

無他,所謂調(diào)度,本質(zhì)上都是對寄存器的一些操作。

只不過線程調(diào)度是在內(nèi)核態(tài),由操作系統(tǒng)完成。

而協(xié)程調(diào)度是在用戶態(tài),由go自己完成。

如上,go自己要完成對協(xié)程的調(diào)度。那么go就需要引入一個數(shù)據(jù)結(jié)構(gòu)來保存協(xié)程中的一些值(寄存器等)。

這就是g。

1. 當(dāng)協(xié)程被調(diào)離的時候,它的相關(guān)信息都保存在g中
2. 當(dāng)協(xié)程被恢復(fù)的時候,調(diào)度器又可以通過g將其狀態(tài)還原。

現(xiàn)在有了g,那么對g進(jìn)行調(diào)度就需要調(diào)度器。不過調(diào)度器本身也是需要保存自身狀態(tài)的,所以這就有了schedt結(jié)構(gòu)體。因?yàn)橐粋€go程序只有一個調(diào)度器,所以schedt全局唯一。

有了調(diào)度器,有了協(xié)程,我們還需要一個隊(duì)列來保存可以運(yùn)行的g,這就是局部運(yùn)行隊(duì)列??紤]到并發(fā),go為每個p都配了一個局部運(yùn)行隊(duì)列,他保存著一系列等待執(zhí)行的g.那么什么是p呢,它是processor的簡寫,是個掮客,協(xié)調(diào)著工作線程和g之間的關(guān)系。

除了這些,還有一個結(jié)構(gòu),它需要跟工作線程打交道,同時接收g,這就是m。每個工作線程都有唯一的一個m結(jié)構(gòu)體與之對應(yīng),m結(jié)構(gòu)體記錄著線程的的棧信息等。

除此,還有一個全局運(yùn)行隊(duì)列,它是一個保存在schedt中的運(yùn)行隊(duì)列。具體后面再說。

下面是上面幾種結(jié)構(gòu)的對應(yīng)關(guān)系:

<img src="http://picgo.vipkk.work/20200513211350.png" alt="image-20200513211350567" style="zoom:40%;" />

一個細(xì)節(jié)

前面我們提到m會跟工作線程綁定,但是我們有多個m,那么是如何做到區(qū)分的呢。記得上面的偽代碼嗎,我們調(diào)度的程序是一致的,那怎么區(qū)分呢。這就用到了前面提到的本地存儲,threadlocal。

具體結(jié)構(gòu)體代碼

TODO 因?yàn)樘嗖辉谶@里展示

2. 調(diào)度器初始化

本章將以一個hello world程序?yàn)槔ㄟ^跟蹤其從啟動到退出的這一完整的運(yùn)行流程來分析Go語言調(diào)度器的初始化、goroutine的創(chuàng)建與退出、工作線程的調(diào)度循環(huán)以及goroutine的切換等內(nèi)容。

package main

import "fmt"

func main(){
  fmt.Println("Hello World!")
}

這節(jié)我們主要來分析調(diào)度器的初始化。

任何一個編譯型語言(不管是C、C++、go)所編寫的程序在操作系統(tǒng)上運(yùn)行都有以下幾個階段

  1. 從磁盤上把可執(zhí)行程序讀入內(nèi)存
  2. 創(chuàng)建進(jìn)程和主線程
  3. 為主線程分配??臻g
  4. 把用戶在命令行輸入的參數(shù)拷貝到主線程的棧上
  5. 把主線程放在操作系統(tǒng)的運(yùn)行隊(duì)列等待被調(diào)度執(zhí)行起來。

在主線程第一次被調(diào)度起來之前,棧如下圖所示:

<img src="http://picgo.vipkk.work/20200513221220.png" alt="image-20200513221220322" style="zoom:50%;" />

接下來我們從代碼出發(fā)講解下 調(diào)度器中的各個數(shù)據(jù)結(jié)構(gòu)是怎么關(guān)聯(lián)起來的

程序入口

首先我們需要找到夢(代碼)開始的地方。

首先我們編譯程序,然后通過gdb調(diào)試go build -o hello;gdb hello,然后輸入info files查看調(diào)試信息

image-20200515215956835

可以發(fā)現(xiàn)入口在0x454b70,然后 在這里打斷點(diǎn)b *0x454b70,可以發(fā)現(xiàn)程序入口的地方。對應(yīng)代碼如下,暫時不接受。

以下代碼太多,建議結(jié)合源碼食用


// 夢開始的地方 runtime/rt0_linux_amd64.s:8
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
    JMP _rt0_amd64(SB)

TEXT _rt0_amd64(SB),NOSPLIT,$-8
    MOVQ    0(SP), DI   // argc
    LEAQ    8(SP), SI   // argv
    JMP runtime·rt0_go(SB)
    
// runtime/asm_amd64.s  
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    // copy arguments forward on an even stack
    MOVQ    DI, AX      // argc
    MOVQ    SI, BX      // argv
    SUBQ    $(4*8+7), SP        // 2args 2auto 16字節(jié)對齊
    ANDQ    $~15, SP
    MOVQ    AX, 16(SP)
    MOVQ    BX, 24(SP)

g0初始化

rt0_go中將要完成了我們程序啟動所有的的初始化工作。

上面代碼基本上就是將SP對齊,接著往下

// create istack out of the given (operating system) stack.
    // _cgo_init may update stackguard.
    // g0 -> DI
    MOVQ    $runtime·g0(SB), DI

    LEAQ    (-64*1024+104)(SP), BX  // BX = SP - 64 * 1024 + 104
    MOVQ    BX, g_stackguard0(DI)   // g0.stackguard0 =  SP - 64 * 1024 + 104
    MOVQ    BX, g_stackguard1(DI)   // g0.stackguard1 =  SP - 64 * 1024 + 104
    MOVQ    BX, (g_stack+stack_lo)(DI)  // g0.stack.lo = SP - 64 * 1024 + 104
    MOVQ    SP, (g_stack+stack_hi)(DI)  // g0.stack.l1 = SP

這里完成了對g0的初始化, 可見g0的棧大小為 64K。

此時g0與程序棧之間的關(guān)系如圖

<img src="http://picgo.vipkk.work/20200515221806.png" alt="image-20200515221806358" style="zoom:40%;" />

注意,g0是全局變量,所以這時候g0的內(nèi)存已經(jīng)申請好了,在全局變量區(qū)(data+bss).

上面圖的關(guān)系 我們可以通過gdb 打印下地址查看下

image-20200515222801728

從圖中可以看到 hi 指向了sp,其他亦然。

m0初始化&綁定OS線程

g0的棧好之后,我們來看下m0的初始化。

跳過CPU型號檢查和cgo初始化的代碼,往下看

    // 初始化m的tls。DI= &m0.tls, 取 m0的tls成員地址到DI寄存器
    LEAQ    runtime·m0+m_tls(SB), DI
    // 調(diào)用 settls設(shè)置 線程本地存儲,settls 函數(shù)的參數(shù)在DI寄存器中。 之后,可以通過fs斷寄存器找到 m.tls。
    CALL    runtime·settls(SB)

    // store through it, to make sure it works
    get_tls(BX) // 獲取fs段基址并放入BX寄存器, 其實(shí)就是 m0.tls[1] 的地址,get_tls的代碼由編譯器生成
    MOVQ    $0x123, g(BX) // set m0.tls[0] = 0x123, 也就算 fs地址-8的內(nèi)存位置。 g 代碼-8
    MOVQ    runtime·m0+m_tls(SB), AX // AX = m0.tls[0]
    CMPQ    AX, $0x123 // 比較
    JEQ 2(PC)
    CALL    runtime·abort(SB) // 如果線程本地存儲不能正常工作,退出程序

這段代碼調(diào)用 settls 來實(shí)例化 m0.tls, tls就是對應(yīng)之前提到thread local storage,目的是為了把m0這個數(shù)據(jù)結(jié)構(gòu)和線程關(guān)聯(lián)在一起,從而我們可以把m0當(dāng)做線程的抽象。

其中settls函數(shù)就是把fs寄存器的地址設(shè)置為了tls[1]的地址。

繼續(xù):

ok:
    // set the per-goroutine and per-mach "registers"
    get_tls(BX)
    LEAQ    runtime·g0(SB), CX // CX = g0
    MOVQ    CX, g(BX) // m0.tls[0]= g0
    LEAQ    runtime·m0(SB), AX // AX = &m0

  // 以下兩行代碼是把 m0和g0串了起來
    // save m->g0 = g0
    MOVQ    CX, m_g0(AX)
    // save m0 to g0->m
    MOVQ    AX, g_m(CX)

以上,在m0于主線程綁定之后,我們又通過指針把m0和g0關(guān)聯(lián)在了一起。此時內(nèi)存布局如下

image-20200516160536516

可以看到,內(nèi)存指針一致。

另外m0.tls[0] 存儲的也是g0地址,tls[1]的地址其實(shí)是fs基地址(這個可以參考之前的tls章節(jié))。

此時內(nèi)存如下圖:

<img src="http://picgo.vipkk.work/20200516172239.png" alt="image-20200516172239213" style="zoom:50%;" />

繼續(xù)

    MOVL    16(SP), AX      // copy argc
    MOVL    AX, 0(SP)
    MOVQ    24(SP), AX      // copy argv
    MOVQ    AX, 8(SP)
    CALL    runtime·args(SB)
    CALL    runtime·osinit(SB)
    CALL    runtime·schedinit(SB) // 調(diào)度器初始化

m0初始化完成之后,開始對調(diào)度器初始化

調(diào)度器初始化

在這里就是go 語言了

// The bootstrap sequence is:
//
//  call osinit
//  call schedinit
//  make & queue new G
//  call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
    // raceinit must be the first call to race detector.
    // In particular, it must be done before mallocinit below calls racemapshadow.
    // getg 是個什么鬼,getg 由編譯器實(shí)現(xiàn),大概原理就是 通過fs 找到 tls[1], 再-8找到g0,也就算當(dāng)前運(yùn)行的g
    _g_ := getg() // _g_=&g0
    //  ......

  //設(shè)置最多啟動10000個操作系統(tǒng)線程,也是最多10000個M
  sched.maxmcount = 10000

  // ......

  mcommoninit(_g_.m) //初始化m0,因?yàn)閺那懊娴拇a我們知道g0->m = &m0

  // ......

  sched.lastpoll = uint64(nanotime())
  procs := ncpu  //系統(tǒng)中有多少核,就創(chuàng)建和初始化多少個p結(jié)構(gòu)體對象
  if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
    procs = n //如果環(huán)境變量指定了GOMAXPROCS,則創(chuàng)建指定數(shù)量的p
  }
  if procresize(procs) != nil {//創(chuàng)建和初始化全局變量allp
    throw("unknown runnable goroutine during bootstrap")
  }

  // ......

}

以上我們對schedinit函數(shù)提取了下大綱。這次我們主要關(guān)注以下幾點(diǎn)

  1. getg()

    getg()是匯編實(shí)現(xiàn)的,大概就是從tls取出當(dāng)前運(yùn)行的g,在這里就是g0。

  2. 設(shè)置最多線程個數(shù)為10000個,(對應(yīng)內(nèi)存也就是80000M ~= 80G)

  3. mcommoninit

    1. 對m0
    2. 對p

mcommoninit

  1. 對m0做另外的一些初始化
  2. 創(chuàng)建和實(shí)例化p
func mcommoninit(mp *m) {
    _g_ := getg()

    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {
        callers(1, mp.createstack[:])
    }

    // 因?yàn)閟ched是一個全局變量,多個線程同時操作 sched會有并發(fā)問題,因此要先加鎖,操作結(jié)束后再解鎖。
    lock(&sched.lock)
    if sched.mnext+1 < sched.mnext {
        throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext // 給id賦值
    sched.mnext++       // m0的 id就是0,并且之后創(chuàng)建的m的id是遞增的。
    checkmcount()       // 檢查是否超過數(shù)量限制

    // random 初始化
    mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
    mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
    if mp.fastrand[0]|mp.fastrand[1] == 0 {
        mp.fastrand[1] = 1
    }

    // 創(chuàng)建用于信號處理的goroutine ,gsignal,只是簡單的從堆上分配一個g結(jié)構(gòu)體對象,然后把棧設(shè)置好久返回了
    mpreinit(mp)
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    // 把m 掛入全局鏈表 allm之中
    // 類比 m0.next = allm allm = m0
    mp.alllink = allm

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
        mp.cgoCallers = new(cgoCallers)
    }
}

這里主要對m0做了以下幾個事情

  1. 賦值id
  2. 初始化m的隨機(jī)數(shù)
  3. 信號初始化(后續(xù)分析搶占調(diào)度的時候用得到)
  4. 把m0自己放到allm全局變量里。

此時 allm 指向m0

image-20200516175358260

procresize

// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
    if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    if nprocs > int32(len(allp)) { // 初始化的時候 len(allp) = 0,因?yàn)槟壳斑€沒有P
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)]) // 代碼中途調(diào)整p的數(shù)量?
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    // 循環(huán)創(chuàng)建 nproc個p,并完成基本初始化
    for i := old; i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p) // 調(diào)用內(nèi)存分配器從堆上分配一個 struct p
        }
        pp.init(i) // 設(shè)置p的id,mcache等。
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else { // 初始化的時候執(zhí)行這個分支
        // release the current P and acquire allp[0].
        //
        // We must do this before destroying our current P
        // because p.destroy itself has write barriers, so we
        // need to do that from a valid P.
        if _g_.m.p != 0 {
            if trace.enabled {
                // Pretend that we were descheduled
                // and then scheduled again to keep
                // the trace sane.
                traceGoSched()
                traceProcStop(_g_.m.p.ptr())
            }
            _g_.m.p.ptr().m = 0
        }
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 把 p和m0關(guān)聯(lián)起來
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }

    // release resources from unused P's
    // 調(diào)整P的個數(shù)
    for i := nprocs; i < old; i++ {
        p := allp[i]
        p.destroy()
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs {
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }

    // 把所有空閑的P放入空閑鏈表
    // 它將除了p0的所有非空閑的P,放入P鏈表runnablePs,并返回procresize函數(shù)的調(diào)用者,
    // 并由其來調(diào)度這些P
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p { // allp[0] 跟m0 關(guān)聯(lián)了,所有不放入
            continue
        }
        p.status = _Pidle
        // 如果P 是空閑的,則放入 idle里
        if runqempty(p) { // 通過 runqempty 判斷p 的LRQ中是否包含G,
            pidleput(p) // 初始化除了allp[0]其他P 全部執(zhí)行這個分支,分入空閑鏈表
        } else {
            // ??
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    // 初始化一個隨機(jī)的分配器
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}

這個代碼干了以下幾個事情

  1. 初始化全局變量allp
  2. 填充allp
  3. m0和allp[0]綁定
  4. 除allp[0]之外的所有p放到 等待隊(duì)列中

allp是全局變量

并且會把m0和allp[0]關(guān)聯(lián)起來。

image-20200516181153374
image-20200516181924462

這里沒太搞明白內(nèi)存的對應(yīng)關(guān)系

至此,m0,g0和p就關(guān)聯(lián)在一起了。

如圖:

<img src="http://picgo.vipkk.work/20200516181939.png" alt="image-20200516181939265" style="zoom:50%;" />

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容