前言
在編程中經(jīng)常會(huì)使用線(xiàn)程來(lái)異步處理任務(wù),但是每個(gè)線(xiàn)程的創(chuàng)建和銷(xiāo)毀都需要一定的開(kāi)銷(xiāo)。如果每次執(zhí)行一個(gè)任務(wù)都需要一個(gè)新進(jìn)程去執(zhí)行,則這些線(xiàn)程的創(chuàng)建和銷(xiāo)毀將消耗大量的資源;并且線(xiàn)程都是“各自為政”的,很難對(duì)其進(jìn)行控制,更何況有一堆的線(xiàn)程在執(zhí)行。這時(shí)候就需要線(xiàn)程池來(lái)對(duì)線(xiàn)程進(jìn)行管理。在Java 1.5中提供了Executor框架用于把任務(wù)的提交和執(zhí)行解耦。任務(wù)的提交交給RUnnable或者Callable,而Executor框架用來(lái)處理任務(wù)。Executor框架中最核心的成員就是ThreadPoolExecutor,它是線(xiàn)程池的核心實(shí)現(xiàn)類(lèi)。本篇文章就著重講解ThreadPoolExecutor。
ThreadPoolExecutor介紹
可以通過(guò)ThreadPoolExecutor開(kāi)創(chuàng)建一個(gè)線(xiàn)程池,ThreadPoolExecutor類(lèi)一共有四個(gè)構(gòu)造方法。下面展示的都是擁有最多參數(shù)的的構(gòu)造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:核心線(xiàn)程數(shù)。默認(rèn)情況下線(xiàn)程池是空的,只有任務(wù)提交時(shí)才會(huì)創(chuàng)建線(xiàn)程。如果當(dāng)前運(yùn)行的線(xiàn)程數(shù)少于corePoolSize,則會(huì)創(chuàng)建新線(xiàn)程來(lái)處理任務(wù);如果等于或者多于corePoolSize,則不會(huì)創(chuàng)建,如果調(diào)用線(xiàn)程池的prestartAllcoreThread()方法,線(xiàn)程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線(xiàn)程來(lái)等待任務(wù)。maximumPoolSize:線(xiàn)程池允許創(chuàng)建的最大線(xiàn)程數(shù),如果任務(wù)隊(duì)列滿(mǎn)了并且線(xiàn)程數(shù)小于maximumPoolSize時(shí),則線(xiàn)程池仍舊會(huì)創(chuàng)建新的線(xiàn)程來(lái)處理任務(wù)。keepAliveTime:非核心線(xiàn)程閑置的超時(shí)時(shí)間,超過(guò)這個(gè)事件則回收,如果任務(wù)很多,并且每個(gè)任務(wù)的執(zhí)行事件很短,則可以調(diào)用keepAliveTime來(lái)提高線(xiàn)程的利用率。另外,如果設(shè)置allowCoreThreadTimeOut屬性為true時(shí),keepAliveTime也會(huì)應(yīng)用到核心線(xiàn)程上。TimeUnit:keepAliveTime參數(shù)的時(shí)間單位,可選的單位有天(DAYS)、小時(shí)(HOURS)、分鐘(MINUTES)、秒(SECONDS)、毫秒(MILLOSECONDS)等。BlockingQueue<Runnable> :任務(wù)隊(duì)列,如果當(dāng)前線(xiàn)程數(shù)大于corePoolSize,則將任務(wù)添加到此任務(wù)隊(duì)列中。該任務(wù)隊(duì)列是BlockiingQueue類(lèi)型,也就是阻塞隊(duì)列。ThreadFactory:線(xiàn)程工廠??梢杂镁€(xiàn)程工廠給每個(gè)創(chuàng)建出來(lái)的線(xiàn)程設(shè)置名字。一般情況下無(wú)須設(shè)置參數(shù)。RejectedExecutionHandler :飽和策略,這是當(dāng)任務(wù)隊(duì)列中和線(xiàn)程池都滿(mǎn)了時(shí)所采取的對(duì)應(yīng)策略,默認(rèn)是ABordPolicy,表示無(wú)法處理新任務(wù),并拋出RejetctedExecutionException異常。此外還有3種策略,它們分別如下:
(1)CallerRunsPolicy:用調(diào)用者所在的線(xiàn)程來(lái)處理任務(wù),此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
(2)DiscardPolicy:不能執(zhí)行的任務(wù),并將該任務(wù)刪除。
(3)DiscardOldestPolicy:丟棄隊(duì)列最近的任務(wù),并執(zhí)行當(dāng)前的任務(wù)。
線(xiàn)程池的處理流程和原理

從上圖1中可以得知線(xiàn)程的處理流程主要分為3個(gè)步驟:
- 提交任務(wù)后,線(xiàn)程池先判斷線(xiàn)程數(shù)是否達(dá)到核心線(xiàn)程數(shù)(corePoolSize)。如果未核心線(xiàn)程數(shù),則創(chuàng)建核心線(xiàn)程處理任務(wù);否則就執(zhí)行下一步操作。
- 接著線(xiàn)程池判斷任務(wù)隊(duì)列是否滿(mǎn)了。如果沒(méi)滿(mǎn),則將任務(wù)添加到任務(wù)隊(duì)列中,否則,就執(zhí)行下一步操作。
- 接著因?yàn)槿蝿?wù)隊(duì)列滿(mǎn)了,線(xiàn)程池就會(huì)判斷線(xiàn)程數(shù)是否達(dá)到了最大線(xiàn)程數(shù),如果未達(dá)到,則創(chuàng)建非核心線(xiàn)程1處理任務(wù);否則,就執(zhí)行飽和策略,默認(rèn)會(huì)拋出RejectedExecutionException異常。
雖然上面介紹了線(xiàn)程池的處理流程,但還不是很直觀。我們結(jié)合下面的圖2來(lái)更好的了解線(xiàn)程池的原理。

從圖2中可以看到,如果我們執(zhí)行ThreadPoolExecutor的execute方法,會(huì)遇到各種情況:
(1)如果線(xiàn)程池中的線(xiàn)程數(shù)未達(dá)到核心線(xiàn)程數(shù),則創(chuàng)建核心線(xiàn)程處理任務(wù)。
(2)如果線(xiàn)程數(shù)大于或者等于核心線(xiàn)程數(shù),則將任務(wù)加入任務(wù)隊(duì)列,線(xiàn)程池中的空閑線(xiàn)程會(huì)不斷地從任務(wù)隊(duì)列中取出任務(wù)進(jìn)行處理。
(3)如果任務(wù)隊(duì)列滿(mǎn)了,并且線(xiàn)程數(shù)沒(méi)有達(dá)到最大線(xiàn)程數(shù),則創(chuàng)建非核心線(xiàn)程去處理任務(wù)。
(4)如果線(xiàn)程數(shù)超過(guò)了最大線(xiàn)程數(shù),則執(zhí)行飽和策略。
ThreadPoolExecutor的基本使用
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
tools:context=".MainActivity">
<Button
android:id="@+id/btn_start"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="啟動(dòng)" />
</LinearLayout>
package com.ju.executordemo;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MainActivity extends AppCompatActivity{
private Button btnStart;
private final int CORE_POOL_SIZE = 4;//核心線(xiàn)程數(shù)
private final int MAX_POOL_SIZE = 5;//最大線(xiàn)程數(shù)
private final long KEEP_ALIVE_TIME = 10;//空閑線(xiàn)程超時(shí)時(shí)間
private ThreadPoolExecutor executorPool;
private int songIndex = 0;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
initView();
//創(chuàng)建線(xiàn)程池
initExec();
}
private void initView() {
btnStart = findViewById(R.id.btn_start);
btnStart.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
begin();
}
});
}
public void begin() {
songIndex++;
try {
executorPool.execute(new WorkerThread("歌曲" + songIndex));
} catch (Exception e) {
Log.e("threadtest", "AbortPolicy...已超出規(guī)定的線(xiàn)程數(shù)量,不能再增加了....");
}
// 所有任務(wù)已經(jīng)執(zhí)行完畢,我們?cè)诒O(jiān)聽(tīng)一下相關(guān)數(shù)據(jù)
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20 * 1000);
} catch (Exception e) {
}
sout("monitor after");
}
}).start();
}
private void sout(String msg) {
Log.i("threadtest", "monitor " + msg
+ " CorePoolSize:" + executorPool.getCorePoolSize()
+ " PoolSize:" + executorPool.getPoolSize()
+ " MaximumPoolSize:" + executorPool.getMaximumPoolSize()
+ " ActiveCount:" + executorPool.getActiveCount()
+ " TaskCount:" + executorPool.getTaskCount()
);
}
private void initExec() {
executorPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
}
class WorkerThread implements Runnable {
private String threadName;
public WorkerThread (String name){
threadName = name;
}
@Override
public void run() {
boolean flag = true;
try {
while (flag){
String tn = Thread.currentThread().getName();
//模擬耗時(shí)操作
Random random = new Random();
long time = (random.nextInt(5) + 1) * 1000;
Thread.sleep(time);
Log.e("threadtest","線(xiàn)程\"" + tn + "\"耗時(shí)了(" + time / 1000 + "秒)下載了第<" + threadName + ">");
//下載完畢跳出循環(huán)
flag = false;
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}

上述代碼模擬一個(gè)下載音樂(lè)的例子來(lái)演示ThreadPoolExecutor的基本使用,啟動(dòng)ThreadPoolExecutor的函數(shù)是
execute()方法,然后他需要一個(gè)Runnable的參數(shù)來(lái)進(jìn)行啟動(dòng)。ThreadPoolExecutor的其它種類(lèi)
通過(guò)直接或者間接地配置ThreadPoolExecutor的參數(shù)可以創(chuàng)建不同類(lèi)型的ThreadPoolExecutor,其中有 4 種線(xiàn)程池比較常用,它們分別是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool。下面分別介紹這4種線(xiàn)程池。
- FixedThreadPool
FixedThreadPool 是可重用固定線(xiàn)程數(shù)的線(xiàn)程池。在 Executors 類(lèi)中提供了創(chuàng)建FixedThreadPool的方法, 如下所示:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool指定的參數(shù)nThreads,也就意味著FixedThreadPool只有核心線(xiàn)程,并且數(shù)量是固定的,沒(méi)有非核心線(xiàn)程。keepAliveTime設(shè)置為0L 意味著多余的線(xiàn)程會(huì)被立即終止。因?yàn)椴粫?huì)產(chǎn)生多余的線(xiàn)程,所以keepAliveTime是無(wú)效的參數(shù)。另外,任 務(wù)隊(duì)列采用了無(wú)界的阻塞隊(duì)列LinkedBlockingQueue。FixedThreadPool的execute方法的執(zhí)行示意圖如圖4所 示。

當(dāng)執(zhí)行
execute()方法時(shí),如果當(dāng)前運(yùn)行的線(xiàn)程未達(dá)到corePoolSize(核心線(xiàn)程數(shù))時(shí) 就創(chuàng)建核心線(xiàn)程來(lái)處理任務(wù),如果達(dá)到了核心線(xiàn)程數(shù)則將任務(wù)添加到LinkedBlockingQueue中。 FixedThreadPool就是一個(gè)有固定數(shù)量核心線(xiàn)程的線(xiàn)程池,并且這些核心線(xiàn)程不會(huì)被回收。當(dāng)線(xiàn)程數(shù)超過(guò)corePoolSize時(shí),就將任務(wù)存儲(chǔ)在任務(wù)隊(duì)列中;當(dāng)線(xiàn)程池有空閑線(xiàn)程時(shí),則從任務(wù)隊(duì)列中去取任務(wù)執(zhí)行- CachedThreadPool
CachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建線(xiàn)程的線(xiàn)程池,創(chuàng)建CachedThreadPool的代碼如下所示:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize為0,maximumPoolSize設(shè)置為Integer.MAX_VALUE,這意味著 CachedThreadPool沒(méi)有核心線(xiàn)程,非核心線(xiàn)程是無(wú)界的。keepAliveTime設(shè)置為60L,則空閑線(xiàn)程等待新任務(wù) 的最長(zhǎng)時(shí)間為 60s。在此用了阻塞隊(duì)列 SynchronousQueue,它是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作 必須等待另一個(gè)線(xiàn)程的移除操作,同樣任何一個(gè)移除操作都等待另一個(gè)線(xiàn)程的插入操作。CachedThreadPool 的execute方法的執(zhí)行示意圖如圖5所示。

當(dāng)執(zhí)行
execute()方法時(shí),首先會(huì)執(zhí)行SynchronousQueue的offer()方法來(lái)提交任務(wù),并且查詢(xún)線(xiàn)程池中是否有空閑的線(xiàn)程執(zhí)行SynchronousQueue的poll()方法來(lái)移除任務(wù)。如果有則配對(duì)成功,將任務(wù)交給這個(gè)空閑的線(xiàn)程處理;如果沒(méi)有則配對(duì)失敗,創(chuàng)建新的線(xiàn)程去處理任務(wù)。當(dāng)線(xiàn)程池中的線(xiàn)程空閑時(shí),它會(huì)執(zhí)行 SynchronousQueue的poll()方法,等待SynchronousQueue中新提交的任務(wù)。如果超過(guò) 60s 沒(méi)有新任務(wù)提交到 SynchronousQueue,則這個(gè)空閑線(xiàn)程將終止。因?yàn)閙aximumPoolSize 是無(wú)界的,所以如果提交的任務(wù)大于線(xiàn) 程池中線(xiàn)程處理任務(wù)的速度就會(huì)不斷地創(chuàng)建新線(xiàn)程。另外,每次提交任務(wù)都會(huì)立即有線(xiàn)程去處理。所以,CachedThreadPool比較適于大量的需要立即處理并且耗時(shí)較少的任務(wù)。- SingleThreadExecutor
SingleThreadExecutor是使用單個(gè)工作線(xiàn)程的線(xiàn)程池,其創(chuàng)建源碼如下所示:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize和maximumPoolSize都為1,意味著SingleThreadExecutor只有一個(gè)核心線(xiàn)程,其他的參數(shù)都 和FixedThreadPool一樣,這里就不贅述了。SingleThreadExecutor的execute()方法的執(zhí)行示意圖如圖5所示。

當(dāng)執(zhí)行
execute()方法時(shí),如果當(dāng)前運(yùn)行的線(xiàn)程數(shù)未達(dá)到核心線(xiàn)程數(shù),也就是當(dāng)前沒(méi)有運(yùn)行的線(xiàn)程,則創(chuàng)建一個(gè)新線(xiàn)程來(lái)處理任務(wù)。如果當(dāng)前有運(yùn)行的線(xiàn)程,則將任務(wù)添加到阻塞隊(duì)列LinkedBlockingQueue中。因此,SingleThreadExecutor能確保所有的任務(wù)在一個(gè)線(xiàn)程中按照順序逐一執(zhí)行。- ScheduledThreadPool
ScheduledThreadPool是一個(gè)能實(shí)現(xiàn)定時(shí)和周期性任務(wù)的線(xiàn)程池,它的創(chuàng)建源碼如下所示:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
這里創(chuàng)建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,它主要用于給定延時(shí)之后的運(yùn)行任務(wù)或者定期處理任務(wù)。ScheduledThreadPoolExecutor 的構(gòu)造方法如下所示:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
從上面的代碼可以看出,ScheduledThreadPoolExecutor 的構(gòu)造方法最終調(diào)用的是ThreadPoolExecutor的 構(gòu)造方法。corePoolSize是傳進(jìn)來(lái)的固定數(shù)值,maximumPoolSize的值是Integer.MAX_VALUE。因?yàn)椴捎玫?DelayedWorkQueue是無(wú)界的,所以maximumPoolSize這個(gè)參數(shù)是無(wú)效的。ScheduledThreadPoolExecutor的 execute方法的執(zhí)行示意圖如圖6所示。

當(dāng)執(zhí)行ScheduledThreadPoolExecutor的
scheduleAtFixedRate()或者scheduleWithFixedDelay()方法時(shí),會(huì)向DelayedWorkQueue添加一個(gè) 實(shí)現(xiàn)RunnableScheduledFuture接口的ScheduledFutureTask(任務(wù)的包裝類(lèi)),并會(huì)檢查運(yùn)行的線(xiàn)程是否達(dá)到corePoolSize。如果沒(méi)有則新建線(xiàn)程并啟動(dòng)它,但并不是立即去執(zhí)行任務(wù),而是去DelayedWorkQueue中取ScheduledFutureTask,然后去執(zhí)行任務(wù)。如果運(yùn)行的線(xiàn)程達(dá)到了corePoolSize時(shí),則將任務(wù)添加到DelayedWorkQueue中。DelayedWorkQueue會(huì)將任務(wù)進(jìn)行排序,先要執(zhí)行的任務(wù)放在隊(duì)列的前面。其跟此前介紹的線(xiàn)程池不同的是,當(dāng)執(zhí)行完任務(wù)后,會(huì)將ScheduledFutureTask中time變量改為下次要執(zhí)行的時(shí)間并放回到DelayedWorkQueue中。參考
- [劉望舒]Android進(jìn)階之光
- Android線(xiàn)程池(一)簡(jiǎn)單使用