為什么需要Java中的線程池?答案通常是,當(dāng)您在Java中開發(fā)一個(gè)簡(jiǎn)單的并發(fā)應(yīng)用程序時(shí),您創(chuàng)建一些Runnable的對(duì)象,然后創(chuàng)建相應(yīng)的線程對(duì)象來(lái)執(zhí)行它們。在Java中創(chuàng)建線程是一項(xiàng)昂貴的操作。如果每次執(zhí)行任務(wù)時(shí)都開始創(chuàng)建新的線程實(shí)例,那么應(yīng)用程序的性能肯定會(huì)下降。
1. 線程池在java中是如何工作的
線程池是預(yù)初始化線程的集合。通常集合的大小是固定的,但不是強(qiáng)制的。它有助于使用相同線程執(zhí)行N個(gè)任務(wù)。如果線程任務(wù)比線程多,那么任務(wù)需要在類似隊(duì)列的結(jié)構(gòu)中等待(FIFO -先進(jìn)先出)。
當(dāng)任何線程完成它的執(zhí)行時(shí),它可以從隊(duì)列中獲取一個(gè)新任務(wù)并執(zhí)行它。當(dāng)所有任務(wù)完成時(shí),線程保持活動(dòng)狀態(tài)并等待線程池中的更多任務(wù)。

監(jiān)視程序一直監(jiān)視隊(duì)列(通常是阻塞隊(duì)列BlockingQueue)以執(zhí)行任何新任務(wù)。一旦任務(wù)出現(xiàn),線程就會(huì)再次開始接收并執(zhí)行任務(wù)。
2. ThreadPoolExecutor
自Java 5以來(lái),Java并發(fā)API提供了一個(gè)機(jī)制執(zhí)行器框架。類似的接口為Executor接口、其子接口ExecutorService和實(shí)現(xiàn)這兩個(gè)接口的ThreadPoolExecutor類。
ThreadPoolExecutor分離任務(wù)的創(chuàng)建和執(zhí)行。使用ThreadPoolExecutor,您只需要實(shí)現(xiàn)可運(yùn)行對(duì)象并將它們發(fā)送到執(zhí)行程序。它負(fù)責(zé)它們的執(zhí)行、實(shí)例化和使用必要的線程運(yùn)行。
不僅如此,使用線程池還提高了性能。當(dāng)您將任務(wù)發(fā)送給執(zhí)行程序時(shí),它會(huì)嘗試使用一個(gè)線程池的線程來(lái)執(zhí)行此任務(wù),以避免線程的連續(xù)生成。
3.如何創(chuàng)建ThreadPoolExecutor
我們可以使用java.util.concurrent.Executors接口中的預(yù)先構(gòu)建的方法創(chuàng)建以下5種類型的線程池執(zhí)行器。
-
固定線程池執(zhí)行程序——?jiǎng)?chuàng)建一個(gè)線程池,該線程池重用固定數(shù)量的線程來(lái)執(zhí)行任意數(shù)量的任務(wù)。如果在所有線程都處于活動(dòng)狀態(tài)時(shí)提交了其他任務(wù),它們將在隊(duì)列中等待,直到線程可用。它最適合在現(xiàn)實(shí)生活中使用。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); -
緩存的線程池執(zhí)行程序——?jiǎng)?chuàng)建一個(gè)線程池,該線程池根據(jù)需要?jiǎng)?chuàng)建新線程,但在可用時(shí)將重用以前構(gòu)造的線程。如果任務(wù)長(zhǎng)時(shí)間運(yùn)行,則不要使用此線程池。如果線程的數(shù)量超出了系統(tǒng)所能處理的范圍,則會(huì)導(dǎo)致系統(tǒng)崩潰。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); -
調(diào)度線程池執(zhí)行程序——?jiǎng)?chuàng)建一個(gè)線程池,該線程池可以調(diào)度命令在給定延遲后運(yùn)行或定期執(zhí)行。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newScheduledThreadPool(10); -
單線程池執(zhí)行程序-創(chuàng)建單線程來(lái)執(zhí)行所有任務(wù)。當(dāng)您只有一個(gè)任務(wù)要執(zhí)行時(shí)就可以使用它。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor(); -
工作竊取線程池執(zhí)行程序-創(chuàng)建一個(gè)線程池,該線程池維護(hù)足夠的線程來(lái)支持給定的并行級(jí)別。這里的并行級(jí)別是指在多處理器機(jī)器上,在單點(diǎn)時(shí)間內(nèi)執(zhí)行給定任務(wù)所使用的線程的最大數(shù)量。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newWorkStealingPool(4);
4. ThreadPoolExecutor示例
4.1. 創(chuàng)建任務(wù)
讓我們創(chuàng)建一個(gè)任務(wù),每次都需要隨機(jī)時(shí)間來(lái)完成它。
public class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void run() {
try {
Long duration = (long) (Math.random() * 10);
System.out.println("Executing : " + name);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.2. 使用線程池執(zhí)行器執(zhí)行任務(wù)
給定的程序創(chuàng)建5個(gè)任務(wù)并提交到executor隊(duì)列。Executor使用兩個(gè)線程執(zhí)行所有任務(wù)。
public class ThreadPoolExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
for (int i = 1; i <= 5; i++) {
Task task = new Task("Task " + i);
System.out.println("Created : " + task.getName());
executor.execute(task);
}
executor.shutdown();
}
}
Program output:
Created : Task 1
Created : Task 2
Created : Task 3
Created : Task 4
Created : Task 5
Executing : Task 1
Executing : Task 2
Executing : Task 3
Executing : Task 4
Executing : Task 5
5. ScheduledThreadPoolExecutor
當(dāng)您必須只執(zhí)行一次惟一任務(wù)時(shí),固定線程池或緩存線程池是很好的選擇。當(dāng)您需要執(zhí)行一個(gè)任務(wù),重復(fù)N次,或者N次固定次數(shù),或者在固定延遲之后無(wú)限次,您應(yīng)該使用ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor提供了4種方法,它們提供不同的功能以重復(fù)的方式執(zhí)行任務(wù)。
-
ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行在給定延遲之后啟用的任務(wù)。 -
ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行在給定延遲之后啟用的ScheduledFuture。 -
ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行一個(gè)周期性動(dòng)作,該動(dòng)作在給定的初始延遲之后首先啟用,然后在給定的延遲期間啟用。如果此任務(wù)的任何執(zhí)行花費(fèi)的時(shí)間超過(guò)其周期,則后續(xù)執(zhí)行可能會(huì)延遲開始,但不會(huì)并發(fā)執(zhí)行。 -
ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行一個(gè)周期性動(dòng)作,該動(dòng)作在給定的初始延遲之后首先啟用,然后在給定的延遲期間啟用。無(wú)論一個(gè)長(zhǎng)時(shí)間運(yùn)行的任務(wù)花費(fèi)多少時(shí)間,兩次執(zhí)行之間都會(huì)有一個(gè)固定的延遲時(shí)間間隔。
5.1. ScheduledThreadPoolExecutor示例
public class ScheduledThreadPoolExecutorExample {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);
Task task = new Task("Repeat Task");
System.out.println("Created : " + task.getName());
executor.scheduleWithFixedDelay(task, 2, 2, TimeUnit.SECONDS);
}
}
public class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void run() {
try {
Long duration = (long) (Math.random() * 10);
System.out.println("Executing : " + name + ", Current Seconds : " + new Date());
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出結(jié)果為:
Created : Repeat Task
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:05 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:13 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:17 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:22 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:27 CST 2019
6. java中的自定義線程池實(shí)現(xiàn)
盡管Java通過(guò)Executor框架具有非常健壯的線程池功能。如果沒(méi)有executor,就不應(yīng)該創(chuàng)建自己的自定義線程池。我強(qiáng)烈反對(duì)這種企圖。然而,如果您想創(chuàng)建它來(lái)學(xué)習(xí),下面給出的是Java中的線程池實(shí)現(xiàn)。
@SuppressWarnings("unused")
public class CustomThreadPool {
// 線程池大小
private final int poolSize;
// 線程池內(nèi)部是一個(gè)數(shù)組
private final WorkerThread[] workers;
// 先進(jìn)先出隊(duì)列
private final LinkedBlockingQueue<Runnable> queue;
public CustomThreadPool(int poolSize) {
this.poolSize = poolSize;
queue = new LinkedBlockingQueue<Runnable>();
workers = new WorkerThread[poolSize];
for (int i = 0; i < poolSize; i++) {
workers[i] = new WorkerThread();
workers[i].start();
}
}
public void execute(Runnable task) {
synchronized (queue) {
queue.add(task);
queue.notify();
}
}
private class WorkerThread extends Thread {
public void run() {
Runnable task;
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
queue.wait();
} catch (InterruptedException e) {
System.out.println("An error occurred while queue is waiting: " + e.getMessage());
}
}
task = queue.poll();
}
try {
task.run();
} catch (RuntimeException e) {
System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
}
}
}
}
public void shutdown() {
System.out.println("Shutting down thread pool");
for (int i = 0; i < poolSize; i++) {
workers[i] = null;
}
}
}
我們使用ThreadPoolExecutor執(zhí)行同樣的任務(wù)
public class CustomThreadPoolExample {
public static void main(String[] args) {
CustomThreadPool customThreadPool = new CustomThreadPool(2);
for (int i = 1; i <= 5; i++) {
Task task = new Task("Task " + i);
System.out.println("Created : " + task.getName());
customThreadPool.execute(task);
}
}
}
程序輸出為:
Created : Task 1
Created : Task 2
Created : Task 3
Created : Task 4
Created : Task 5
Executing : Task 1, Current Seconds : Mon Mar 04 16:24:53 CST 2019
Executing : Task 2, Current Seconds : Mon Mar 04 16:24:53 CST 2019
Executing : Task 3, Current Seconds : Mon Mar 04 16:24:58 CST 2019
Executing : Task 4, Current Seconds : Mon Mar 04 16:25:00 CST 2019
Executing : Task 5, Current Seconds : Mon Mar 04 16:25:01 CST 2019
上面是非常原始的線程池實(shí)現(xiàn),有很多改進(jìn)。但是,與其完善上面的代碼,不如專注于學(xué)習(xí)Java executor框架。
還要注意,不正確的池或隊(duì)列處理也會(huì)導(dǎo)致死鎖或資源抖動(dòng)。Java社區(qū)對(duì)Executor框架進(jìn)行了良好的測(cè)試,您當(dāng)然可以避免這些問(wèn)題。
7. 總結(jié)
-
ThreadPoolExecutor類有四個(gè)不同的構(gòu)造函數(shù),但是由于它們的復(fù)雜性,Java并發(fā)API提供了Executors類來(lái)構(gòu)造executor和其他相關(guān)對(duì)象。盡管我們可以直接使用其構(gòu)造函數(shù)之一創(chuàng)建ThreadPoolExecutor,但建議使用Executors類。 - 我們?cè)谏厦鎰?chuàng)建的緩存線程池,如果需要執(zhí)行新任務(wù),它會(huì)創(chuàng)建新線程;如果已經(jīng)完成了正在運(yùn)行的任務(wù)(這些任務(wù)現(xiàn)在是可用的)的執(zhí)行,它會(huì)重用現(xiàn)有的線程。但是,緩存的線程池有一個(gè)缺點(diǎn),即新任務(wù)的線程總是處于不穩(wěn)定狀態(tài),因此,如果您向這個(gè)執(zhí)行程序發(fā)送太多的任務(wù),您可能會(huì)使系統(tǒng)過(guò)載。這可以通過(guò)使用固定線程池來(lái)解決,我們將在下一教程中學(xué)習(xí)這個(gè)問(wèn)題。
-
ThreadPoolExecutor類和一般的執(zhí)行器的一個(gè)關(guān)鍵方面是,必須顯式地結(jié)束它。如果不這樣做,執(zhí)行程序?qū)⒗^續(xù)執(zhí)行,程序?qū)⒉粫?huì)結(jié)束。如果執(zhí)行程序沒(méi)有要執(zhí)行的任務(wù),它將繼續(xù)等待新任務(wù),并且不會(huì)結(jié)束執(zhí)行。Java應(yīng)用程序在其所有非守護(hù)進(jìn)程線程完成執(zhí)行之前不會(huì)結(jié)束,因此,如果不終止執(zhí)行程序,應(yīng)用程序?qū)⒂肋h(yuǎn)不會(huì)結(jié)束。 - 要向執(zhí)行程序表明想要完成它,可以使用
ThreadPoolExecutor類的shutdown()方法。當(dāng)執(zhí)行程序完成所有掛起任務(wù)的執(zhí)行時(shí),它就完成了執(zhí)行。在您調(diào)用shutdown()方法之后,如果您嘗試將另一個(gè)任務(wù)發(fā)送給執(zhí)行程序,它將被拒絕,執(zhí)行程序?qū)伋?code>RejectedExecutionException異常。 -
ThreadPoolExecutor類提供了許多方法來(lái)獲取關(guān)于其狀態(tài)的信息。在示例中,我們使用getPoolSize()、getActiveCount()和getCompletedTaskCount()方法來(lái)獲取關(guān)于池大小、線程數(shù)和執(zhí)行程序完成的任務(wù)數(shù)的信息。您還可以使用getLargestPoolSize()方法,該方法每次返回池中已經(jīng)存在的最大線程數(shù)。 -
ThreadPoolExecutor類還提供了與執(zhí)行器的終結(jié)相關(guān)的其他方法。這些方法有:
- shutdownNow():該方法立即關(guān)閉執(zhí)行程序。它不執(zhí)行掛起的任務(wù)。它返回一個(gè)包含所有這些掛起任務(wù)的列表。調(diào)用此方法時(shí)正在運(yùn)行的任務(wù)將繼續(xù)執(zhí)行,但該方法不會(huì)等待它們的結(jié)束。
-
isTerminated():如果您調(diào)用了
shutdown()或shutdownNow()方法,并且執(zhí)行程序完成了關(guān)閉該方法的過(guò)程,則該方法返回true。 - isShutdown():如果調(diào)用了executor的shutdown()方法,則該方法返回true。
-
awaitTermination(long timeout,TimeUnitunit):該方法阻塞調(diào)用線程,直到執(zhí)行程序的任務(wù)結(jié)束或超時(shí)發(fā)生。
TimeUnit類是一個(gè)枚舉,它具有以下常量:DAYS,HOURS,MICROSECONDS等。