[druid 源碼解析] 3 創(chuàng)建連接&銷毀連接

回想我們上節(jié)的內(nèi)容,有兩個(gè)地方跳過了,一個(gè)是啟動(dòng)創(chuàng)建鏈接的線程,一個(gè)是啟動(dòng)銷毀鏈接的線程,我們這次就來詳細(xì)的探究一番這兩個(gè)的用途。

創(chuàng)建連接線程

首先我們需要回頭介紹之前沒有講解的三個(gè) connection 數(shù)組的概念:

  • connections:空閑連接的數(shù)組
  • evictConnections:失效、過期的連接,會(huì)暫時(shí)放在這個(gè)數(shù)組里面
  • keepAliveConnections:銷毀線程會(huì)檢測線程,如果檢測存活的線程放暫時(shí)放在這里,然后統(tǒng)一放回connections中。
  • activeConnections:活動(dòng)的鏈接數(shù)組。
    CreateConnectionThread 的 run 方法如下(刪除部分代碼):
public void run() {
            initedLatch.countDown();

            long lastDiscardCount = 0;
            int errorCount = 0;
            for (; ; ) {
                // addLast
                try {
                    lock.lockInterruptibly();
                } catch (InterruptedException e2) {
                    break;
                }

                long discardCount = DruidDataSource.this.discardCount;
                boolean discardChanged = discardCount - lastDiscardCount > 0;
                lastDiscardCount = discardCount;

                try {
                    boolean emptyWait = true;
                    // 假如創(chuàng)建失敗,不需要重新等待 emptyWait
                    if (createError != null
                        && poolingCount == 0
                        && !discardChanged) {
                        emptyWait = false;
                    }
                    // 假如 異步初始化也無需等待。
                    if (emptyWait
                        && asyncInit && createCount < initialSize) {
                        emptyWait = false;
                    }

                    if (emptyWait) {
                        // poolingCount:連接池中的空閑連接數(shù)量
                        // notEmptyWaitThreadCount:等待連接的線程數(shù)量
                        // 必須存在線程等待,才創(chuàng)建連接
                        if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle))
                            && !isFailContinuous()
                        ) {
                            empty.await();
                        }

                        // 防止創(chuàng)建超過maxActive數(shù)量的連接
                        if (activeCount + poolingCount >= maxActive) {
                            empty.await();
                            continue;
                        }
                    }

                } catch (InterruptedException e) {
              ......
                } finally {
                    lock.unlock();
                }

                PhysicalConnectionInfo connection = null;

                try {
                    // 創(chuàng)建物理鏈接 最終調(diào)用 Connection nativeConnection = driver.connect(url, info); 創(chuàng)建鏈接
                    connection = createPhysicalConnection();
                } catch (SQLException e) {
                  .....
                }

                boolean result = put(connection);
             
            }
        }

其主要流程如下:

  1. 假如現(xiàn)在沒有等待獲取鏈接的線程,就調(diào)用 empty.await(); 等待信號創(chuàng)建連接。
  2. 通過驅(qū)動(dòng)程序創(chuàng)建物理鏈接.
  3. 將物理連接放入空閑連接數(shù)組中。
    什么時(shí)候會(huì)觸發(fā)這個(gè)創(chuàng)建鏈接的信號呢。這里給出了一個(gè)實(shí)例:
    截屏2021-11-09 下午10.25.30.png

    其中 takeLast 方法判斷假如連接池連接的數(shù)量為 0 就會(huì)調(diào)用這個(gè)信號觸發(fā),我們再來看看 put 方法。
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
        lock.lock();
        try {
            // 判斷是否已經(jīng)關(guān)閉
            if (this.closing || this.closed) {
                return false;
            }
            // 判斷是否大于存活數(shù)量
            if (poolingCount >= maxActive) {
                if (createScheduler != null) {
                    // 調(diào)用清理任務(wù),清除多余鏈接。
                    clearCreateTask(createTaskId);
                }
                return false;
            }

            if (checkExists) {
                for (int i = 0; i < poolingCount; i++) {
                    if (connections[i] == holder) {
                        return false;
                    }
                }
            }
            // 放到 connections 數(shù)組中
            connections[poolingCount] = holder;
            // 調(diào)用原子類增加 connection 統(tǒng)計(jì)數(shù)量
            incrementPoolingCount();
            // 統(tǒng)計(jì)連接池高峰
            if (poolingCount > poolingPeak) {
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            }
            // 通知獲取連接的線程。
            notEmpty.signal();
            notEmptySignalCount++;

            if (createScheduler != null) {
                clearCreateTask(createTaskId);

                if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
                    && activeCount + poolingCount + createTaskCount < maxActive) {
                    emptySignal();
                }
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

我們來看一下主要流程:

  1. 首先獲取鎖,然后檢查是否存活,是否大于最大連接數(shù)量。
  2. 將該鏈接放入 connections 中,并將空閑數(shù)量+1。
  3. 通知獲取鏈接的線程。(看到這里,大家有沒有想到 ArrayBlockQueue,它也是通過兩個(gè) Condition 來控制生產(chǎn)者和消費(fèi)者的阻塞和活動(dòng)的)。
    接著我們來看銷毀線程。
    public class DestroyTask implements Runnable {
        @Override
        public void run() {
            // 銷毀超過最大空閑的鏈接
            shrink(true, keepAlive);
            // 回收超時(shí)的鏈接
            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }

    }

銷毀線程主要做兩件事情,一個(gè)是銷毀超過最大空閑連接數(shù)量的鏈接,一個(gè)是回收超時(shí)的鏈接。我們先來看一下第一個(gè):

public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }

        boolean needFill = false;
        int evictCount = 0;
        int keepAliveCount = 0;
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        fatalErrorCountLastShrink = fatalErrorCount;

        try {
            if (!inited) {
                return;
            }
            // 需要檢查的鏈接數(shù)量,必須要保留 minIdle 以上的連接數(shù)量
            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();
            // 遍歷每一條鏈接
            for (int i = 0; i < poolingCount; ++i) {
                // 獲取該鏈接
                DruidConnectionHolder connection = connections[i];

                if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                    // keepAliveConnections: 銷毀線程會(huì)檢測線程,如果檢測存活的線程放暫時(shí)放在這里
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                }

                if (checkTime) {
                    if (phyTimeoutMillis > 0) {
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            // evictConnections: 失效、過期的連接,會(huì)暫時(shí)放在這個(gè)數(shù)組里面
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
                    // 配置參數(shù)錯(cuò)誤
                    if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                    ) {
                        break;
                    }
                    // minEvictableIdleTimeMillis 最小丟棄空閑時(shí)間
                    if (idleMillis >= minEvictableIdleTimeMillis) {
                        if (checkTime && i < checkCount) {
                            evictConnections[evictCount++] = connection;
                            continue;
                            // maxEvictableIdleTimeMillis 最大丟棄空閑時(shí)間。
                        } else if (idleMillis > maxEvictableIdleTimeMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }
                    // 假如沒有超過丟棄最小時(shí)間,看是否需要放到 keepAliveConnections 中。
                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    if (i < checkCount) {
                        // 強(qiáng)制移除
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }
            // 需要移除的數(shù)量:等于丟棄的數(shù)量和keepAlive的數(shù)量
            int removeCount = evictCount + keepAliveCount;
            // 清理 connections 中的需要移除的數(shù)量,這些都是連續(xù)的,我們可以從上面的循環(huán)中看出來
            if (removeCount > 0) {
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;

            if (keepAlive && poolingCount + activeCount < minIdle) {
                needFill = true;
            }
        } finally {
            lock.unlock();
        }
        // 關(guān)閉 evictConnections 中的鏈接
        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }
        // 檢查 keepAliveCount 中的鏈接是否健康,是的話放回 connections 中,假如不是就 close
        if (keepAliveCount > 0) {
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }

                boolean discard = !validate;
                if (validate) {
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) {
                        discard = true;
                    }
                }

                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // skip
                    }

                    lock.lock();
                    try {
                        discardCount++;

                        if (activeCount + poolingCount <= minIdle) {
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }
        // 假如需要填滿,就調(diào)用 emptySignal 喚醒創(chuàng)建線程。
        if (needFill) {
            lock.lock();
            try {
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        } else if (onFatalError || fatalErrorIncrement > 0) {
            lock.lock();
            try {
                emptySignal();
            } finally {
                lock.unlock();
            }
        }
    }

其主要步驟如下:

  1. 首先通過計(jì)算得到需要消除的鏈接數(shù)量,然后遍歷空閑連接,從 0 開始(最老的連接,先銷毀)。
  2. 將連接分類,看是需要放到 keepAliveConnections 中,還是 evictConnections , 兩者必選其中一個(gè),具體分類邏輯可以看上面代碼,有具體的注釋。
  3. 將需要丟棄的連接從空閑連接數(shù)組中剔除。
  4. evictConnections 數(shù)組中的連接關(guān)閉。
  5. 檢查 keepAliveCount 中的鏈接是否健康,是的話放回 connections 中,假如不是就 close 。
    接下來我們再看一下 removeAbandoned 的邏輯。
 public int removeAbandoned() {
        int removeCount = 0;

        long currrentNanos = System.nanoTime();

        List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();

        activeConnectionLock.lock();
        try {
            // 遍歷所有活動(dòng)的鏈接,
            Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();

            for (; iter.hasNext(); ) {
                DruidPooledConnection pooledConnection = iter.next();
                // 檢查連接是否還在運(yùn)行,還在就跳到下一個(gè)
                if (pooledConnection.isRunning()) {
                    continue;
                }

                long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
                // 假如超時(shí),就放到丟棄隊(duì)列
                if (timeMillis >= removeAbandonedTimeoutMillis) {
                    iter.remove();
                    pooledConnection.setTraceEnable(false);
                    abandonedList.add(pooledConnection);
                }
            }
        } finally {
            activeConnectionLock.unlock();
        }

        if (abandonedList.size() > 0) {
            for (DruidPooledConnection pooledConnection : abandonedList) {
                final ReentrantLock lock = pooledConnection.lock;
                lock.lock();
                try {
                    if (pooledConnection.isDisable()) {
                        continue;
                    }
                } finally {
                    lock.unlock();
                }
                // 關(guān)閉鏈接
                JdbcUtils.close(pooledConnection);
                pooledConnection.abandond();
                removeAbandonedCount++;
                removeCount++;
                // 打印日志
               ..........
        }

        return removeCount;
    }

這里的邏輯比較簡單,首先是遍歷所有活動(dòng)的鏈接,假如連接在運(yùn)行就跳到下一個(gè),假如不在運(yùn)行,檢查是否超時(shí),假如是就將連接放入丟棄隊(duì)列中,然后遍歷丟棄隊(duì)列,將連接關(guān)閉。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容