@[toc]
引出線程池
線程是并發(fā)編程的基礎(chǔ),前面的文章里,我們的實(shí)例基本都是基于線程開發(fā)作為實(shí)例,并且都是使用的時(shí)候就創(chuàng)建一個(gè)線程。這種方式比較簡單,但是存在一個(gè)問題,那就是線程的數(shù)量問題。
假設(shè)有一個(gè)系統(tǒng)比較復(fù)雜,需要的線程數(shù)很多,如果都是采用這種方式來創(chuàng)建線程的話,那么就會(huì)極大的消耗系統(tǒng)資源。首先是因?yàn)榫€程本身的創(chuàng)建和銷毀需要時(shí)間,如果每個(gè)小任務(wù)都創(chuàng)建一個(gè)線程,那么就會(huì)大大降低系統(tǒng)的效率。其次是線程本身也是占用內(nèi)存空間的,大量的線程運(yùn)行會(huì)搶占內(nèi)存資源,處理不當(dāng)很可能會(huì)內(nèi)存溢出,這顯然不是我們想看到的。
那么有什么辦法解決呢?有一個(gè)好的思路就是對(duì)線程進(jìn)行復(fù)用,因?yàn)樗械木€程并不都是同一時(shí)間一起運(yùn)行的,有些線程在某個(gè)時(shí)刻可能是空閑狀態(tài),如果這部分空閑線程能有效利用起來,那么就能讓線程的運(yùn)行被充分的利用,這樣就不需要?jiǎng)?chuàng)建那么多的線程了。我們可以把特定數(shù)量的線程放在一個(gè)容器里,需要使用線程時(shí),從容器里拿出空閑線程使用,線程工作完后不急著關(guān)閉,而是退回到線程池等待使用。這樣的容器一般被稱為線程池。用線程池來管理線程是非常有效的方法,用一張圖片可以簡單的展示出線程池的管理流程:

Executor框架
Java中也有一套框架來控制管理線程,那就是Executor框架。Executor框架是JDK1.5之后才引入的,位于java.util.cocurrent 包下,可以通過該框架來控制線程的啟動(dòng)、執(zhí)行和關(guān)閉,從而簡化并發(fā)編程的操作,這是它的核心成員類圖:

Executor:最上層的接口,定義了一個(gè)基本方法execute,接受一個(gè)Runnable參數(shù),用來替代通常創(chuàng)建或啟動(dòng)線程的方法。
ExecutorService:繼承自Executor接口,提供了處理多線程的方法。
ScheduledExecutorService:定時(shí)調(diào)度接口,繼承自ExecutorService。
AbstractExecutorService:執(zhí)行框架的抽象類。
ThreadPoolExecutor:線程池中最核心的一個(gè)類,提供了線程池操作的基本方法。
Executors:線程池工廠類,可用于創(chuàng)建一系列有特定功能的線程池。
ThreadPoolExecutor詳解
以上Executor框架中的基本成員,其中最核心的的成員無疑就是ThreadPoolExecutor,想了解Java中線程池的運(yùn)行機(jī)制,就必須先了解這個(gè)類,而最好的了解方式無疑就是看源碼。
構(gòu)造函數(shù)
打開ThreadPoolExecutor的源碼,發(fā)現(xiàn)類中提供了四個(gè)構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看出,ThreadPoolExecutor的構(gòu)造函數(shù)中的參數(shù)還是比較多的,并且最核心的是第四個(gè)構(gòu)造函數(shù),其中完成了底層的初始化工作。
下面解釋一下構(gòu)造函數(shù)參數(shù)的含義:
- corePoolSize:線程池的基本大小。當(dāng)提交一個(gè)任務(wù)到線程池后,線程池會(huì)創(chuàng)建一個(gè)線程執(zhí)行任務(wù),重復(fù)這種操作,直到線程池中的數(shù)目達(dá)到corePoolSize后不再創(chuàng)建新線程,而是把任務(wù)放到緩存隊(duì)列中。
- maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)。
- workQueue:阻塞隊(duì)列,用于存儲(chǔ)等待執(zhí)行的任務(wù),并且只能存儲(chǔ)調(diào)用
execute方法提交的任務(wù)。常用的有三種隊(duì)列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。 - keepAliveTime:線程池中線程的最大空閑時(shí)間,這種情況一般是線程數(shù)目大于任務(wù)的數(shù)量導(dǎo)致。
- unit:keepAliveTime的時(shí)間單位,TimeUnit是一個(gè)枚舉類型,位于java.util.concurrent包下。
- threadFactory:線程工廠,用于創(chuàng)建線程。
- handler:拒絕策略,當(dāng)任務(wù)太多來不及處理時(shí)所采用的處理策略。
重要的變量
看完了構(gòu)造函數(shù),我們來看下ThreadPoolExecutor類中幾個(gè)重要的成員變量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl:控制線程運(yùn)行狀態(tài)的一個(gè)字段。同時(shí),根據(jù)下面的幾個(gè)方法runStateOf,workerCountOf,ctlOf可以看出,該字段還包含了兩部分的信息:線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),并且使用的是Integar類型,高3位保存runState,低29位保存workerCount。
COUNT_BITS:值為29的常量,在字段CAPACITY被引用計(jì)算。
CAPACITY:表示有效線程數(shù)量(workerCount)的上限,大小為 (1<<29) - 1。
下面5個(gè)變量表示的是線程的運(yùn)行狀態(tài),分別是:
- RUNNING :接受新提交的任務(wù),并且能處理阻塞隊(duì)列中的任務(wù);
- SHUTDOWN:不接受新的任務(wù),但會(huì)執(zhí)行隊(duì)列中的任務(wù)。
- STOP:不接受新任務(wù),也不處理隊(duì)列中的任務(wù),同時(shí)中斷正在處理任務(wù)的線程。
- TIDYING:如果所有的任務(wù)都已終止了,workerCount (有效線程數(shù)) 為0,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)用 terminated() 方法進(jìn)入TERMINATED 狀態(tài)。
- TERMINATED:terminated( ) 方法執(zhí)行完畢。
用一個(gè)狀態(tài)轉(zhuǎn)換圖表示大概如下 (圖片來源于https://www.cnblogs.com/liuzhihu/p/8177371.html):

構(gòu)造函數(shù)和基本參數(shù)都了解后,接下來就是對(duì)類中重要方法的研究了。
線程池執(zhí)行流程
execute方法
ThreadPoolExecutor類的核心調(diào)度方法是execute(),通過調(diào)用這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。而ThreadPoolExecutor的工作邏輯也可以藉由這個(gè)方法來一步步理清。這是方法的源碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取ctl的值,前面說了,該值記錄著runState和workerCount
int c = ctl.get();
/*
* 調(diào)用workerCountOf得到當(dāng)前活動(dòng)的線程數(shù);
* 當(dāng)前活動(dòng)線程數(shù)小于corePoolSize,新建一個(gè)線程放入線程池中;
* addWorker(): 把任務(wù)添加到該線程中。
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//如果上面的添加線程操作失敗,重新獲取ctl值
c = ctl.get();
}
//如果當(dāng)前線程池是運(yùn)行狀態(tài),并且往工作隊(duì)列中添加該任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/*
* 如果當(dāng)前線程不是運(yùn)行狀態(tài),把任務(wù)從隊(duì)列中移除
* 調(diào)用reject(內(nèi)部調(diào)用handler)拒絕接受任務(wù)
*/
if (! isRunning(recheck) && remove(command))
reject(command);
//獲取線程池中的有效線程數(shù),如果為0,則執(zhí)行addWorker創(chuàng)建一個(gè)新線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果執(zhí)行到這里,有兩種情況:
* 1. 線程池已經(jīng)不是RUNNING狀態(tài);
* 2. 線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且workQueue已滿。
* 這時(shí),再次調(diào)用addWorker方法,但第二個(gè)參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
* 如果失敗則拒絕該任務(wù)
*/
else if (!addWorker(command, false))
reject(command);
}
簡單概括一下代碼的邏輯,大概是這樣:
1、判斷當(dāng)前運(yùn)行中的線程數(shù)是否小于corePoolSize,是的話則調(diào)用addWorker創(chuàng)建線程執(zhí)行任務(wù)。
2、不滿足1的條件,就把任務(wù)放入工作隊(duì)列workQueue中。
3、如果任務(wù)成功加入workQueue,判斷線程池是否是運(yùn)行狀態(tài),不是的話先把任務(wù)移出工作隊(duì)列,并調(diào)用reject方法,使用拒絕策略拒絕該任務(wù)。線程如果是非運(yùn)行中,調(diào)用addWorker創(chuàng)建一個(gè)新線程。
4、如果放入workQueue失敗 (隊(duì)列已滿),則調(diào)用addWorker創(chuàng)建線程執(zhí)行任務(wù),如果這時(shí)創(chuàng)建線程失敗 (addWorker傳進(jìn)去的第二個(gè)參數(shù)值是false,說明這種情況是當(dāng)前線程數(shù)不小于maximumPoolSize),就會(huì)調(diào)用reject(內(nèi)部調(diào)用handler)拒絕接受任務(wù)。
整個(gè)執(zhí)行流程用一張圖片表示大致如下:

以上就是execute方法的大概邏輯,接下來看看addWorker的方法實(shí)現(xiàn)。
addWorker方法
源碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**線程池狀態(tài)不為SHUTDOWN時(shí)
* 判斷隊(duì)列或者任務(wù)是否為空,是的話返回false
*/.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/* 這里可以看出core參數(shù)決定著活動(dòng)線程數(shù)的大小比較對(duì)象
* core為true表示與 corePoolSize大小進(jìn)行比較
* core為false表示與 maximumPoolSize大小進(jìn)行比較
* 當(dāng)前活動(dòng)線程數(shù)大于比較對(duì)象就返回false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試增加workerCount,如果成功,則跳出第一個(gè)for循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失敗,則重新獲取ctl的值
c = ctl.get(); // Re-read ctl
// 如果當(dāng)前的運(yùn)行狀態(tài)不等于rs,說明狀態(tài)已被改變,返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創(chuàng)建一個(gè)worker對(duì)象w
w = new Worker(firstTask);
//實(shí)例化w的線程t
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個(gè)HashSet,保存著任務(wù)的worker對(duì)象
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動(dòng)線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
從代碼中可以看出,addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行,其中firstTask參數(shù)指定的是新線程需要執(zhí)行的第一個(gè)任務(wù),core參數(shù)決定于活動(dòng)線程數(shù)的比較對(duì)象是corePoolSize還是maximumPoolSize。根據(jù)傳進(jìn)來的參數(shù)首先對(duì)線程池和隊(duì)列的狀態(tài)進(jìn)行判斷,滿足條件就新建一個(gè)Worker對(duì)象,并實(shí)例化該對(duì)象的線程,最后啟動(dòng)線程。
Worker類
根據(jù)addWorker源碼中的邏輯,我們可以發(fā)現(xiàn),線程池中的每一個(gè)線程其實(shí)都是對(duì)應(yīng)的Worker對(duì)象在維護(hù)的,所以我們有必要對(duì)Worker類一探究竟,先看一下類的源碼:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
從Worker類的構(gòu)造函數(shù)可以看出,當(dāng)實(shí)例化一個(gè)Worker對(duì)象時(shí),Worker對(duì)象會(huì)把傳進(jìn)來的Runnable參數(shù)firstTask賦值給自己的同名屬性,并且用線程工廠也就是當(dāng)前的ThreadFactory來新建一個(gè)線程。
同時(shí),因?yàn)閃orker實(shí)現(xiàn)了Runnable接口,所以當(dāng)Worker類中的線程啟動(dòng)時(shí),調(diào)用的其實(shí)是run()方法。run方法中調(diào)用的是runWorker方法,我們來看下它的具體實(shí)現(xiàn):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取第一個(gè)任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
//允許中斷
w.unlock(); // allow interrupts
//是否因?yàn)楫惓M顺鲅h(huán)的標(biāo)志,processWorkerExit方法會(huì)對(duì)該參數(shù)做判斷
boolean completedAbruptly = true;
try {
//判斷task是否為null,是的話通過getTask()從隊(duì)列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/* 這里的判斷主要邏輯是這樣:
* 如果線程池正在停止,那么就確保當(dāng)前線程是中斷狀態(tài);
* 如果不是的話,就要保證不是中斷狀態(tài)
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//用于記錄任務(wù)執(zhí)行前需要做哪些事,屬于ThreadPoolExecutor類中的方法, //是空的,需要子類具體實(shí)現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
總結(jié)一下runWorker方法的運(yùn)行邏輯:
1、通過while循環(huán)不斷地通過getTask()方法從隊(duì)列中獲取任務(wù);
2、如果線程池正在停止?fàn)顟B(tài),確保當(dāng)前的線程是中斷狀態(tài),否則確保當(dāng)前線程不中斷;
3、調(diào)用task的run()方法執(zhí)行任務(wù),執(zhí)行完畢后需要置為null;
4、循環(huán)調(diào)用getTask()取不到任務(wù)了,跳出循環(huán),執(zhí)行processWorkerExit()方法。
過完runWorker()的運(yùn)行流程,我們來看下getTask()是怎么實(shí)現(xiàn)的。
getTask方法
getTask()方法的作用是從隊(duì)列中獲取任務(wù),下面是該方法的源碼:
private Runnable getTask() {
//記錄上次從隊(duì)列獲取任務(wù)是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//將workerCount減1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/* timed變量用于判斷線程的操作是否需要進(jìn)行超時(shí)判斷
* allowCoreThreadTimeOut不管它,默認(rèn)是false
* wc > corePoolSize,當(dāng)前線程是如果大于核心線程數(shù)corePoolSize
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/* 根據(jù)timed變量判斷,如果為true,調(diào)用workQueue的poll方法獲取任務(wù),
* 如果在keepAliveTime時(shí)間內(nèi)沒有獲取到任務(wù),則返回null;
* timed為false的話,就調(diào)用workQueue的take方法阻塞隊(duì)列,
* 直到隊(duì)列中有任務(wù)可取。
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//r為null,說明time為true,超時(shí)了,把timedOut也設(shè)置為true
timedOut = true;
} catch (InterruptedException retry) {
//發(fā)生異常,把timedOut也設(shè)置為false,重新跑循環(huán)
timedOut = false;
}
}
}
getTask的代碼看上去比較簡單,但其實(shí)內(nèi)有乾坤,我們來重點(diǎn)分析一下兩個(gè)if判斷的邏輯:
1、當(dāng)進(jìn)入getTask方法后,先判斷當(dāng)前線程池狀態(tài),如果線程池狀態(tài)rs >= SHUTDOWN,再進(jìn)行以下判斷:
1)rs 的狀態(tài)是否大于STOP;2)隊(duì)列是否為空;
滿足以上條件其中之一,就將workerCount減1并返回null,也就是表示隊(duì)列中不再有任務(wù)。因?yàn)榫€程池的狀態(tài)值是SHUTDOWN以上時(shí),隊(duì)列中不再允許添加新任務(wù),所以上面兩個(gè)條件滿足一個(gè)都說明隊(duì)列中的任務(wù)都取完了。
2、進(jìn)入第二個(gè)if判斷,這里的邏輯有點(diǎn)繞,但作用比較重要,是為了控制線程池的有效線程數(shù)量,我們來具體解析下代碼:
wc > maximumPoolSize:判斷當(dāng)前線程數(shù)是否大于maximumPoolSize,這種情況一般很少發(fā)生,除非是maximumPoolSize的大小在該程序執(zhí)行的同時(shí)被進(jìn)行設(shè)置,比如調(diào)用ThreadPoolExecutor中的setMaximumPoolSize方法。
timed && timedOut:如果為true,表示當(dāng)前的操作需要進(jìn)行超時(shí)判斷,并且上次從隊(duì)列獲取任務(wù)已經(jīng)超時(shí)。
wc > 1 || workQueue.isEmpty():如果工作線程大于1,或者阻塞隊(duì)列是空的。
compareAndDecrementWorkerCount:比較并將線程池中的workerCount減1
在上文中,我們解析execute方法的邏輯時(shí)了解到,如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時(shí),仍然可以增加工作線程。
但調(diào)用getTask()取任務(wù)的過程中,如果超時(shí)沒有獲取到任務(wù),也就是timedOut為true的情況,說明workQueue已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,也就是不斷的讓任務(wù)被取出,讓線程數(shù)量保持在corePoolSize即可,直到getTask方法返回null。
而當(dāng)getTask方法返回null后,runWorker方法中就會(huì)因?yàn)槿〔坏饺蝿?wù)而執(zhí)行processWorkerExit()方法。
processWorkerExit方法
processWorkerExit方法的作用主要是對(duì)worker對(duì)象的移除,下面是方法的源碼:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//是異常退出的話,執(zhí)行程序?qū)orkerCount數(shù)量減1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從workers的集合中移除worker對(duì)象,也就表示著從線程池中移除了一個(gè)工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,從executor方法開始的整個(gè)運(yùn)行過程就完畢了,總結(jié)一下該流程:
執(zhí)行executor --> 新建Worker對(duì)象,并實(shí)例化線程 --> 調(diào)用runWorker方法,通過getTask()獲取任務(wù),并執(zhí)行run方法 --> getTask()方法中不斷向隊(duì)列取任務(wù),并將workerCount數(shù)量減1,直至返回null --> 調(diào)用processWorkerExit清除worker對(duì)象。
用一張流程圖表示如下所示 (圖片來源于https://www.cnblogs.com/liuzhihu/p/8177371.html):

任務(wù)隊(duì)列workQueue
前面我們多次提到了workQueue,這是一個(gè)任務(wù)隊(duì)列,用來存放等待執(zhí)行的任務(wù),它是BlockingQueue<Runnable>類型的對(duì)象,而在ThreadPoolExecutor的源碼注釋中,詳細(xì)介紹了三種常用的Queue類型,分別是:
SynchronousQueue:直接提交的隊(duì)列。這個(gè)隊(duì)列沒有容量,當(dāng)接收到任務(wù)的時(shí)候,會(huì)直接提交給線程處理,而不保留它。如果沒有空閑的線程,就新建一個(gè)線程來處理這個(gè)任務(wù)!如果線程數(shù)量達(dá)到最大值,就會(huì)執(zhí)行拒絕策略。所以,使用這個(gè)類型隊(duì)列的時(shí)候,一般都是將maximumPoolSize一般指定成Integer.MAX_VALUE,避免容易被拒絕。
ArrayBlockingQueue:有界的任務(wù)隊(duì)列。需要給定一個(gè)參數(shù)來限制隊(duì)列的長度,接收到任務(wù)的時(shí)候,如果沒有達(dá)到corePoolSize的值,則新建線程 (核心線程) 執(zhí)行任務(wù),如果達(dá)到了,則將任務(wù)放入等待隊(duì)列。如果隊(duì)列已滿,則在總線程數(shù)不到maximumPoolSize的前提下新建線程執(zhí)行任務(wù),若大于maximumPoolSize,則執(zhí)行拒絕策略。
LinkedBlockingQueue:無界的任務(wù)隊(duì)列。該隊(duì)列沒有任務(wù)數(shù)量的限制,所以任務(wù)可以一直入隊(duì),知道耗盡系統(tǒng)資源。當(dāng)接收任務(wù),如果當(dāng)前線程數(shù)小于corePoolSize,則新建線程處理任務(wù);如果當(dāng)前線程數(shù)等于corePoolSize,則進(jìn)入隊(duì)列等待。
任務(wù)拒絕策略
當(dāng)線程池的任務(wù)隊(duì)列已滿并且線程數(shù)目達(dá)到maximumPoolSize時(shí),對(duì)于新加的任務(wù)一般會(huì)采取拒絕策略,通常有以下四種策略:
- AbortPolicy:直接拋出異常,這是默認(rèn)策略;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:直接丟棄任務(wù);
線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow():
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
代碼邏輯就不一一進(jìn)行解析了,總結(jié)一下兩個(gè)方法的特點(diǎn)就是:
- shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)
- shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)
ThreadPoolExecutor創(chuàng)建線程池實(shí)例
ThreadPoolExecutor的運(yùn)行機(jī)制講完了,接下來展示一下如何用ThreadPoolExecutor創(chuàng)建線程池實(shí)例,具體代碼如下:
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
//用lambda表達(dá)式編寫方法體中的邏輯
Runnable run = () -> {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 10; i++) {
service.execute(run);
}
//這里一定要做關(guān)閉
service.shutdown();
}
上面的代碼中,我們創(chuàng)建的ThreadPoolExecutor線程池的核心線程數(shù)為5個(gè),所以,當(dāng)調(diào)用線程池執(zhí)行任務(wù)時(shí),同時(shí)運(yùn)行的線程最多也是5個(gè),執(zhí)行main方法,輸出結(jié)果如下:
pool-1-thread-3正在執(zhí)行
pool-1-thread-1正在執(zhí)行
pool-1-thread-4正在執(zhí)行
pool-1-thread-5正在執(zhí)行
pool-1-thread-3正在執(zhí)行
pool-1-thread-2正在執(zhí)行
pool-1-thread-1正在執(zhí)行
pool-1-thread-4正在執(zhí)行
pool-1-thread-5正在執(zhí)行
看到出來,線程池確實(shí)只有5個(gè)線程在工作,也就是真正的實(shí)現(xiàn)了線程的復(fù)用,說明我們的ThreadPoolExecutor實(shí)例是有效的。
參考:
https://www.cnblogs.com/liuzhihu/p/8177371.html
https://www.cnblogs.com/dolphin0520/p/3932921.html
《實(shí)戰(zhàn)Java:高并發(fā)程序設(shè)計(jì)》