
image
針對Golang 1.9的sync.Mutex進(jìn)行分析,與Golang 1.10基本一樣除了將panic改為了throw之外其他的都一樣。
源代碼位置:sync\mutex.go。
可以看到注釋如下:
Mutex can be in 2 modes of operations: normal and starvation.
In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. New arriving goroutines have an advantage -- they are already running on CPU and there can be lots of them, so a woken up waiter has good chances of losing. In such case it is queued at front of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.
In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue. New arriving goroutines don't try to acquire the mutex even if it appears to be unlocked, and don't try to spin. Instead they queue themselves at the tail of the wait queue.
If a waiter receives ownership of the mutex and sees that either (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.
Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters.
Starvation mode is important to prevent pathological cases of tail latency.
博主英文很爛,就粗略翻譯一下,僅供參考:
互斥量可分為兩種操作模式:正常和饑餓。
在正常模式下,等待的goroutines按照FIFO(先進(jìn)先出)順序排隊(duì),但是goroutine被喚醒之后并不能立即得到mutex鎖,它需要與新到達(dá)的goroutine爭奪mutex鎖。
因?yàn)樾碌竭_(dá)的goroutine已經(jīng)在CPU上運(yùn)行了,所以被喚醒的goroutine很大概率是爭奪mutex鎖是失敗的。出現(xiàn)這樣的情況時(shí)候,被喚醒的goroutine需要排隊(duì)在隊(duì)列的前面。
如果被喚醒的goroutine有超過1ms沒有獲取到mutex鎖,那么它就會(huì)變?yōu)轲囸I模式。
在饑餓模式中,mutex鎖直接從解鎖的goroutine交給隊(duì)列前面的goroutine。新達(dá)到的goroutine也不會(huì)去爭奪mutex鎖(即使沒有鎖,也不能去自旋),而是到等待隊(duì)列尾部排隊(duì)。
在饑餓模式下,有一個(gè)goroutine獲取到mutex鎖了,如果它滿足下條件中的任意一個(gè),mutex將會(huì)切換回去正常模式:
1. 是等待隊(duì)列中的最后一個(gè)goroutine
2. 它的等待時(shí)間不超過1ms。
正常模式有更好的性能,因?yàn)間oroutine可以連續(xù)多次獲得mutex鎖;
饑餓模式對于預(yù)防隊(duì)列尾部goroutine一致無法獲取mutex鎖的問題。
看了這段解釋,那么基本的業(yè)務(wù)邏輯也就了解了,可以整理一下衣裝,準(zhǔn)備看代碼。
打開mutex.go看到如下代碼:
type Mutex struct {
state int32 // 將一個(gè)32位整數(shù)拆分為 當(dāng)前阻塞的goroutine數(shù)(29位)|饑餓狀態(tài)(1位)|喚醒狀態(tài)(1位)|鎖狀態(tài)(1位) 的形式,來簡化字段設(shè)計(jì)
sema uint32 // 信號量
}
const (
mutexLocked = 1 << iota // 1 0001 含義:用最后一位表示當(dāng)前對象鎖的狀態(tài),0-未鎖住 1-已鎖住
mutexWoken // 2 0010 含義:用倒數(shù)第二位表示當(dāng)前對象是否被喚醒 0-喚醒 1-未喚醒
mutexStarving // 4 0100 含義:用倒數(shù)第三位表示當(dāng)前對象是否為饑餓模式,0為正常模式,1為饑餓模式。
mutexWaiterShift = iota // 3,從倒數(shù)第四位往前的bit位表示在排隊(duì)等待的goroutine數(shù)
starvationThresholdNs = 1e6 // 1ms
)
可以看到Mutex中含有:
一個(gè)非負(fù)數(shù)信號量sema;
state表示Mutex的狀態(tài)。
常量:
mutexLocked表示鎖是否可用(0可用,1被別的goroutine占用)
mutexWoken=2表示mutex是否被喚醒
mutexWaiterShift=4表示統(tǒng)計(jì)阻塞在該mutex上的goroutine數(shù)目需要移位的數(shù)值。
將3個(gè)常量映射到state上就是
state: |32|31|...| |3|2|1|
\__________/ | | |
| | | |
| | | mutex的占用狀態(tài)(1被占用,0可用)
| | |
| | mutex的當(dāng)前goroutine是否被喚醒
| |
| 饑餓位,0正常,1饑餓
|
等待喚醒以嘗試鎖定的goroutine的計(jì)數(shù),0表示沒有等待者
如果同學(xué)們熟悉Java的鎖,就會(huì)發(fā)現(xiàn)與AQS的設(shè)計(jì)是類似,只是沒有AQS設(shè)計(jì)的那么精致,不得不感嘆,JAVA的牛逼。
有同學(xué)是否會(huì)有疑問為什么使用的是int32而不是int64呢,因?yàn)?2位原子性操作更好,當(dāng)然也滿足的需求。
Mutex在1.9版本中就兩個(gè)函數(shù)Lock()和Unlock()。
下面我們先來分析最難的Lock()函數(shù):
func (m *Mutex) Lock() {
// 如果m.state=0,說明當(dāng)前的對象還沒有被鎖住,進(jìn)行原子性賦值操作設(shè)置為mutexLocked狀態(tài),CompareAnSwapInt32返回true
// 否則說明對象已被其他goroutine鎖住,不會(huì)進(jìn)行原子賦值操作設(shè)置,CopareAndSwapInt32返回false
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 開始等待時(shí)間戳
var waitStartTime int64
// 饑餓模式標(biāo)識
starving := false
// 喚醒標(biāo)識
awoke := false
// 自旋次數(shù)
iter := 0
// 保存當(dāng)前對象鎖狀態(tài)
old := m.state
// 看到這個(gè)for {}說明使用了cas算法
for {
// 相當(dāng)于xxxx...x0xx & 0101 = 01,當(dāng)前對象鎖被使用
if old&(mutexLocked|mutexStarving) == mutexLocked &&
// 判斷當(dāng)前goroutine是否可以進(jìn)入自旋鎖
runtime_canSpin(iter) {
// 主動(dòng)旋轉(zhuǎn)是有意義的。試著設(shè)置mutexwake標(biāo)志,告知解鎖,不要喚醒其他阻塞的goroutines。
if !awoke &&
// 再次確定是否被喚醒: xxxx...xx0x & 0010 = 0
old&mutexWoken == 0 &&
// 查看是否有g(shù)oroution在排隊(duì)
old>>mutexWaiterShift != 0 &&
// 將對象鎖改為喚醒狀態(tài):xxxx...xx0x | 0010 = xxxx...xx1x
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}//END_IF_Lock
// 進(jìn)入自旋鎖后當(dāng)前goroutine并不掛起,仍然在占用cpu資源,所以重試一定次數(shù)后,不會(huì)再進(jìn)入自旋鎖邏輯
runtime_doSpin()
// 自加,表示自旋次數(shù)
iter++
// 保存mutex對象即將被設(shè)置成的狀態(tài)
old = m.state
continue
}// END_IF_spin
// 以下代碼是不使用**自旋**的情況
new := old
// 不要試圖獲得饑餓的互斥,新來的goroutines必須排隊(duì)。
// 對象鎖饑餓位被改變,說明處于饑餓模式
// xxxx...x0xx & 0100 = 0xxxx...x0xx
if old&mutexStarving == 0 {
// xxxx...x0xx | 0001 = xxxx...x0x1,標(biāo)識對象鎖被鎖住
new |= mutexLocked
}
// xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;當(dāng)前mutex處于饑餓模式并且鎖已被占用,新加入進(jìn)來的goroutine放到隊(duì)列后面
if old&(mutexLocked|mutexStarving) != 0 {
// 更新阻塞goroutine的數(shù)量,表示mutex的等待goroutine數(shù)目加1
new += 1 << mutexWaiterShift
}
// 當(dāng)前的goroutine將互斥鎖轉(zhuǎn)換為饑餓模式。但是,如果互斥鎖當(dāng)前沒有解鎖,就不要打開開關(guān),設(shè)置mutex狀態(tài)為饑餓模式。Unlock預(yù)期有饑餓的goroutine
if starving &&
// xxxx...xxx1 & 0001 != 0;鎖已經(jīng)被占用
old&mutexLocked != 0 {
// xxxx...xxx | 0101 => xxxx...x1x1,標(biāo)識對象鎖被鎖住
new |= mutexStarving
}
// goroutine已經(jīng)被喚醒,因此需要在兩種情況下重設(shè)標(biāo)志
if awoke {
// xxxx...xx1x & 0010 = 0,如果喚醒標(biāo)志為與awoke不相協(xié)調(diào)就panic
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
// new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :設(shè)置喚醒狀態(tài)位0,被喚醒
new &^= mutexWoken
}
// 獲取鎖成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// xxxx...x0x0 & 0101 = 0,已經(jīng)獲取對象鎖
if old&(mutexLocked|mutexStarving) == 0 {
// 結(jié)束cas
break
}
// 以下的操作都是為了判斷是否從饑餓模式中恢復(fù)為正常模式
// 判斷處于FIFO還是LIFO模式
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// xxxx...x1xx & 0100 != 0
if old&mutexStarving != 0 {
// xxxx...xx11 & 0011 != 0
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
// 保存mutex對象狀態(tài)
old = m.state
}
}// cas結(jié)束
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
看了Lock()函數(shù)之后是不是覺得一片懵逼狀態(tài),告訴大家一個(gè)方法,看Lock()函數(shù)時(shí)候需要想著如何Unlock。下面就開始看看Unlock()函數(shù)。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// state-1標(biāo)識解鎖
new := atomic.AddInt32(&m.state, -mutexLocked)
// 驗(yàn)證鎖狀態(tài)是否符合
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
// xxxx...x0xx & 0100 = 0 ;判斷是否處于正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有等待的goroutine或goroutine已經(jīng)解鎖完成
if old>>mutexWaiterShift == 0 ||
// xxxx...x0xx & (0001 | 0010 | 0100) => xxxx...x0xx & 0111 != 0
old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// 饑餓模式:將mutex所有權(quán)移交給下一個(gè)等待的goroutine
// 注意:mutexlock沒有設(shè)置,goroutine會(huì)在喚醒后設(shè)置。
// 但是互斥鎖仍然被認(rèn)為是鎖定的,如果互斥對象被設(shè)置,所以新來的goroutines不會(huì)得到它
runtime_Semrelease(&m.sema, true)
}
}

image