以下文章均為拜讀公眾號 源碼游記 的筆記 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect
1. Goroutine調(diào)度器
1. goroutine簡介
首先為什么Go語言為什么要引入?yún)f(xié)程Goroutine呢。
這主要是因?yàn)椴僮飨到y(tǒng)的線程太重了,具體表現(xiàn)在
- 創(chuàng)建和切換太重:所有的線程切換都需要系統(tǒng)調(diào)用進(jìn)入內(nèi)核,而進(jìn)入內(nèi)核的性能代價比較大
- 內(nèi)存使用太重:一方面,為了盡量避免極端情況下操作系統(tǒng)線程棧的溢出,內(nèi)核在創(chuàng)建操作系統(tǒng)線程時默認(rèn)會為其分配一個較大的棧內(nèi)存(虛擬地址空間,內(nèi)核并不會一開始就分配這么多的物理內(nèi)存),然而在絕大多數(shù)情況下,系統(tǒng)線程遠(yuǎn)遠(yuǎn)用不了這么多內(nèi)存,這導(dǎo)致了浪費(fèi);另一方面,棧內(nèi)存空間一旦創(chuàng)建和初始化完成之后其大小就不能再有變化,這決定了在某些特殊場景下系統(tǒng)線程棧還是有溢出的風(fēng)險(xiǎn)。
可以使用
ulimit -a命令查看本機(jī)的棧大小。如下圖,棧的代銷為8192kB=8MB。image-20200512220534183
所有g(shù)o引進(jìn)了Goroutine,相比較舊特別輕量
- Goroutine是用戶態(tài)線程,創(chuàng)建和切換都可以再用戶態(tài)完成,無需進(jìn)入內(nèi)核,開銷比較小。
- Goroutine默認(rèn)大小是2k,相比操作系統(tǒng)線程的8M,簡直是太小了。不過雖然協(xié)程棧比較小,但是支持自動擴(kuò)容和收縮,所以也不用擔(dān)心內(nèi)存不夠用或者浪費(fèi)。
Goroutine的調(diào)度簡介
所謂對協(xié)程的調(diào)度,其實(shí)就是Go 按照一定的算法邏輯挑選出合適的 Goroutine并放到CPU上運(yùn)行的過程。
在go中 有專門負(fù)責(zé)調(diào)度的代碼,我們成為goroutine調(diào)度器。
大概邏輯如下:
for i:=0;i<N;i++{ // 創(chuàng)建N個操作系統(tǒng)執(zhí)行schedule函數(shù)
create_os_thread(schedule)
}
func schedule(){
for{
g := find_a_runnable_g_from_M_gs()
run_g(g)
save_status_of_g(g)
}
}
總結(jié):也就算程序開始創(chuàng)建N個線程去執(zhí)行schedule函數(shù)。
每個schedule函數(shù) 回去尋找一個可以執(zhí)行的g,然后執(zhí)行g(shù),在保存g的狀態(tài),循環(huán)往復(fù)。
調(diào)度器相關(guān)的數(shù)據(jù)結(jié)構(gòu)源碼概述
無他,所謂調(diào)度,本質(zhì)上都是對寄存器的一些操作。
只不過線程調(diào)度是在內(nèi)核態(tài),由操作系統(tǒng)完成。
而協(xié)程調(diào)度是在用戶態(tài),由go自己完成。
如上,go自己要完成對協(xié)程的調(diào)度。那么go就需要引入一個數(shù)據(jù)結(jié)構(gòu)來保存協(xié)程中的一些值(寄存器等)。
這就是g。
1. 當(dāng)協(xié)程被調(diào)離的時候,它的相關(guān)信息都保存在g中
2. 當(dāng)協(xié)程被恢復(fù)的時候,調(diào)度器又可以通過g將其狀態(tài)還原。
現(xiàn)在有了g,那么對g進(jìn)行調(diào)度就需要調(diào)度器。不過調(diào)度器本身也是需要保存自身狀態(tài)的,所以這就有了schedt結(jié)構(gòu)體。因?yàn)橐粋€go程序只有一個調(diào)度器,所以schedt全局唯一。
有了調(diào)度器,有了協(xié)程,我們還需要一個隊(duì)列來保存可以運(yùn)行的g,這就是局部運(yùn)行隊(duì)列??紤]到并發(fā),go為每個p都配了一個局部運(yùn)行隊(duì)列,他保存著一系列等待執(zhí)行的g.那么什么是p呢,它是processor的簡寫,是個掮客,協(xié)調(diào)著工作線程和g之間的關(guān)系。
除了這些,還有一個結(jié)構(gòu),它需要跟工作線程打交道,同時接收g,這就是m。每個工作線程都有唯一的一個m結(jié)構(gòu)體與之對應(yīng),m結(jié)構(gòu)體記錄著線程的的棧信息等。
除此,還有一個全局運(yùn)行隊(duì)列,它是一個保存在schedt中的運(yùn)行隊(duì)列。具體后面再說。
下面是上面幾種結(jié)構(gòu)的對應(yīng)關(guān)系:
<img src="http://picgo.vipkk.work/20200513211350.png" alt="image-20200513211350567" style="zoom:40%;" />
一個細(xì)節(jié)
前面我們提到m會跟工作線程綁定,但是我們有多個m,那么是如何做到區(qū)分的呢。記得上面的偽代碼嗎,我們調(diào)度的程序是一致的,那怎么區(qū)分呢。這就用到了前面提到的本地存儲,threadlocal。
具體結(jié)構(gòu)體代碼
TODO 因?yàn)樘嗖辉谶@里展示
2. 調(diào)度器初始化
本章將以一個hello world程序?yàn)槔ㄟ^跟蹤其從啟動到退出的這一完整的運(yùn)行流程來分析Go語言調(diào)度器的初始化、goroutine的創(chuàng)建與退出、工作線程的調(diào)度循環(huán)以及goroutine的切換等內(nèi)容。
package main import "fmt" func main(){ fmt.Println("Hello World!") }
這節(jié)我們主要來分析調(diào)度器的初始化。
任何一個編譯型語言(不管是C、C++、go)所編寫的程序在操作系統(tǒng)上運(yùn)行都有以下幾個階段
- 從磁盤上把可執(zhí)行程序讀入內(nèi)存
- 創(chuàng)建進(jìn)程和主線程
- 為主線程分配??臻g
- 把用戶在命令行輸入的參數(shù)拷貝到主線程的棧上
- 把主線程放在操作系統(tǒng)的運(yùn)行隊(duì)列等待被調(diào)度執(zhí)行起來。
在主線程第一次被調(diào)度起來之前,棧如下圖所示:
<img src="http://picgo.vipkk.work/20200513221220.png" alt="image-20200513221220322" style="zoom:50%;" />
接下來我們從代碼出發(fā)講解下 調(diào)度器中的各個數(shù)據(jù)結(jié)構(gòu)是怎么關(guān)聯(lián)起來的
程序入口
首先我們需要找到夢(代碼)開始的地方。
首先我們編譯程序,然后通過gdb調(diào)試go build -o hello;gdb hello,然后輸入info files查看調(diào)試信息

可以發(fā)現(xiàn)入口在0x454b70,然后 在這里打斷點(diǎn)b *0x454b70,可以發(fā)現(xiàn)程序入口的地方。對應(yīng)代碼如下,暫時不接受。
以下代碼太多,建議結(jié)合源碼食用
// 夢開始的地方 runtime/rt0_linux_amd64.s:8
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
JMP _rt0_amd64(SB)
TEXT _rt0_amd64(SB),NOSPLIT,$-8
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
// runtime/asm_amd64.s
TEXT runtime·rt0_go(SB),NOSPLIT,$0
// copy arguments forward on an even stack
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto 16字節(jié)對齊
ANDQ $~15, SP
MOVQ AX, 16(SP)
MOVQ BX, 24(SP)
g0初始化
在rt0_go中將要完成了我們程序啟動所有的的初始化工作。
上面代碼基本上就是將SP對齊,接著往下
// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
// g0 -> DI
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX // BX = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard0(DI) // g0.stackguard0 = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard1(DI) // g0.stackguard1 = SP - 64 * 1024 + 104
MOVQ BX, (g_stack+stack_lo)(DI) // g0.stack.lo = SP - 64 * 1024 + 104
MOVQ SP, (g_stack+stack_hi)(DI) // g0.stack.l1 = SP
這里完成了對g0的初始化, 可見g0的棧大小為 64K。
此時g0與程序棧之間的關(guān)系如圖
<img src="http://picgo.vipkk.work/20200515221806.png" alt="image-20200515221806358" style="zoom:40%;" />
注意,g0是全局變量,所以這時候g0的內(nèi)存已經(jīng)申請好了,在全局變量區(qū)(data+bss).
上面圖的關(guān)系 我們可以通過gdb 打印下地址查看下

從圖中可以看到 hi 指向了sp,其他亦然。
m0初始化&綁定OS線程
g0的棧好之后,我們來看下m0的初始化。
跳過CPU型號檢查和cgo初始化的代碼,往下看
// 初始化m的tls。DI= &m0.tls, 取 m0的tls成員地址到DI寄存器
LEAQ runtime·m0+m_tls(SB), DI
// 調(diào)用 settls設(shè)置 線程本地存儲,settls 函數(shù)的參數(shù)在DI寄存器中。 之后,可以通過fs斷寄存器找到 m.tls。
CALL runtime·settls(SB)
// store through it, to make sure it works
get_tls(BX) // 獲取fs段基址并放入BX寄存器, 其實(shí)就是 m0.tls[1] 的地址,get_tls的代碼由編譯器生成
MOVQ $0x123, g(BX) // set m0.tls[0] = 0x123, 也就算 fs地址-8的內(nèi)存位置。 g 代碼-8
MOVQ runtime·m0+m_tls(SB), AX // AX = m0.tls[0]
CMPQ AX, $0x123 // 比較
JEQ 2(PC)
CALL runtime·abort(SB) // 如果線程本地存儲不能正常工作,退出程序
這段代碼調(diào)用 settls 來實(shí)例化 m0.tls, tls就是對應(yīng)之前提到thread local storage,目的是為了把m0這個數(shù)據(jù)結(jié)構(gòu)和線程關(guān)聯(lián)在一起,從而我們可以把m0當(dāng)做線程的抽象。
其中settls函數(shù)就是把fs寄存器的地址設(shè)置為了tls[1]的地址。
繼續(xù):
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX // CX = g0
MOVQ CX, g(BX) // m0.tls[0]= g0
LEAQ runtime·m0(SB), AX // AX = &m0
// 以下兩行代碼是把 m0和g0串了起來
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
以上,在m0于主線程綁定之后,我們又通過指針把m0和g0關(guān)聯(lián)在了一起。此時內(nèi)存布局如下

可以看到,內(nèi)存指針一致。
另外m0.tls[0] 存儲的也是g0地址,tls[1]的地址其實(shí)是fs基地址(這個可以參考之前的tls章節(jié))。
此時內(nèi)存如下圖:
<img src="http://picgo.vipkk.work/20200516172239.png" alt="image-20200516172239213" style="zoom:50%;" />
繼續(xù)
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB) // 調(diào)度器初始化
m0初始化完成之后,開始對調(diào)度器初始化
調(diào)度器初始化
在這里就是go 語言了
// The bootstrap sequence is:
//
// call osinit
// call schedinit
// make & queue new G
// call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
// raceinit must be the first call to race detector.
// In particular, it must be done before mallocinit below calls racemapshadow.
// getg 是個什么鬼,getg 由編譯器實(shí)現(xiàn),大概原理就是 通過fs 找到 tls[1], 再-8找到g0,也就算當(dāng)前運(yùn)行的g
_g_ := getg() // _g_=&g0
// ......
//設(shè)置最多啟動10000個操作系統(tǒng)線程,也是最多10000個M
sched.maxmcount = 10000
// ......
mcommoninit(_g_.m) //初始化m0,因?yàn)閺那懊娴拇a我們知道g0->m = &m0
// ......
sched.lastpoll = uint64(nanotime())
procs := ncpu //系統(tǒng)中有多少核,就創(chuàng)建和初始化多少個p結(jié)構(gòu)體對象
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n //如果環(huán)境變量指定了GOMAXPROCS,則創(chuàng)建指定數(shù)量的p
}
if procresize(procs) != nil {//創(chuàng)建和初始化全局變量allp
throw("unknown runnable goroutine during bootstrap")
}
// ......
}
以上我們對schedinit函數(shù)提取了下大綱。這次我們主要關(guān)注以下幾點(diǎn)
-
getg()
getg()是匯編實(shí)現(xiàn)的,大概就是從tls取出當(dāng)前運(yùn)行的g,在這里就是g0。
設(shè)置最多線程個數(shù)為10000個,(對應(yīng)內(nèi)存也就是80000M ~= 80G)
-
mcommoninit
- 對m0
- 對p
mcommoninit
- 對m0做另外的一些初始化
- 創(chuàng)建和實(shí)例化p
func mcommoninit(mp *m) {
_g_ := getg()
// g0 stack won't make sense for user (and is not necessary unwindable).
if _g_ != _g_.m.g0 {
callers(1, mp.createstack[:])
}
// 因?yàn)閟ched是一個全局變量,多個線程同時操作 sched會有并發(fā)問題,因此要先加鎖,操作結(jié)束后再解鎖。
lock(&sched.lock)
if sched.mnext+1 < sched.mnext {
throw("runtime: thread ID overflow")
}
mp.id = sched.mnext // 給id賦值
sched.mnext++ // m0的 id就是0,并且之后創(chuàng)建的m的id是遞增的。
checkmcount() // 檢查是否超過數(shù)量限制
// random 初始化
mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
if mp.fastrand[0]|mp.fastrand[1] == 0 {
mp.fastrand[1] = 1
}
// 創(chuàng)建用于信號處理的goroutine ,gsignal,只是簡單的從堆上分配一個g結(jié)構(gòu)體對象,然后把棧設(shè)置好久返回了
mpreinit(mp)
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
}
// Add to allm so garbage collector doesn't free g->m
// when it is just in a register or thread-local storage.
// 把m 掛入全局鏈表 allm之中
// 類比 m0.next = allm allm = m0
mp.alllink = allm
// NumCgoCall() iterates over allm w/o schedlock,
// so we need to publish it safely.
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
// Allocate memory to hold a cgo traceback if the cgo call crashes.
if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
mp.cgoCallers = new(cgoCallers)
}
}
這里主要對m0做了以下幾個事情
- 賦值id
- 初始化m的隨機(jī)數(shù)
- 信號初始化(后續(xù)分析搶占調(diào)度的時候用得到)
- 把m0自己放到allm全局變量里。
此時 allm 指向m0

procresize
// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// Grow allp if necessary.
if nprocs > int32(len(allp)) { // 初始化的時候 len(allp) = 0,因?yàn)槟壳斑€沒有P
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)]) // 代碼中途調(diào)整p的數(shù)量?
allp = nallp
}
unlock(&allpLock)
}
// initialize new P's
// 循環(huán)創(chuàng)建 nproc個p,并完成基本初始化
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p) // 調(diào)用內(nèi)存分配器從堆上分配一個 struct p
}
pp.init(i) // 設(shè)置p的id,mcache等。
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// continue to use the current P
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else { // 初始化的時候執(zhí)行這個分支
// release the current P and acquire allp[0].
//
// We must do this before destroying our current P
// because p.destroy itself has write barriers, so we
// need to do that from a valid P.
if _g_.m.p != 0 {
if trace.enabled {
// Pretend that we were descheduled
// and then scheduled again to keep
// the trace sane.
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
// 把 p和m0關(guān)聯(lián)起來
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
// release resources from unused P's
// 調(diào)整P的個數(shù)
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
// 把所有空閑的P放入空閑鏈表
// 它將除了p0的所有非空閑的P,放入P鏈表runnablePs,并返回procresize函數(shù)的調(diào)用者,
// 并由其來調(diào)度這些P
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p { // allp[0] 跟m0 關(guān)聯(lián)了,所有不放入
continue
}
p.status = _Pidle
// 如果P 是空閑的,則放入 idle里
if runqempty(p) { // 通過 runqempty 判斷p 的LRQ中是否包含G,
pidleput(p) // 初始化除了allp[0]其他P 全部執(zhí)行這個分支,分入空閑鏈表
} else {
// ??
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
// 初始化一個隨機(jī)的分配器
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}
這個代碼干了以下幾個事情
- 初始化全局變量allp
- 填充allp
- m0和allp[0]綁定
- 除allp[0]之外的所有p放到 等待隊(duì)列中
allp是全局變量
并且會把m0和allp[0]關(guān)聯(lián)起來。


這里沒太搞明白內(nèi)存的對應(yīng)關(guān)系
至此,m0,g0和p就關(guān)聯(lián)在一起了。
如圖:
<img src="http://picgo.vipkk.work/20200516181939.png" alt="image-20200516181939265" style="zoom:50%;" />
