歡迎訪問我的博客,同步更新: 楓山別院
源代碼版本2.4.5-SNAPSHOT
大家好,今天我們一起分析下 HikariCP 的核心ConcurrentBag,它是管理連接池的最重要的核心類。從它的名字大家可以看得出來,它是一個(gè)并發(fā)管理類,性能非常好,這是它性能甩其他連接池十條街的秘密所在。
代碼概覽
我們先看一下代碼,注意這不是全部的代碼,省略了不太重要的部分。大家可以看到我加了非常詳細(xì)的注釋,對(duì)詳解不太感興趣的朋友可以直接讀一下代碼即可,不過這部分歷時(shí)好幾個(gè)夜晚我才寫完,大家可以稍稍捧個(gè)場(chǎng):
//可用連接同步器, 用于線程間空閑連接數(shù)的通知, synchronizer.currentSequence()方法可以獲取當(dāng)前數(shù)量
//其實(shí)就是一個(gè)計(jì)數(shù)器, 連接池中創(chuàng)建了一個(gè)連接或者還回了一個(gè)連接就 + 1, 但是連接池的連接被借走, 是不會(huì) -1 的, 只加不減
//用于在線程從連接池中獲取連接時(shí), 查詢是否有空閑連接添加到連接池, 詳見borrow方法
private final QueuedSequenceSynchronizer synchronizer;
//sharedList保存了所有的連接
private final CopyOnWriteArrayList<T> sharedList;
//threadList可能會(huì)保存sharedList中連接的引用
private final ThreadLocal<List<Object>> threadList;
//對(duì)HikariPool的引用, 用于請(qǐng)求創(chuàng)建新連接
private final IBagStateListener listener;
//當(dāng)前等待獲取連接的線程數(shù)
private final AtomicInteger waiters;
//標(biāo)記連接池是否關(guān)閉的狀態(tài)
private volatile boolean closed;
/**
* 該方法會(huì)從連接池中獲取連接, 如果沒有連接可用, 會(huì)一直等待timeout超時(shí)
*
* @param timeout 超時(shí)時(shí)間
* @param timeUnit 時(shí)間單位
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
//①
//先嘗試從ThreadLocal中獲取
List<Object> list = threadList.get();
if (weakThreadLocals && list == null) {
//如果ThreadLocal是 null, 就初始化, 防止后面 npe
list = new ArrayList<>(16);
threadList.set(list);
}
//②
//如果ThreadLocal中有連接的話, 就遍歷, 嘗試獲取
//從后往前反向遍歷是有好處的, 因?yàn)樽詈笠淮问褂玫倪B接, 空閑的可能性比較大, 之前的連接可能會(huì)被其他線程偷竊走了
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
//③
//如果沒有從ThreadLocal中獲取到連接, 那么就sharedList連接池中遍歷, 獲取連接, timeout時(shí)間后超時(shí)
//因?yàn)門hreadLocal中保存的連接是當(dāng)前線程使用過的, 才會(huì)在ThreadLocal中保留引用, 連接池中可能還有其他空閑的連接, 所以要遍歷連接池
//看一下requite(final T bagEntry)方法的實(shí)現(xiàn), 還回去的連接放到了ThreadLocal中
timeout = timeUnit.toNanos(timeout);
Future<Boolean> addItemFuture = null;
//記錄從連接池獲取連接的開始時(shí)間, 后面用
final long startScan = System.nanoTime();
final long originTimeout = timeout;
long startSeq;
//將等待連接的線程計(jì)數(shù)器加 1
waiters.incrementAndGet();
try {
do {
// scan the shared list
do {
//④
//當(dāng)前連接池中的連接數(shù), 在連接池中添加新連接的時(shí)候, 該值會(huì)增加
startSeq = synchronizer.currentSequence();
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// if we might have stolen another thread's new connection, restart the add...
//⑤
//如果waiters大于 1, 說明除了當(dāng)前線程之外, 還有其他線程在等待空閑連接
//這里, 當(dāng)前線程的addItemFuture是 null, 說明自己沒有請(qǐng)求創(chuàng)建新連接, 但是拿到了連接, 這就說明是拿到了其他線程請(qǐng)求創(chuàng)建的連接, 這就是所謂的偷竊了其他線程的連接, 然后當(dāng)前線程請(qǐng)求創(chuàng)建一個(gè)新連接, 補(bǔ)償給其他線程
if (waiters.get() > 1 && addItemFuture == null) {
//提交一個(gè)異步添加新連接的任務(wù)
listener.addBagItem();
}
return bagEntry;
}
}
} while (startSeq < synchronizer.currentSequence()); //如果連接池中的空閑連接數(shù)量比循環(huán)之前多了, 說明有新連接加入, 繼續(xù)循環(huán)獲取
//⑥
//循環(huán)完一遍連接池(也可能循環(huán)多次, 如果正好在第一次循環(huán)完連接池后有新連接加入, 那么會(huì)繼續(xù)循環(huán)), 還是沒有能拿到空閑連接, 就請(qǐng)求創(chuàng)建新的連接
if (addItemFuture == null || addItemFuture.isDone()) {
addItemFuture = listener.addBagItem();
}
//計(jì)算 剩余的超時(shí)時(shí)間 = 用戶設(shè)置的connectionTimeout - (系統(tǒng)當(dāng)前時(shí)間 - 開始獲取連接的時(shí)間_代碼①處 即從連接池中獲取連接一共使用的時(shí)間)
timeout = originTimeout - (System.nanoTime() - startScan);
} while (timeout > 10_000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout)); //③
//⑦
//這里的循環(huán)條件比較復(fù)雜
//1. 如果剩余的超時(shí)時(shí)間, 大于10_000納秒
//2. startSeq的數(shù)量, 即空閑連接數(shù)超過循環(huán)之前的數(shù)量
//3. 沒有超過超時(shí)時(shí)間timeout
//滿足以上 3 個(gè)條件才會(huì)繼續(xù)循環(huán), 否則阻塞線程, 直到滿足以上條件
//如果一直等到timeout超時(shí)時(shí)間用完都沒有滿足條件, 結(jié)束阻塞, 往下走
//有可能會(huì)動(dòng)態(tài)改變的條件, 只有startSeq數(shù)量改變, 是②處添加的創(chuàng)建連接請(qǐng)求
} finally {
waiters.decrementAndGet();
}
return null;
}
/**
* 該方法將借出去的連接還回到連接池中
* 不通過該方法還回的連接會(huì)造成內(nèi)存泄露
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the requited value was not borrowed from the bag
*/
public void requite(final T bagEntry) {
//⑧
//lazySet方法不能保證連接會(huì)立刻被設(shè)置成可用狀態(tài), 這是個(gè)延遲方法
//這是一種優(yōu)化, 如果要立即生效的話, 可能會(huì)需要使用volatile等, 讓其他線程立即發(fā)現(xiàn), 這會(huì)降低性能, 使用lazySet浪費(fèi)不了多少時(shí)間, 但是不會(huì)浪費(fèi)性能
bagEntry.lazySet(STATE_NOT_IN_USE);
//⑨
//將連接放回到threadLocal中
final List<Object> threadLocalList = threadList.get();
if (threadLocalList != null) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
//通知等待線程, 有可用連接
synchronizer.signal();
}
/**
* 在連接池中添加一個(gè)連接
* 新連接都是添加到sharedList中, threadList是sharedList中的部分連接的引用
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
//⑩
sharedList.add(bagEntry);
synchronizer.signal();
}
/**
* 從連接池中移除一個(gè)連接.
* 這個(gè)方法只能用于從<code>borrow(long, TimeUnit)</code> 或者 <code>reserve(T)</code>方法中獲取到的連接
* 也就是說, 這個(gè)方法只能移除處于使用中和保留狀態(tài)的連接
*
* @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first
*/
public boolean remove(final T bagEntry) {
//?
//嘗試標(biāo)記移除使用中和保留狀態(tài)的連接, 如果標(biāo)記失敗, 就是空閑的連接, 直接返回 false
//也就是檢查連接的狀態(tài), 不能移除空閑的連接或者已經(jīng)標(biāo)記移除的連接
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
//如果上面標(biāo)記成功了, 那么從連接池中移除這個(gè)連接
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
// synchronizer.signal();
return removed;
}
上面的代碼是ConcurrentBag中的成員變量和最重要的四個(gè)方法,ConcurrentBag中的屬性我們穿插在代碼中解釋。
borrow方法
borrow方法應(yīng)該是整個(gè) HikariCP 中最最核心的方法,它是我們從連接池中獲取連接的時(shí)候最終會(huì)調(diào)用到的方法,一切秘密都在這里了。我們分析下:
①ThreadLocal
//①
//先嘗試從ThreadLocal中獲取
List<Object> list = threadList.get();
if (weakThreadLocals && list == null) {
//如果ThreadLocal是 null, 就初始化, 防止后面 npe
list = new ArrayList<>(16);
threadList.set(list);
}
threadList是在ConcurrentBag上的成員變量,它的定義是private final ThreadLocal<List> threadList;,可見它是一個(gè)ThreadLocal,也就是每個(gè)線程都有獨(dú)享的一個(gè) List,它是用于保存當(dāng)前線程用過的連接。注意這里我說的是"用過",不是所有的連接。因?yàn)檫€有一個(gè)成員變量private final CopyOnWriteArrayList<T> sharedList;,它是真正的保存所有的連接的地方,它是一個(gè)CopyOnWriteArrayList,在寫入的時(shí)候會(huì)先復(fù)制一個(gè) sharedList2,然后修改這個(gè)新的 sharedList2,最后將變量地址指向新的sharedList2,不是直接寫入當(dāng)前的sharedList,典型的空間換時(shí)間的一個(gè)做法,可以避免寫入前要鎖住sharedList,從而導(dǎo)致降低性能。
HikariCP 使用過的連接,在還回連接池的時(shí)候,是直接放在了ThreadLocal中。說到這里,可能會(huì)有同學(xué)問了:sharedList中保存了所有的連接,當(dāng)用戶借走了一個(gè)連接,不是應(yīng)該把這個(gè)連接從sharedList中移除,然后還回來的時(shí)候再把連接加入到sharedList中?為什么還回去的時(shí)候,沒有放到sharedList中呢?
首先,明確一點(diǎn),HikariCP 不是這樣做的。為什么呢?如果用戶借用連接的時(shí)候,你從sharedList中移除了,那么相當(dāng)于這個(gè)連接脫離了 HikariCP 的管理,后面 HikariCP 還怎么管理這個(gè)連接呢?比如這個(gè)連接的生命周期到時(shí)間了,連接都讓用戶拐跑了,我還怎么關(guān)閉這個(gè)連接呢?所以,所有的連接都不能脫離掌控,一個(gè)都不能少。其實(shí),我們?cè)?code>sharedList中保存的僅僅是數(shù)據(jù)庫(kù)連接的引用,這些連接是所有的線程都可見的,各個(gè)線程也可以隨意保存連接的引用,只是要使用的時(shí)候必須要走borrow方法,按流程來。
為什么要放到線程的threadList中?
因?yàn)橄麓潍@取的時(shí)候比較方便,也許會(huì)提高性能。每個(gè)線程都優(yōu)先從自己的本地線程中拿,競(jìng)爭(zhēng)的可能性大大降低啊,也許這個(gè)連接剛剛用完到再次獲取的時(shí)間極短,這個(gè)連接很可能還空閑著。只有在本地線程中的連接都不能使用的時(shí)候,才去sharedList這個(gè) HikariCP的總倉(cāng)庫(kù)里獲取。
舉一個(gè)生活例子:假如你是一個(gè)連鎖店老板,提供汽車出租服務(wù),有一個(gè)總倉(cāng)庫(kù),所有的連鎖店都從這里提車出租給用戶。剛開始,你是每租一輛車都去倉(cāng)庫(kù)直接提貨,用戶還車的時(shí)候,你直接送到倉(cāng)庫(kù)。過了一段時(shí)間,你覺得這樣不行啊,太浪費(fèi)時(shí)間了,而且所有的連鎖店都這樣,各個(gè)店的老板都去提車,太忙了,還得排隊(duì)。要不用戶還回來的車先放店里吧,這樣下次有用戶租車就不用去倉(cāng)庫(kù)了,直接給他,方便很多,店里沒車了再去總倉(cāng)提車。其他連鎖店都開始這么搞,大家都先用店里的車不夠再去總倉(cāng)。生意火爆,有一天店里沒車了,你去倉(cāng)庫(kù)提車,倉(cāng)庫(kù)管理員說:倉(cāng)庫(kù)也沒車了,天通苑的連鎖店里有閑著的,你去那里提吧,于是你把天通苑連鎖店的車借走了。所以各個(gè)連鎖店之間也有相互借車。
例子可能不太恰當(dāng),一時(shí)也想不到同樣道理的生活例子,但是就這個(gè)意思。HikariCP 也是這樣,用戶使用的連接,還回連接池的時(shí)候,直接放到線程的本地threadList中,如果用戶又要借用連接,先看本地有沒有,優(yōu)先使用本地連接,只有本地沒有或者都不可用的時(shí)候,再去 HikariCP 的連接池里獲取。但是跟借車不同,因?yàn)槲覀儽镜厥潜4娴?code>sharedList中連接的引用,雖然你還有這個(gè)連接的引用,但是很可能它已經(jīng)被其他線程從sharedList借走了,這就是HikariCP所謂的線程間的連接竊取。所以線程在本地的threadList就算拿到了連接,也必須檢查下狀態(tài),是不是可用的。
說到這里,還沒有解析代碼,扯遠(yuǎn)了。①出代碼就是先從本地的threadList里取出連接的 List,然后檢查下List 是否為空,是空的直接初始化一個(gè) List,因?yàn)橄旅嬉玫?,防止拋空指針了。大家可以看到判空的時(shí)候,還有一個(gè)條件是weakThreadLocals,這個(gè)標(biāo)識(shí)是表示threadList是否是弱引用。如果是弱引用,那么很可能 GC 的時(shí)候會(huì)被回收掉,所以變成 null 了,但是如果不是弱引用的話,那么它是在初始化ConcurrentBag的時(shí)候,就是一個(gè)FastList了,不用擔(dān)心是 null。那么什么情況下threadList會(huì)是弱引用呢?當(dāng) HikariCP 運(yùn)行在容器中時(shí),會(huì)使用弱引用,因?yàn)樵谌萜髦匦虏渴鸬臅r(shí)候,可能會(huì)導(dǎo)致內(nèi)存泄露,具體大家可以看下#39 的 issue。