簡書 Trust_FreeDom
轉(zhuǎn)載請注明原創(chuàng)出處,謝謝!
啟動線程執(zhí)行任務(wù),如果需要在任務(wù)執(zhí)行完畢之后得到任務(wù)執(zhí)行結(jié)果,可以使用從Java 1.5開始提供的Callable和Future
下面就分析一下Callable、Future以及FutureTask的具體實現(xiàn)及使用方法
源碼分析基于JDK 1.7
一、Callable 與 Runnable
java.lang.Runnable是一個接口,只有一個run()方法
public interface Runnable {
public abstract void run();
}
run()方法的返回值是void,故在執(zhí)行完任務(wù)后無法返回任何結(jié)果
Callable是java.util.concurrent包下的,也是一個接口,也只有一個call()方法,類似于java.lang.Runnable的run()方法,實現(xiàn)Callable接口的類和實現(xiàn)Runnable接口的類都是可以被其它線程執(zhí)行的任務(wù)
public interface Callable<V> {
V call() throws Exception;
}
可以看到call()方法是有返回值的,可以將執(zhí)行的結(jié)果返回
Callable和Runnable的區(qū)別:
1、Callable中定義的是call()方法,Runnable中定義的是run()方法
2、Callable中的call()方法可以返回執(zhí)行任務(wù)后的結(jié)果,Runnable中的run()方法無法獲得返回值
3、Callable中的call()方法定義了throws Exception拋出異常,拋出的異??梢栽谥骶€程Future.get()時被主線程捕獲;Runnable中的run()方法沒有定義拋出異常,運(yùn)行任務(wù)時發(fā)生異常時也會上拋,因為即使不加默認(rèn)也會上拋RuntimeException,但異常無法被主線程獲取
4、運(yùn)行Callable任務(wù)可以拿到一個Future對象代表異步運(yùn)算的結(jié)果
二、Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future是java.util.concurrent包下的一個接口,代表著一個異步計算的結(jié)果,可以通過get()獲取線程執(zhí)行的返回值,cancel()取消任務(wù)執(zhí)行,isCancelled()和isDone()獲得任務(wù)執(zhí)行的情況
boolean cancel(boolean mayInterruptIfRunning)
嘗試取消任務(wù)的執(zhí)行,取消成功返回true,取消失敗返回false
mayInterruptIfRunning表示是否允許中斷正在執(zhí)行的任務(wù)
1、如果任務(wù)還未開始,cancel返回true,且任務(wù)永遠(yuǎn)不會被執(zhí)行
2、如果任務(wù)正在執(zhí)行,根據(jù)mayInterruptIfRunning的值判斷是否需要中斷執(zhí)行中的任務(wù),且如果mayInterruptIfRunning為true,會調(diào)用中斷邏輯,返回true;如果mayInterruptIfRunning為false,不會調(diào)用線程中斷,只是將任務(wù)取消
3、如果任務(wù)結(jié)束(可能是正常完成、異常終止、被取消),返回false
4、如果cancel()操作返回true,后續(xù)調(diào)用isDone()、isCancelled()都返回true
boolean isCancelled()
表示任務(wù)是否被取消成功,如果在任務(wù)正常完成前被取消成功,則返回true
boolean isDone()
表示任務(wù)是否已經(jīng)完成,則返回true,注意:正常完成、異常 或 取消操作都代表任務(wù)完成
V get() 和 V get(long timeout, TimeUnit unit)
get()用來獲取執(zhí)行結(jié)果,這個方法會產(chǎn)生阻塞,會一直等到任務(wù)執(zhí)行完畢才返回
get(long timeout, TimeUnit unit)用來獲取執(zhí)行結(jié)果,如果在指定時間內(nèi)還沒獲取到結(jié)果,會拋出TimeoutException
Future提供了三種功能:
1、獲取任務(wù)執(zhí)行的結(jié)果
2、取消任務(wù)
3、判斷任務(wù)是否完成 或 是否取消
因為Future只是一個接口,所以是無法直接用來創(chuàng)建對象使用的,因此就有了下面的FutureTask
三、FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask實現(xiàn)了RunnableFuture接口,那么RunnableFuture又是什么呢?
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture接口繼承了Runnable和Future,所以它既是一個可以讓線程執(zhí)行的Runnable任務(wù),又是一個可以獲取Callable返回值的Future
FutureTask的屬性
/** The run state of this task */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome;
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
state 是任務(wù)的運(yùn)行狀態(tài)
- 初始化時是NEW
- 任務(wù)終止的狀態(tài)有NORMAL(正常結(jié)束)、EXCEPTIONAL(異常結(jié)束)、CANCELLED(被取消)、INTERRUPTED(執(zhí)行中被中斷),這些狀態(tài)是通過set()、setException、cancel()方法觸發(fā)的
- COMPLETING 和 INTERRUPTING是兩個中間狀態(tài),當(dāng)正常結(jié)束設(shè)置outcome屬性前是COMPLETING,設(shè)置后變成NORMAL;當(dāng)中斷運(yùn)行中線程前是INTERRUPTING,調(diào)用thread.interrupt()后是INTERRUPTED
可能的狀態(tài)轉(zhuǎn)換:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
callable 是線程執(zhí)行的有返回值的任務(wù)
outcome 是任務(wù)執(zhí)行后的結(jié)果或異常
waiters 表示等待獲取結(jié)果的阻塞線程,鏈表結(jié)構(gòu),后等待線程的會排在鏈表前面
FutureTask的構(gòu)造方法
FutureTask有兩個構(gòu)造方法:
FutureTask(Callable<V> callable)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
構(gòu)造方法參數(shù)是Callable定義的任務(wù),并將state置為NEW,只有當(dāng)state為NEW時,callable才能被執(zhí)行
FutureTask(Runnable runnable, V result)
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
參數(shù)為Runnable和帶泛型的result對象,由于Runnable本身是沒有返回值的,故線程的執(zhí)行結(jié)果通過result返回
可以看到通過runnable和result封裝了個Callable,實際上是new RunnableAdapter<T>(task, result),這個Adapter適配器將Runnable和result轉(zhuǎn)換成Callable,并返回result
FutureTask.run()的實現(xiàn)
線程運(yùn)行時真正執(zhí)行的方法,Callable.call()會在其中執(zhí)行,并包含設(shè)置返回值或異常的邏輯
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
1、任務(wù)執(zhí)行狀態(tài)不是NEW,直接返回;將runner屬性從null->當(dāng)前線程不成功,直接返回
2、調(diào)用call()方法,調(diào)用成功,使用set()設(shè)置返回值
3、調(diào)用過程發(fā)生異常,使用setException()保存異常
set() 和 setException()
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
set()和setException()的實現(xiàn)基本一樣,都是先將任務(wù)運(yùn)行狀態(tài)從NEW->COMPLETING,分別設(shè)置返回值或異常給outcome,再將狀態(tài)分別置為NORMAL和EXCEPTIONAL,最后調(diào)用finishCompletion()依次喚醒等待獲取結(jié)果的阻塞線程
finishCompletion()實現(xiàn)
/**
* Removes and signals all waiting threads, invokes done(), and nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//將成員變量waiters置為null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//循環(huán)喚醒WaitNode中的等待線程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//由子類實現(xiàn)的方法
done();
callable = null; // to reduce footprint
}
1、執(zhí)行FutureTask類的get方法時,會把主線程封裝成WaitNode節(jié)點并保存在waiters鏈表中
2、FutureTask任務(wù)執(zhí)行完成后,通過UNSAFE設(shè)置waiters的值為null,并通過LockSupport.unpark方法依次喚醒等待獲取結(jié)果的線程
FutureTask.get()的實現(xiàn)
get()方法有兩個實現(xiàn),一個是一直等待獲取結(jié)果,直到任務(wù)執(zhí)行完;一個是等待指定時間,超時后任務(wù)還未完成會上拋TimeoutException
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
內(nèi)部通過awaitDone()對主線程進(jìn)行阻塞,具體實現(xiàn)如下:
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; //截止時間
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果主線程已經(jīng)被中斷,removeWaiter(),并上拋InterruptedException
//注意:Thread.interrupted()后會導(dǎo)致線程的中斷狀態(tài)為false
if (Thread.interrupted()) {
removeWaiter(q); //線程被中斷的情況下,從waiters鏈表中刪除q
throw new InterruptedException();
}
int s = state;
//如果任務(wù)已經(jīng)完成(可能是正常完成、異常、中斷),直接返回,即還沒有開始等待,任務(wù)已經(jīng)完成了
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果任務(wù)正在完成,讓出CPU資源,等待state變成NORMAL或EXCEPTIONAL
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//s<COMPLETING 且 還沒有創(chuàng)建WaitNode
else if (q == null)
q = new WaitNode();
//s<COMPLETING 且 已經(jīng)創(chuàng)建WaitNode,但還沒有入隊
else if (!queued)
/**
* 1、將當(dāng)前waiters賦值給q.next,即“q-->當(dāng)前waiters”
* 2、CAS,將waiters屬性,從“當(dāng)前waiters-->q”
* 所以后等待的會排在鏈表的前面,而任務(wù)完成時會從鏈表前面開始依次喚醒等待線程
*/
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//所有準(zhǔn)備工作完成,判斷等待是否需要計時
else if (timed) {
nanos = deadline - System.nanoTime();
//如果已經(jīng)等待超時,remove當(dāng)前WaiterNode
if (nanos <= 0L) {
removeWaiter(q); //等待超時的情況下,從waiters鏈表中刪除q
return state;
}
LockSupport.parkNanos(this, nanos); //掛起一段時間
}
else
LockSupport.park(this); //一直掛起,等待喚醒
}
}
1、判斷主線程是否被中斷,如果被中斷,將當(dāng)前WaitNode節(jié)點從waiters鏈表中刪除,并上拋InterruptedException
2、如果任務(wù)已經(jīng)完成(可能是正常完成、異常、中斷),直接返回(即還沒有開始等待,任務(wù)已經(jīng)完成了,就返回了)
3、如果任務(wù)正在完成,讓出CPU資源,等待state變成NORMAL或EXCEPTIONAL
4、如果任務(wù)沒有被中斷,也沒有完成,new WaitNode()
5、如果任務(wù)沒有被中斷,也沒有完成,也創(chuàng)建了WaitNode,使用UNSAFE.CAS()操作將WaitNode加入waiters鏈表
6、所有準(zhǔn)備工作完畢,通過LockSupport的park或parkNanos掛起線程
而WaitNode就是一個簡單的鏈表節(jié)點,記錄這等待的線程和下一個WaitNode
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread; //等待的線程
volatile WaitNode next; //下一個WaitNode
WaitNode() { thread = Thread.currentThread(); }
}
FutureTask.cancel()的實現(xiàn)
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt(); //中斷線程
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}
1、如果任務(wù)不是運(yùn)行狀態(tài),直接返回false失敗
2、如果mayInterruptIfRunning==true,中斷運(yùn)行中的任務(wù),使用CAS操作將狀態(tài)NEW-->INTERRUPTING,再調(diào)用runner.interrupt(),最后將狀態(tài)置為INTERRUPTED
3、如果mayInterruptIfRunning==false,將任務(wù)置為CANCELLED取消狀態(tài)
4、調(diào)用finishCompletion()依次喚醒等待獲取結(jié)果的線程,返回true取消成功
四、使用示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestFuture {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(1);
Task task = new Task(); //callable任務(wù)
Future<Integer> result = executor.submit(task);
executor.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主線程在執(zhí)行任務(wù)");
try {
System.out.println("task運(yùn)行結(jié)果:"+result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任務(wù)執(zhí)行完畢");
}
static class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子線程在進(jìn)行計算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
}
運(yùn)行結(jié)果:
子線程在進(jìn)行計算
主線程在執(zhí)行任務(wù)
task運(yùn)行結(jié)果:4950
所有任務(wù)執(zhí)行完畢
如果只是想控制在某些情況下可以將任務(wù)取消,可以使用Future<?> future = executor.submit(runnable),這樣返回結(jié)果肯定為null,但可以使用future.cancel()取消任務(wù)執(zhí)行
五、總結(jié)
1、有了Runnable,為什么還需要Callable,它們的區(qū)別是什么?
Runnable和Callable都表示執(zhí)行的任務(wù),但不同的是Runnable.run()方法沒有返回值,Callable.call()有返回值
但其實線程在執(zhí)行任務(wù)時還是執(zhí)行的Runnable.run()方法,所以在使用ThreadPoolExecutor.submit()時會將Callable封裝為FutureTask,而FutureTask是Runnable和Future的實現(xiàn)類
所以在執(zhí)行Callable的任務(wù)時,線程其實是執(zhí)行FutureTask這個Runnable的run()方法,其中封裝了調(diào)用Callable.call()并返回結(jié)果的邏輯執(zhí)行Runnable任務(wù)如果發(fā)生異常,主線程無法知曉;而執(zhí)行Callable任務(wù)如果發(fā)生異常,在Future.get()時會拋出java.util.concurrent.ExecutionException,其中封裝了真實異常
2、Future.get()是如何獲取線程返回值的?
首先得益于Callable.call()方法定義了返回值,提交Callable任務(wù)后,Callable會被封裝成FutureTask,其既可以作為Runnable被執(zhí)行,也可以作為Future獲取返回值,F(xiàn)utureTask.run()方法會調(diào)用Callable.call()中的任務(wù)代碼
在任務(wù)執(zhí)行完成前,如果主線程使用Future.get(),其實是調(diào)用FutureTask.get(),其中會判斷任務(wù)狀態(tài)尚未結(jié)束,將主線程加入waiters等待鏈表,并掛起主線程
待任務(wù)執(zhí)行結(jié)束后,F(xiàn)utureTask會喚醒所有等待獲取返回值的線程,此時主線程的FutureTask.get()就會返回了所以,主線程和運(yùn)行線程是通過FutureTask作為橋梁獲取線程返回值的
3、Future.cancel()真的能取消任務(wù)的執(zhí)行嗎?
首先答案是“不一定”,根據(jù)JDK中的方法注釋“Attempts to cancel execution of this task”,即嘗試去取消執(zhí)行的任務(wù)
如果任務(wù)正在執(zhí)行,且調(diào)用cancel()時參數(shù)mayInterruptIfRunning傳的是true,那么會對執(zhí)行線程調(diào)用interrupt()方法
那么問題就變成了interrupt()方法能中斷線程執(zhí)行嗎?
interrupt()方法不會中斷正在運(yùn)行的線程。這一方法實際上完成的是在線程受到阻塞時拋出一個中斷信號,這樣線程就得以退出阻塞的狀態(tài)。更確切的說,如果線程被Object.wait()、Thread.join()、Thread.sleep()等阻塞,那么它將接收到一個中斷異常(InterruptedException),從而提早地終結(jié)被阻塞狀態(tài)。
如果線程沒有被阻塞,調(diào)用interrupt()將不起作用
那么即使線程正在阻塞狀態(tài),并拋出了InterruptedException,線程能否真的取消執(zhí)行還要看代碼中是否捕獲了InterruptedException和有沒有做相應(yīng)的對中斷標(biāo)示的判斷邏輯