原文鏈接:http://www.studyshare.cn/blog/details/1130/1
首先寫一個(gè)簡單的Demo
public static void main(String[] args) throws Exception{
FutureTask futureTask =new FutureTask(new Callable() {
????????@Override
? ? ? ? public Object call() throws Exception {
????????????long startTime = System.currentTimeMillis();
? ? ? ? ? ? int count =0;
? ? ? ? ? ? //進(jìn)行累加,讓子線程執(zhí)行一段時(shí)間
? ? ? ? ? ? for(int i=0;i<2000000000;i++){
????????????????count += i;
? ? ? ? ? ? }
????????????System.out.println("count = " + count +" use time is "+ (System.currentTimeMillis()-startTime));
? ? ? ? ? ? return "test";
? ? ? ? }
});
? ? new Thread(futureTask).start();
? ? //get()在主線程中執(zhí)行,阻塞方法,主線程等待子線程執(zhí)行完才被喚醒執(zhí)行獲取子線程返回的結(jié)果
? ? System.out.println(futureTask.get());
}
打印結(jié)果很簡單:

更多深度技術(shù)文章,在這里。?java開發(fā)工具下載地址及安裝教程大全,點(diǎn)這里。
首先解釋FutureTask的成員變量含義,在它的各種方法中很多地方使用
????????volatile int state :用來表示當(dāng)前執(zhí)行線程的狀態(tài),用volatile關(guān)鍵字修飾,表示內(nèi)存可見性,其他線程修改volatile修飾的state后,子線程會強(qiáng) ????????制到主內(nèi)存取state最新的值,可參考文章:https://www.cnblogs.com/daxin/p/3364014.html
????????private static final int NEW =0;? //表示新建的狀態(tài)
????????private static final int COMPLETING? =1;//執(zhí)行中狀態(tài)(或者即將完成)
????????private static final int NORMAL? ? ? =2; //正常執(zhí)行結(jié)束狀態(tài)
????????private static final int EXCEPTIONAL? =3;//異常狀態(tài)
????????private static final int CANCELLED? ? =4;//已取消狀態(tài)
????????private static final int INTERRUPTING =5;//中斷中狀態(tài)
????????private static final int INTERRUPTED? =6;//已中斷狀態(tài)
? ??????private Callable?callable : Callable接口變量,外部實(shí)例化傳遞到FutureTask中
? ??????private Object outcome :用于存放返回結(jié)果或者異常信息
????????private volatile Thread runner :存放當(dāng)前子線程(執(zhí)行任務(wù)的線程)
? ??????private volatile WaitNode waiters :一個(gè)節(jié)點(diǎn)內(nèi)部類,類中包含
? ??????static final class WaitNode {
????????????????volatile Thread thread;//線程成員變量
? ? ? ? ? ? ? ? volatile WaitNode next;//下一個(gè)包裝了等待線程的節(jié)點(diǎn)
? ? ????????????WaitNode() {thread = Thread.currentThread(); }//初始化時(shí)當(dāng)前線程設(shè)置為需要等待的線程
? ? ? ? ?}
當(dāng)前子線程執(zhí)行過程中以上狀態(tài)會有以下幾種路線:
????????1.??NEW -> COMPLETING -> NORMAL : 新建 --> 執(zhí)行--> 正常結(jié)束
? ? ? ? 2.??NEW -> COMPLETING -> EXCEPTIONAL:新建--> 執(zhí)行--> 異常結(jié)束(執(zhí)行過程中發(fā)生異常)
? ? ? ? 3.? NEW -> CANCELLED : 新建--> 取消(準(zhǔn)備執(zhí)行時(shí)主線程調(diào)用子線程的取消方法)
? ? ? ? 4.??NEW -> INTERRUPTING -> INTERRUPTED :新建--> 被通知中斷(準(zhǔn)備執(zhí)行時(shí)主線程調(diào)用子線程取消方法并傳遞中斷標(biāo)志位true)-->已中斷
FutureTask的主要方法:
? ? ? ? 1.FutureTask(Callable callable):構(gòu)造方法,初始化傳入的callable,并將state設(shè)置為NEW
? ? ? ? 2.isCancelled() :獲取當(dāng)前執(zhí)行中的線程狀態(tài)是否已經(jīng)取消,已取消返回true,否則返回false
? ? ? ? 3.isDone() : 獲取當(dāng)前執(zhí)行的線程的狀態(tài)是否處于運(yùn)行狀態(tài),是返回true,否則返回false
? ? ? ? 4.cancel(boolean mayInterruptIfRunning):發(fā)出取消或者中斷當(dāng)前線程的信號。參數(shù)為true,則發(fā)出中斷,否則為取消
? ? ? ? 5.get()、get(long timeout, TimeUnit unit) :獲取線程執(zhí)行完成后的返回結(jié)果
? ? ? ? 6.set(V v):設(shè)置線程的結(jié)果值到成員變量outcome 中
? ? ? ? 7.setException(Throwable t):設(shè)置線程出現(xiàn)異常時(shí)的異常值到成員變量outcome 中
? ? ? ? 8.run():FutureTask實(shí)現(xiàn)了Runnable接口,因此需要覆蓋run()方法,call()方法就是在run()方法中進(jìn)行調(diào)用
? ? ? ? 9.finishCompletion():該方法是執(zhí)行的線程完成(無論正常完成還是異常完成)后喚醒其他等待的線程繼續(xù)執(zhí)行。并清空callable。
? ? ? ? 10.awaitDone(boolean timed, long nanos):核心方法,當(dāng)執(zhí)行線程在運(yùn)行中,主線程調(diào)用get()方法后則加入等待隊(duì)列,類似AQS的同步隊(duì)列,執(zhí)行的線程執(zhí)行完畢會調(diào)用finishCompletion()方法喚醒等待隊(duì)列中的線程進(jìn)行執(zhí)行。
下面對Demo中的代碼進(jìn)行源碼分析:
? ? ? ? 第一步:執(zhí)行構(gòu)造方法
????????????public FutureTask(Callable callable) {
????????????????????if (callable ==null) //判斷傳入的callable不為空,則初始化FutureTask中的成員變量
????????????????????????????throw new NullPointerException();
? ????????????? ????this.callable = callable;
? ? ? ? ? ? ? ? ????this.state =NEW;? ? ? // ensure visibility of callable
????????????}
? ? ? ? 第二步:new Thread(futureTask).start();
? ? ? ? ????此處實(shí)例化一個(gè)線程并調(diào)用start()方法,則操作系統(tǒng)調(diào)度該線程,并分配CPU時(shí)間片,如果該線程獲取到CPU時(shí)間片后,則會執(zhí)行run方法
? ? ? ? ????注意:由于使用的是FutureTask,該類實(shí)現(xiàn)了Runnable接口并覆寫了run方法,則會進(jìn)入到FutrueTask的run方法中執(zhí)行,代碼如下:
? ??????????public void run() {
? ? ? ? ? ? ? ? //首先判斷state是否是處于新建以外的狀態(tài)(如果是新建以外的狀態(tài)則直接返回,因?yàn)榫€程剛進(jìn)入run只會是new狀態(tài),為了程序健壯性做此判斷),compareAndSwapObject是判斷當(dāng)前FutureTask中的runner是否是null,為空則將當(dāng)前執(zhí)行的這個(gè)線程賦值給runner,不為空就直接退出
????????????????if (state !=NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
????????????????????return;
? ? ? ? ? ? ? ? try {
????????????????????Callable c =callable;
? ? ? ? ????????????if (c !=null &&state ==NEW) { //判斷狀態(tài)與callable
????????????????????V result;
? ? ? ? ? ????????? boolean ran;
? ? ? ? ? ? ????????try {
????????????????????????result = c.call(); //執(zhí)行Callable中的call()方法
? ? ? ? ? ? ? ? ????????ran =true;
? ? ? ? ? ? ????????}catch (Throwable ex) {//如果執(zhí)行call()方法出現(xiàn)了異常,則捕獲異常并去設(shè)置異常信息
????????????????????????result =null;
? ? ? ? ? ? ? ? ????????ran =false;
? ? ? ? ? ? ? ? ????????setException(ex);//捕獲異常并去設(shè)置異常信息,將異常信息設(shè)置到Object outcome結(jié)果對象中,同時(shí)改變state的狀態(tài)為EXCEPTIONAL
? ? ? ? ? ? ????????}
????????????????????if (ran)
????????????????????????set(result);//正常執(zhí)行,設(shè)置返回結(jié)果,設(shè)置到Object outcome中,并設(shè)置state狀態(tài)為NORMAL
? ? ? ? ? ? ? ? ? ? }
????????????????????}finally {
? ? ? ? ????????????????runner =null;//清空runner,方便GC回收
? ? ? ? ????????????????int s =state;
? ? ? ? ????????????????if (s >=INTERRUPTING) //如果state狀態(tài)為中斷中或者已中斷
????????????????????????????handlePossibleCancellationInterrupt(s);//交出當(dāng)前線程的執(zhí)行權(quán),與其他線程重新競爭
? ? ????????????????????}
????????????????}
? ? ? ? ? ? 此處看一下setException()與set()方法內(nèi)的源碼
? ??????????setException():
? ??????????????protected void setException(Throwable t) {
? ? ? ? ? ? ? ? ? ? //原子操作比較state的內(nèi)存地址上的值是否與NEW相等,相等則將state修改為COMPLETING
????????????????????if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
????????????????????????outcome = t;//使用outcome保存異常結(jié)果信息
? ? ? ? ????????????????UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 將state設(shè)置為EXCEPTIONAL
? ? ? ? ????????????????finishCompletion();//下面貼出源碼,具體解釋
? ????????????????? }
????????????????}
? ? ? ? ? ? set():
? ??????????????protected void set(V v) {
? ? ? ? ? ? ? ? ? ? ? ? //原子操作比較state的內(nèi)存地址上的值是否與NEW相等,相等則將state修改為COMPLETING
????????????????????????if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
????????????????????????????????outcome = v;//使用outcome保存正常返回的結(jié)果信息
? ? ? ? ????????????????????????UNSAFE.putOrderedInt(this, stateOffset, NORMAL); ?// 將state設(shè)置為NORMAL
? ? ? ? ????????????????????????finishCompletion();
? ? ????????????????????}
????????????????????}
? ??????????????finishCompletion():這是線程執(zhí)行(正?;蛘弋惓#┑淖詈笠粋€(gè)方法,下面分析一下源碼
? ??????????????????private void finishCompletion() {
????????????????????????// 無論執(zhí)行的線程是正常還是異常都會返回結(jié)果并設(shè)置到outcome中,那么返回結(jié)果后,主線程或者其他線程還在等待隊(duì)列中,則需要去喚醒等待隊(duì)列中的線程進(jìn)行執(zhí)行。
? ? ????????????????????for (WaitNode q; (q =waiters) !=null;) {//循環(huán)等待隊(duì)列中的線程節(jié)點(diǎn)
????????????????????????????if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//判斷等待線程節(jié)點(diǎn)是否與之前設(shè)置的等待線程一致,一致則返回true
????????????????????????????????for (;;) {//自旋
????????????????????????????????????Thread t = q.thread;
? ? ? ? ? ? ? ? ????????????????????if (t !=null) {//等待線程不為空
????????????????????????????????????????q.thread =null;//等待線程節(jié)點(diǎn)中的線程局部變量置空
? ? ? ? ? ? ? ? ? ? ????????????????????LockSupport.unpark(t);//喚醒等待線程執(zhí)行
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
????????????????????????????????????WaitNode next = q.next;//獲取等待線程節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
? ? ? ? ? ? ? ? ????????????????????if (next ==null)// 如果為空,則說明當(dāng)前等待線程節(jié)點(diǎn)就是頭結(jié)點(diǎn),已經(jīng)沒有后續(xù)等待節(jié)點(diǎn),
????????????????????????????????????????break;//退出自旋
? ? ? ? ? ? ? ? ????????????????????????q.next =null; // 置空,方便GC回收
? ? ? ? ? ? ? ? ????????????????????????q = next;//當(dāng)前等待線程節(jié)點(diǎn)也置空,在上面已經(jīng)喚醒了當(dāng)前的等待線程,因此此處也將已經(jīng)喚醒的線程置空讓GC回收
? ? ? ? ? ? ????????????????????????}
????????????????????????????????????break;
? ? ? ? ????????????????????????}
????????????????????????????}
????????????????????????????done();
? ? ????????????????????????callable =null;? ? ? ? // 當(dāng)前線程中的callable置空,讓gc回收,整個(gè)線程到此執(zhí)行完畢
????????????????????????}
? ? ? ? ? ? 以上詳細(xì)說明線程start()方法調(diào)用后的一系列執(zhí)行過程。那么有個(gè)問題就是這個(gè)等待線程節(jié)點(diǎn)是從哪里產(chǎn)生的呢?下面解釋這個(gè)問題
? ? ? ? ? ? 看Demo中的代碼:?System.out.println(futureTask.get()); 這句代碼在demo有解釋,get()方法是一個(gè)阻塞的方法,那么就是這個(gè)get()
????????????方法產(chǎn)生的WaitNode節(jié)點(diǎn)的。看源碼:
? ??????????????public V get() throws InterruptedException, ExecutionException {
????????????????????int s =state;?
? ? ????????????????if (s <=COMPLETING) //判斷state是否處于執(zhí)行中或者新建狀態(tài),是的話則說明調(diào)用get()方法的線程需要進(jìn)入等待隊(duì)列,等待callable所在的線程執(zhí)行完畢后并等待喚醒,此處的代碼就證明了get()方法是阻塞的
????????????????????????s = awaitDone(false, 0L);//下面分析源碼
? ? ????????????????return report(s);
? ? ? ? ? ? ?????}
? ??????????????awaitDone():
? ??????????????????private int awaitDone(boolean timed, long nanos) throws InterruptedException {
????????????????????????????final long deadline = timed ? System.nanoTime() + nanos :0L;//引入等待超時(shí)機(jī)制,調(diào)用get()方法傳false,0L則表示不進(jìn)行超時(shí)處理
? ? ? ? ? ? ? ? ? ? ? ? ? ? WaitNode q =null;
? ? ? ? ? ? ? ? ? ? ? ? ? ? boolean queued =false;
? ? ????????????????????????for (;;) {//自旋
????????????????????????????????if (Thread.interrupted()) {//判斷當(dāng)前線程(主線程)是否已經(jīng)中斷,中斷則去移除等待線程節(jié)點(diǎn)并拋出中斷異常,此代碼為程序健壯性考慮,不必過分關(guān)注
????????????????????????????????????removeWaiter(q);
? ? ? ? ? ? ????????????????????????throw new InterruptedException();
? ? ? ? ????????????????????????}
????????????????????????????????int s =state;
? ? ? ????????????????????????? if (s >COMPLETING) {//如果執(zhí)行的子線程的state狀態(tài)已經(jīng)處于NORMAL或者EXCEPTIONAL狀態(tài),說明子線程已經(jīng)執(zhí)行結(jié)束了,那么直接返回,說明不用讓阻塞線程進(jìn)入等待隊(duì)列。
????????????????????????????????????if (q !=null)//等待線程節(jié)點(diǎn)不為空,則直接置為空,并返回state
????????????????????????????????????????q.thread =null;
? ? ? ? ? ? ????????????????????????????return s;
? ? ? ????????????????????????? }
????????????????????????????????else if (s ==COMPLETING)// 重點(diǎn),當(dāng)當(dāng)前執(zhí)行子線程還在運(yùn)行中的時(shí)候,則此時(shí)要讓主線程交出CPU執(zhí)行權(quán)。
? ? ? ? ? ? ????????????????????????Thread.yield();//交出CPU執(zhí)行權(quán),yield()方法雖然是交出執(zhí)行權(quán),但主線程還是可以和其他線程進(jìn)行公平競爭
? ? ? ????????????????????????? else if (q ==null)//重點(diǎn),以上的條件都不滿足,則說明該線程確實(shí)需要進(jìn)入等待隊(duì)列進(jìn)行等待
????????????????????????????????????q =new WaitNode();//構(gòu)造一個(gè)等待節(jié)點(diǎn),該構(gòu)造方法中將當(dāng)前線程作為參數(shù)傳遞給節(jié)點(diǎn)中的局部變量進(jìn)行保存
? ? ? ? ????????????????????????else if (!queued)//注意,此處較難理解,這是入隊(duì)列的操作,q.next=waiters ,如果是首節(jié)點(diǎn),waiters一定是null的,則q.next=null,waitersOffset偏移量指向的地址上的初始值也是null,則期望值與內(nèi)存地址值都為null,則會將q的值設(shè)置到waitersOffset指向的地址,同時(shí)返回true,等待節(jié)點(diǎn)進(jìn)入隊(duì)列就成功。原子操作比較抽象,建議去深入理解CAS操作
????????????????????????????????????queued =UNSAFE.compareAndSwapObject(this, waitersOffset, q.next =waiters, q);
? ? ? ? ????????????????????????else if (timed) {
????????????????????????????????????nanos = deadline - System.nanoTime();
? ? ? ? ? ? ????????????????????????if (nanos <=0L) {
????????????????????????????????????????removeWaiter(q);
? ? ? ? ? ? ? ? ????????????????????????return state;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
????????????????????????????????????LockSupport.parkNanos(this, nanos);//這是帶有超時(shí)設(shè)置的阻塞方法
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? else
? ? ? ? ? ? ????????????????LockSupport.park(this);//自旋到這里的時(shí)候,說明已經(jīng)加入等待隊(duì)列,阻塞當(dāng)前線程,讓其等待被喚醒。
? ? ????????????????????}
????????????????????}
? ? ? ? ? ? ? ? 另外最后獲取返回值的方法report()比較簡單就不多做解釋。內(nèi)部就是返回outcome的值
????????????????????private V report(int s)throws ExecutionException {
????????????????????????Object x =outcome;
? ????????????????????? if (s ==NORMAL)
????????????????????????????return (V)x;
? ????????????????????? if (s >=CANCELLED)
????????????????????????????throw new CancellationException();
? ????????????????????????? throw new ExecutionException((Throwable)x);
????????????????????}
? ? ? ? ? ? ? ? 到此,F(xiàn)utureTask源碼就分析完畢,總結(jié)一下:Callable實(shí)現(xiàn)的線程內(nèi)部使用state的轉(zhuǎn)換,這種轉(zhuǎn)換是基于原子操作來保證線程安全(多線程環(huán)境下對state進(jìn)行競爭),同時(shí)其他非Callable的外部線程調(diào)用FutureTask中的方法(主要是get()),則讓這些線程進(jìn)入等待隊(duì)列,當(dāng)Callable的線程執(zhí)行完畢會使用自旋對等待中的線程進(jìn)行喚醒。