回想我們上節(jié)的內(nèi)容,有兩個(gè)地方跳過了,一個(gè)是啟動(dòng)創(chuàng)建鏈接的線程,一個(gè)是啟動(dòng)銷毀鏈接的線程,我們這次就來詳細(xì)的探究一番這兩個(gè)的用途。
創(chuàng)建連接線程
首先我們需要回頭介紹之前沒有講解的三個(gè) connection 數(shù)組的概念:
- connections:空閑連接的數(shù)組
- evictConnections:失效、過期的連接,會(huì)暫時(shí)放在這個(gè)數(shù)組里面
- keepAliveConnections:銷毀線程會(huì)檢測線程,如果檢測存活的線程放暫時(shí)放在這里,然后統(tǒng)一放回connections中。
- activeConnections:活動(dòng)的鏈接數(shù)組。
CreateConnectionThread的 run 方法如下(刪除部分代碼):
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (; ; ) {
// addLast
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
boolean emptyWait = true;
// 假如創(chuàng)建失敗,不需要重新等待 emptyWait
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
// 假如 異步初始化也無需等待。
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// poolingCount:連接池中的空閑連接數(shù)量
// notEmptyWaitThreadCount:等待連接的線程數(shù)量
// 必須存在線程等待,才創(chuàng)建連接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 防止創(chuàng)建超過maxActive數(shù)量的連接
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
} catch (InterruptedException e) {
......
} finally {
lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {
// 創(chuàng)建物理鏈接 最終調(diào)用 Connection nativeConnection = driver.connect(url, info); 創(chuàng)建鏈接
connection = createPhysicalConnection();
} catch (SQLException e) {
.....
}
boolean result = put(connection);
}
}
其主要流程如下:
- 假如現(xiàn)在沒有等待獲取鏈接的線程,就調(diào)用
empty.await();等待信號創(chuàng)建連接。 - 通過驅(qū)動(dòng)程序創(chuàng)建物理鏈接.
- 將物理連接放入空閑連接數(shù)組中。
什么時(shí)候會(huì)觸發(fā)這個(gè)創(chuàng)建鏈接的信號呢。這里給出了一個(gè)實(shí)例:
截屏2021-11-09 下午10.25.30.png
其中takeLast方法判斷假如連接池連接的數(shù)量為 0 就會(huì)調(diào)用這個(gè)信號觸發(fā),我們再來看看 put 方法。
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
lock.lock();
try {
// 判斷是否已經(jīng)關(guān)閉
if (this.closing || this.closed) {
return false;
}
// 判斷是否大于存活數(shù)量
if (poolingCount >= maxActive) {
if (createScheduler != null) {
// 調(diào)用清理任務(wù),清除多余鏈接。
clearCreateTask(createTaskId);
}
return false;
}
if (checkExists) {
for (int i = 0; i < poolingCount; i++) {
if (connections[i] == holder) {
return false;
}
}
}
// 放到 connections 數(shù)組中
connections[poolingCount] = holder;
// 調(diào)用原子類增加 connection 統(tǒng)計(jì)數(shù)量
incrementPoolingCount();
// 統(tǒng)計(jì)連接池高峰
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// 通知獲取連接的線程。
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
我們來看一下主要流程:
- 首先獲取鎖,然后檢查是否存活,是否大于最大連接數(shù)量。
- 將該鏈接放入 connections 中,并將空閑數(shù)量+1。
- 通知獲取鏈接的線程。(看到這里,大家有沒有想到 ArrayBlockQueue,它也是通過兩個(gè) Condition 來控制生產(chǎn)者和消費(fèi)者的阻塞和活動(dòng)的)。
接著我們來看銷毀線程。
public class DestroyTask implements Runnable {
@Override
public void run() {
// 銷毀超過最大空閑的鏈接
shrink(true, keepAlive);
// 回收超時(shí)的鏈接
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
銷毀線程主要做兩件事情,一個(gè)是銷毀超過最大空閑連接數(shù)量的鏈接,一個(gè)是回收超時(shí)的鏈接。我們先來看一下第一個(gè):
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
// 需要檢查的鏈接數(shù)量,必須要保留 minIdle 以上的連接數(shù)量
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// 遍歷每一條鏈接
for (int i = 0; i < poolingCount; ++i) {
// 獲取該鏈接
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
// keepAliveConnections: 銷毀線程會(huì)檢測線程,如果檢測存活的線程放暫時(shí)放在這里
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
// evictConnections: 失效、過期的連接,會(huì)暫時(shí)放在這個(gè)數(shù)組里面
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// 配置參數(shù)錯(cuò)誤
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
// minEvictableIdleTimeMillis 最小丟棄空閑時(shí)間
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
// maxEvictableIdleTimeMillis 最大丟棄空閑時(shí)間。
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// 假如沒有超過丟棄最小時(shí)間,看是否需要放到 keepAliveConnections 中。
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
// 強(qiáng)制移除
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
// 需要移除的數(shù)量:等于丟棄的數(shù)量和keepAlive的數(shù)量
int removeCount = evictCount + keepAliveCount;
// 清理 connections 中的需要移除的數(shù)量,這些都是連續(xù)的,我們可以從上面的循環(huán)中看出來
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
// 關(guān)閉 evictConnections 中的鏈接
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
// 檢查 keepAliveCount 中的鏈接是否健康,是的話放回 connections 中,假如不是就 close
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
// 假如需要填滿,就調(diào)用 emptySignal 喚醒創(chuàng)建線程。
if (needFill) {
lock.lock();
try {
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
其主要步驟如下:
- 首先通過計(jì)算得到需要消除的鏈接數(shù)量,然后遍歷空閑連接,從 0 開始(最老的連接,先銷毀)。
- 將連接分類,看是需要放到
keepAliveConnections中,還是evictConnections, 兩者必選其中一個(gè),具體分類邏輯可以看上面代碼,有具體的注釋。 - 將需要丟棄的連接從空閑連接數(shù)組中剔除。
- 將
evictConnections數(shù)組中的連接關(guān)閉。 - 檢查
keepAliveCount中的鏈接是否健康,是的話放回 connections 中,假如不是就 close 。
接下來我們再看一下removeAbandoned的邏輯。
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {
// 遍歷所有活動(dòng)的鏈接,
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext(); ) {
DruidPooledConnection pooledConnection = iter.next();
// 檢查連接是否還在運(yùn)行,還在就跳到下一個(gè)
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// 假如超時(shí),就放到丟棄隊(duì)列
if (timeMillis >= removeAbandonedTimeoutMillis) {
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
} finally {
activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
if (pooledConnection.isDisable()) {
continue;
}
} finally {
lock.unlock();
}
// 關(guān)閉鏈接
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
// 打印日志
..........
}
return removeCount;
}
這里的邏輯比較簡單,首先是遍歷所有活動(dòng)的鏈接,假如連接在運(yùn)行就跳到下一個(gè),假如不在運(yùn)行,檢查是否超時(shí),假如是就將連接放入丟棄隊(duì)列中,然后遍歷丟棄隊(duì)列,將連接關(guān)閉。
