執(zhí)行多線程并發(fā)任務的時候,如果任務類型相同,一般會考慮使用線程池,一方面利用了并發(fā)的優(yōu)勢,一方面避免創(chuàng)建大量線程得不償失。使用線程池執(zhí)行的任務一般是我們自己的代碼,或者第三方的代碼,有沒有想過,如果這些代碼拋出異常時,線程池會怎么處理呢?如果不處理又會有什么影響?
異常的影響
Java 理論與實踐: 嗨,我的線程到哪里去了?這篇文章列舉了一個由于RuntimeException引發(fā)的線程泄漏問題:
考慮這樣一個假設的中間件服務器應用程序,它聚合來自各種輸入源的消息,然后將它們提交到外部服務器應用程序,從外部應用程序接收響應并將響應路由回適當?shù)妮斎朐础τ诿總€輸入源,都有一個以其自己的方式接受其輸入消息的插件(通過掃描文件目錄、等待套接字連接、輪詢數(shù)據庫表等)。插件可以由第三方編寫,即使它們是在服務器 JVM 上運行的。這個應用程序擁有(至少)兩個內部工作隊列 ― 從插件處接收的正在等待被發(fā)送到服務器的消息(“出站消息”隊列),以及從服務器接收的正在等待被傳遞到適當插件的響應(“入站響應”隊列)。通過調用插件對象上的服務例程 incomingResponse() ,消息被路由到最初發(fā)出請求的插件。
從插件接收消息后,就被排列到出站消息隊列中。由一個或多個從隊列讀取消息的線程處理出站消息隊列中的消息、記錄其來源并將它提交給遠程服務器應用程序(假定通過 Web 服務接口)。遠程應用程序最終通過 Web 服務接口返回響應,然后我們的服務器將接收的響應排列到入站響應隊列中。一個或多個響應線程從入站響應隊列讀取消息并將其路由到適當?shù)牟寮瑥亩瓿赏怠奥贸獭薄?br> 在這個應用程序中,有兩個消息隊列,分別用于出站請求和入站響應,不同的插件內可能也有另外的隊列。我們還有幾種服務線程,一個從出站消息隊列讀取請求并將其提交給外部服務器,一個從入站響應隊列讀取響應并將其路由到插件,在用于向套接字或其它外部請求源提供服務的插件中可能也有一些線程。
如果這些線程中的一個(如響應分派線程)消失了,將會發(fā)生什么?因為插件仍能夠提交新消息,所以它們可能不會立即注意到某些方面出錯了。消息仍將通過各種輸入源到達,并通過我們的應用程序提交到外部服務。因為插件并不期待立即獲得其響應,因此它仍沒有意識到出了問題。最后,接收的響應將排滿隊列。如果它們存儲在內存中,那么最終將耗盡內存。即使不耗盡內存,也會有人在某個時刻發(fā)現(xiàn)響應得不到傳遞 ― 但這可能需要一些時間,因為系統(tǒng)的其它方面仍能正常發(fā)揮作用。
當主要的任務處理方面由線程池而不是單個線程來處理時,對于偶然的線程泄漏的后果有一定程度的保護,因為一個執(zhí)行得很好的八線程的線程池,用七個線程完成其工作的效率可能仍可以接受。起初,可能沒有任何顯著的差異。但是,系統(tǒng)性能最終將下降,雖然這種下降的方式不易被察覺。
服務器應用程序中的線程泄漏問題在于不是總是容易從外部檢測它。因為大多數(shù)線程只處理服務器的部分工作負載,或可能僅處理特定類型的后臺任務,所以當程序實際上遭遇嚴重故障時,在用戶看來它仍在正常工作。這一點,再加上引起線程泄漏的因素并不總是留下明顯痕跡,就會引起令人驚訝甚或使人迷惑的應用程序行為。
我們在使用線程池處理并行任務時,在線程池的生命周期當中,將通過某種抽象機制(Runnable)調用許多未知的代碼,這些代碼有可能是我們自己寫的,也有可能來自第三方。任何代碼都有可能拋出一個RuntimeException,如果這些提交的Runnable拋出了RuntimeException,線程池可以捕獲他,線程池有可能會創(chuàng)建一個新的線程來代替這個因為拋出異常而結束的線程,也有可能什么也不做(這要看線程池的策略)。即使不會造成線程泄漏,我們也會丟失這個任務的執(zhí)行情況,無法感知任務執(zhí)行出現(xiàn)了異常。
所以,有必要處理提交到線程池運行的代碼拋出的異常。
如何處理異常
簡單了解線程池

上面是我畫的思維導圖
先介紹一下jdk中線程池的實現(xiàn):

Executor定義了一個通用的并發(fā)任務框架,即通過execute方法執(zhí)行一個任務。
ExecutorService定義了并發(fā)框架(線程池)的生命周期。
AbstractExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor實現(xiàn)了并發(fā)任務框架(線程池)。其中ScheduledThreadPoolExecutor支持定時及周期性任務的執(zhí)行。
Executors相當于一個線程池工廠類,返回了不同執(zhí)行策略的線程池對象。
我們一般使用Executors.new...方法來得到某種線程池:
newCachedThreadPool
創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool
創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行。
其中,前三者返回ExecutorService實例,他們的實現(xiàn)為ThreadPoolExecutor或其包裝類;newScheduledThreadPool返回的是ScheduledExecutorService實例,他的實現(xiàn)為ScheduledThreadPoolExecutor或其包裝類。
ExecutorService exec = Executors.newFixedThreadPool(8);
以上述代碼為例,得到ExecutorService實例后,我們可以通過兩種方式提交任務(Runnable):
exec.execute(runnable)exec.submit(runnable)
對于這兩種不同的任務提交方式,我們有不同的異常處理辦法。
exec.submit(runnable)

使用exec.submit(runnable)這種方式提交任務時,submit方法會將我們的Runnable包裝為一個RunnableFuture對象,這個對象實際上是FutureTask實例,然后將這個FutureTask交給execute方法執(zhí)行。

Future用來管理任務的生命周期,將Future實例提交給異步線程執(zhí)行后,可以調用Future.get方法獲取任務執(zhí)行的結果。我們知道Runnable執(zhí)行是沒有返回結果的,那么這個結果是怎么來的?

可以看到,在FutureTask的構造方法中,將Runnable包裝成了一個Callable類型的對象。

FutureTask的run方法中,調用了callable對象的call方法,也就調用了我們傳入的Runnable對象的run方法??梢钥吹?,如果代碼(Runnable)拋出異常,會被捕獲并且把這個異常保存下來。


可以看到,在調用get方法時,會將保存的異常重新拋出。所以,我們在使用submit方法提交任務的時候,利用返回的Future對象,通過他的get方法可以得到任務運行中拋出的異常,然后針對異常做一些處理。
由于我們在調用submit時并沒有給Runnable指定返回結果,所以在將Runnable包裝為Callable的時候,會傳入一個null,故get方法返回一個null.
當然,我們也可以直接傳入Callable類型的任務,這樣就可以獲取任務執(zhí)行返回結果,并且得到任務執(zhí)行拋出的異常。
這就是使用線程池時處理任務中拋出異常的第一種方法:使用ExecutorService.submit執(zhí)行任務,利用返回的Future對象的get方法接收拋出的異常,然后進行處理
exec.execute(runnable)
利用Future.get得到任務拋出的異常的缺點在于,我們需要顯式的遍歷Future,調用get方法獲取每個任務執(zhí)行拋出的異常,然后處理。
很多時候我們僅僅是使用exec.execute(runnable)這種方法來提交我們的任務。這種情況下任務拋出的異常如何處理呢?
在使用exec.execute(runnable)提交任務的時候(submit其實也是調用execute方法執(zhí)行),我們的任務最終會被一個Worker對象執(zhí)行。這個Worker內部封裝了一個Thread對象,這個Thread就是線程池的工作者線程。工作者線程會調用runWorker方法來執(zhí)行我們提交的任務:(代碼比較長,就直接粘過來了)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
上面代碼的基本意思就是不停的從任務隊列中取出任務執(zhí)行,如果任務代碼(task.run)拋出異常,會被最內層的try--catch塊捕獲,然后重新拋出。注意到最里面的finally塊,在重新
拋出異常之前,要先執(zhí)行afterExecute方法,這個方法的默認實現(xiàn)為空,即什么也不做。我們可以在這個方法上做點文章,這就是我們的第二種方法,
重寫ThreadPoolExecutor.afterExecute方法,處理傳遞到afterExecute方法中的異常:
class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method. If you would like to trap both kinds of failures in this method, you can further probe for such cases, as in this sample subclass that prints either the direct cause or the underlying exception if a task has been aborted:
上面是java doc給出的建議??梢钥吹?,代碼中還處理了task是FutureTask的情況。回想一下submit方式提交任務的情況:
在submit方法中,我們傳入的Runnable/Callable(要執(zhí)行的任務)被封裝為FutureTask對象,交給
execute方法執(zhí)行-
經過一系列操作,提交的FutureTask對象被Worker對象中的工作者線程所執(zhí)行,也就是runWorker方法
此時的代碼運行情況:runWorker->submit方法封裝的FutureTask的run方法->我們提交的Runnable的run方法
此時從
我們提交的Runnable的run方法中拋出了一個未檢測異常RunnableException,被FutureTask的run方法捕獲FutureTask的run方法捕獲異常后保存,不再重新拋出。同時意味著run方法執(zhí)行結束。
runWorker方法沒有檢測到異常,
task.run當作正常運行結束。但是還是會執(zhí)行afterExecute方法。
經過這樣的梳理,上面的代碼為什么這么寫就一目了然了。
上面已經提到了兩種解決任務代碼拋出未檢測異常的方案。接下來是第三種:
當一個線程因為未捕獲的異常而退出時,JVM會把這個事件報告給應用提供的UncaughtExceptionHandler異常處理器,如果沒有提供任何的異常處理器,那么默認的行為就是將堆棧信息輸送到System.err。
看一下上面的runWorker方法,如果task.run(任務代碼)拋出了異常,異常會層層拋出,最終導致這個線程退出。此時這個拋出的異常就會傳遞到UncaughtExceptionHandler實例當中,由uncaughtException(Thread t,Throwable e)這個方法處理。
于是就有了第三種解決任務代碼拋出異常的方案:為工作者線程設置UncaughtExceptionHandler,在uncaughtException方法中處理異常
注意,這個方案不適用與使用submit方式提交任務的情況,原因上面也提到了,F(xiàn)utureTask的run方法捕獲異常后保存,不再重新拋出,意味著runWorker方法并不會捕獲到拋出的異常,線程也就不會退出,也不會執(zhí)行我們設置的UncaughtExceptionHandler。
如何為工作者線程設置UncaughtExceptionHandler呢?ThreadPoolExecutor的構造函數(shù)提供一個ThreadFactory,可以在其中設置我們自定義的UncaughtExceptionHandler,這里不再贅述。
至于第四中方案,就很簡單了:在我們提供的Runnable的run方法中捕獲任務代碼可能拋出的所有異常,包括未檢測異常。這種方法比較簡單,也有他的局限性,不夠靈活,我們的處理被局限在了線程代碼邊界之內。
總結
通過上面的分析我們得到了四種解決任務代碼拋異常的方案:
在我們提供的Runnable的run方法中捕獲任務代碼可能拋出的所有異常,包括未檢測異常
使用ExecutorService.submit執(zhí)行任務,利用返回的Future對象的get方法接收拋出的異常,然后進行處理
重寫
ThreadPoolExecutor.afterExecute方法,處理傳遞到afterExecute方法中的異常為工作者線程設置
UncaughtExceptionHandler,在uncaughtException方法中處理異常
要注意的是,使用最后一種方案時,無法處理以submit的方式提交的任務。