FutureTask原理解析

原文鏈接:http://www.studyshare.cn/blog/details/1130/1

首先寫一個(gè)簡單的Demo

public static void main(String[] args) throws Exception{

FutureTask futureTask =new FutureTask(new Callable() {

????????@Override

? ? ? ? public Object call() throws Exception {

????????????long startTime = System.currentTimeMillis();

? ? ? ? ? ? int count =0;

? ? ? ? ? ? //進(jìn)行累加,讓子線程執(zhí)行一段時(shí)間

? ? ? ? ? ? for(int i=0;i<2000000000;i++){

????????????????count += i;

? ? ? ? ? ? }

????????????System.out.println("count = " + count +" use time is "+ (System.currentTimeMillis()-startTime));

? ? ? ? ? ? return "test";

? ? ? ? }

});

? ? new Thread(futureTask).start();

? ? //get()在主線程中執(zhí)行,阻塞方法,主線程等待子線程執(zhí)行完才被喚醒執(zhí)行獲取子線程返回的結(jié)果

? ? System.out.println(futureTask.get());

}

打印結(jié)果很簡單:

更多深度技術(shù)文章,在這里。?java開發(fā)工具下載地址及安裝教程大全,點(diǎn)這里。

首先解釋FutureTask的成員變量含義,在它的各種方法中很多地方使用

????????volatile int state :用來表示當(dāng)前執(zhí)行線程的狀態(tài),用volatile關(guān)鍵字修飾,表示內(nèi)存可見性,其他線程修改volatile修飾的state后,子線程會強(qiáng) ????????制到主內(nèi)存取state最新的值,可參考文章:https://www.cnblogs.com/daxin/p/3364014.html

????????private static final int NEW =0;? //表示新建的狀態(tài)

????????private static final int COMPLETING? =1;//執(zhí)行中狀態(tài)(或者即將完成)

????????private static final int NORMAL? ? ? =2; //正常執(zhí)行結(jié)束狀態(tài)

????????private static final int EXCEPTIONAL? =3;//異常狀態(tài)

????????private static final int CANCELLED? ? =4;//已取消狀態(tài)

????????private static final int INTERRUPTING =5;//中斷中狀態(tài)

????????private static final int INTERRUPTED? =6;//已中斷狀態(tài)

? ??????private Callable?callable : Callable接口變量,外部實(shí)例化傳遞到FutureTask中

? ??????private Object outcome :用于存放返回結(jié)果或者異常信息

????????private volatile Thread runner :存放當(dāng)前子線程(執(zhí)行任務(wù)的線程)

? ??????private volatile WaitNode waiters :一個(gè)節(jié)點(diǎn)內(nèi)部類,類中包含

? ??????static final class WaitNode {

????????????????volatile Thread thread;//線程成員變量

? ? ? ? ? ? ? ? volatile WaitNode next;//下一個(gè)包裝了等待線程的節(jié)點(diǎn)

? ? ????????????WaitNode() {thread = Thread.currentThread(); }//初始化時(shí)當(dāng)前線程設(shè)置為需要等待的線程

? ? ? ? ?}

當(dāng)前子線程執(zhí)行過程中以上狀態(tài)會有以下幾種路線:

????????1.??NEW -> COMPLETING -> NORMAL : 新建 --> 執(zhí)行--> 正常結(jié)束

? ? ? ? 2.??NEW -> COMPLETING -> EXCEPTIONAL:新建--> 執(zhí)行--> 異常結(jié)束(執(zhí)行過程中發(fā)生異常)

? ? ? ? 3.? NEW -> CANCELLED : 新建--> 取消(準(zhǔn)備執(zhí)行時(shí)主線程調(diào)用子線程的取消方法)

? ? ? ? 4.??NEW -> INTERRUPTING -> INTERRUPTED :新建--> 被通知中斷(準(zhǔn)備執(zhí)行時(shí)主線程調(diào)用子線程取消方法并傳遞中斷標(biāo)志位true)-->已中斷

FutureTask的主要方法:

? ? ? ? 1.FutureTask(Callable callable):構(gòu)造方法,初始化傳入的callable,并將state設(shè)置為NEW

? ? ? ? 2.isCancelled() :獲取當(dāng)前執(zhí)行中的線程狀態(tài)是否已經(jīng)取消,已取消返回true,否則返回false

? ? ? ? 3.isDone() : 獲取當(dāng)前執(zhí)行的線程的狀態(tài)是否處于運(yùn)行狀態(tài),是返回true,否則返回false

? ? ? ? 4.cancel(boolean mayInterruptIfRunning):發(fā)出取消或者中斷當(dāng)前線程的信號。參數(shù)為true,則發(fā)出中斷,否則為取消

? ? ? ? 5.get()、get(long timeout, TimeUnit unit) :獲取線程執(zhí)行完成后的返回結(jié)果

? ? ? ? 6.set(V v):設(shè)置線程的結(jié)果值到成員變量outcome 中

? ? ? ? 7.setException(Throwable t):設(shè)置線程出現(xiàn)異常時(shí)的異常值到成員變量outcome 中

? ? ? ? 8.run():FutureTask實(shí)現(xiàn)了Runnable接口,因此需要覆蓋run()方法,call()方法就是在run()方法中進(jìn)行調(diào)用

? ? ? ? 9.finishCompletion():該方法是執(zhí)行的線程完成(無論正常完成還是異常完成)后喚醒其他等待的線程繼續(xù)執(zhí)行。并清空callable。

? ? ? ? 10.awaitDone(boolean timed, long nanos):核心方法,當(dāng)執(zhí)行線程在運(yùn)行中,主線程調(diào)用get()方法后則加入等待隊(duì)列,類似AQS的同步隊(duì)列,執(zhí)行的線程執(zhí)行完畢會調(diào)用finishCompletion()方法喚醒等待隊(duì)列中的線程進(jìn)行執(zhí)行。

下面對Demo中的代碼進(jìn)行源碼分析:

? ? ? ? 第一步:執(zhí)行構(gòu)造方法

????????????public FutureTask(Callable callable) {

????????????????????if (callable ==null) //判斷傳入的callable不為空,則初始化FutureTask中的成員變量

????????????????????????????throw new NullPointerException();

? ????????????? ????this.callable = callable;

? ? ? ? ? ? ? ? ????this.state =NEW;? ? ? // ensure visibility of callable

????????????}

? ? ? ? 第二步:new Thread(futureTask).start();

? ? ? ? ????此處實(shí)例化一個(gè)線程并調(diào)用start()方法,則操作系統(tǒng)調(diào)度該線程,并分配CPU時(shí)間片,如果該線程獲取到CPU時(shí)間片后,則會執(zhí)行run方法

? ? ? ? ????注意:由于使用的是FutureTask,該類實(shí)現(xiàn)了Runnable接口并覆寫了run方法,則會進(jìn)入到FutrueTask的run方法中執(zhí)行,代碼如下:

? ??????????public void run() {

? ? ? ? ? ? ? ? //首先判斷state是否是處于新建以外的狀態(tài)(如果是新建以外的狀態(tài)則直接返回,因?yàn)榫€程剛進(jìn)入run只會是new狀態(tài),為了程序健壯性做此判斷),compareAndSwapObject是判斷當(dāng)前FutureTask中的runner是否是null,為空則將當(dāng)前執(zhí)行的這個(gè)線程賦值給runner,不為空就直接退出

????????????????if (state !=NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))

????????????????????return;

? ? ? ? ? ? ? ? try {

????????????????????Callable c =callable;

? ? ? ? ????????????if (c !=null &&state ==NEW) { //判斷狀態(tài)與callable

????????????????????V result;

? ? ? ? ? ????????? boolean ran;

? ? ? ? ? ? ????????try {

????????????????????????result = c.call(); //執(zhí)行Callable中的call()方法

? ? ? ? ? ? ? ? ????????ran =true;

? ? ? ? ? ? ????????}catch (Throwable ex) {//如果執(zhí)行call()方法出現(xiàn)了異常,則捕獲異常并去設(shè)置異常信息

????????????????????????result =null;

? ? ? ? ? ? ? ? ????????ran =false;

? ? ? ? ? ? ? ? ????????setException(ex);//捕獲異常并去設(shè)置異常信息,將異常信息設(shè)置到Object outcome結(jié)果對象中,同時(shí)改變state的狀態(tài)為EXCEPTIONAL

? ? ? ? ? ? ????????}

????????????????????if (ran)

????????????????????????set(result);//正常執(zhí)行,設(shè)置返回結(jié)果,設(shè)置到Object outcome中,并設(shè)置state狀態(tài)為NORMAL

? ? ? ? ? ? ? ? ? ? }

????????????????????}finally {

? ? ? ? ????????????????runner =null;//清空runner,方便GC回收

? ? ? ? ????????????????int s =state;

? ? ? ? ????????????????if (s >=INTERRUPTING) //如果state狀態(tài)為中斷中或者已中斷

????????????????????????????handlePossibleCancellationInterrupt(s);//交出當(dāng)前線程的執(zhí)行權(quán),與其他線程重新競爭

? ? ????????????????????}

????????????????}

? ? ? ? ? ? 此處看一下setException()與set()方法內(nèi)的源碼

? ??????????setException():

? ??????????????protected void setException(Throwable t) {

? ? ? ? ? ? ? ? ? ? //原子操作比較state的內(nèi)存地址上的值是否與NEW相等,相等則將state修改為COMPLETING

????????????????????if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

????????????????????????outcome = t;//使用outcome保存異常結(jié)果信息

? ? ? ? ????????????????UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 將state設(shè)置為EXCEPTIONAL

? ? ? ? ????????????????finishCompletion();//下面貼出源碼,具體解釋

? ????????????????? }

????????????????}

? ? ? ? ? ? set():

? ??????????????protected void set(V v) {

? ? ? ? ? ? ? ? ? ? ? ? //原子操作比較state的內(nèi)存地址上的值是否與NEW相等,相等則將state修改為COMPLETING

????????????????????????if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

????????????????????????????????outcome = v;//使用outcome保存正常返回的結(jié)果信息

? ? ? ? ????????????????????????UNSAFE.putOrderedInt(this, stateOffset, NORMAL); ?// 將state設(shè)置為NORMAL

? ? ? ? ????????????????????????finishCompletion();

? ? ????????????????????}

????????????????????}

? ??????????????finishCompletion():這是線程執(zhí)行(正?;蛘弋惓#┑淖詈笠粋€(gè)方法,下面分析一下源碼

? ??????????????????private void finishCompletion() {

????????????????????????// 無論執(zhí)行的線程是正常還是異常都會返回結(jié)果并設(shè)置到outcome中,那么返回結(jié)果后,主線程或者其他線程還在等待隊(duì)列中,則需要去喚醒等待隊(duì)列中的線程進(jìn)行執(zhí)行。

? ? ????????????????????for (WaitNode q; (q =waiters) !=null;) {//循環(huán)等待隊(duì)列中的線程節(jié)點(diǎn)

????????????????????????????if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//判斷等待線程節(jié)點(diǎn)是否與之前設(shè)置的等待線程一致,一致則返回true

????????????????????????????????for (;;) {//自旋

????????????????????????????????????Thread t = q.thread;

? ? ? ? ? ? ? ? ????????????????????if (t !=null) {//等待線程不為空

????????????????????????????????????????q.thread =null;//等待線程節(jié)點(diǎn)中的線程局部變量置空

? ? ? ? ? ? ? ? ? ? ????????????????????LockSupport.unpark(t);//喚醒等待線程執(zhí)行

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

????????????????????????????????????WaitNode next = q.next;//獲取等待線程節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)

? ? ? ? ? ? ? ? ????????????????????if (next ==null)// 如果為空,則說明當(dāng)前等待線程節(jié)點(diǎn)就是頭結(jié)點(diǎn),已經(jīng)沒有后續(xù)等待節(jié)點(diǎn),

????????????????????????????????????????break;//退出自旋

? ? ? ? ? ? ? ? ????????????????????????q.next =null; // 置空,方便GC回收

? ? ? ? ? ? ? ? ????????????????????????q = next;//當(dāng)前等待線程節(jié)點(diǎn)也置空,在上面已經(jīng)喚醒了當(dāng)前的等待線程,因此此處也將已經(jīng)喚醒的線程置空讓GC回收

? ? ? ? ? ? ????????????????????????}

????????????????????????????????????break;

? ? ? ? ????????????????????????}

????????????????????????????}

????????????????????????????done();

? ? ????????????????????????callable =null;? ? ? ? // 當(dāng)前線程中的callable置空,讓gc回收,整個(gè)線程到此執(zhí)行完畢

????????????????????????}

? ? ? ? ? ? 以上詳細(xì)說明線程start()方法調(diào)用后的一系列執(zhí)行過程。那么有個(gè)問題就是這個(gè)等待線程節(jié)點(diǎn)是從哪里產(chǎn)生的呢?下面解釋這個(gè)問題

? ? ? ? ? ? 看Demo中的代碼:?System.out.println(futureTask.get()); 這句代碼在demo有解釋,get()方法是一個(gè)阻塞的方法,那么就是這個(gè)get()

????????????方法產(chǎn)生的WaitNode節(jié)點(diǎn)的。看源碼:

? ??????????????public V get() throws InterruptedException, ExecutionException {

????????????????????int s =state;?

? ? ????????????????if (s <=COMPLETING) //判斷state是否處于執(zhí)行中或者新建狀態(tài),是的話則說明調(diào)用get()方法的線程需要進(jìn)入等待隊(duì)列,等待callable所在的線程執(zhí)行完畢后并等待喚醒,此處的代碼就證明了get()方法是阻塞的

????????????????????????s = awaitDone(false, 0L);//下面分析源碼

? ? ????????????????return report(s);

? ? ? ? ? ? ?????}

? ??????????????awaitDone():

? ??????????????????private int awaitDone(boolean timed, long nanos) throws InterruptedException {

????????????????????????????final long deadline = timed ? System.nanoTime() + nanos :0L;//引入等待超時(shí)機(jī)制,調(diào)用get()方法傳false,0L則表示不進(jìn)行超時(shí)處理

? ? ? ? ? ? ? ? ? ? ? ? ? ? WaitNode q =null;

? ? ? ? ? ? ? ? ? ? ? ? ? ? boolean queued =false;

? ? ????????????????????????for (;;) {//自旋

????????????????????????????????if (Thread.interrupted()) {//判斷當(dāng)前線程(主線程)是否已經(jīng)中斷,中斷則去移除等待線程節(jié)點(diǎn)并拋出中斷異常,此代碼為程序健壯性考慮,不必過分關(guān)注

????????????????????????????????????removeWaiter(q);

? ? ? ? ? ? ????????????????????????throw new InterruptedException();

? ? ? ? ????????????????????????}

????????????????????????????????int s =state;

? ? ? ????????????????????????? if (s >COMPLETING) {//如果執(zhí)行的子線程的state狀態(tài)已經(jīng)處于NORMAL或者EXCEPTIONAL狀態(tài),說明子線程已經(jīng)執(zhí)行結(jié)束了,那么直接返回,說明不用讓阻塞線程進(jìn)入等待隊(duì)列。

????????????????????????????????????if (q !=null)//等待線程節(jié)點(diǎn)不為空,則直接置為空,并返回state

????????????????????????????????????????q.thread =null;

? ? ? ? ? ? ????????????????????????????return s;

? ? ? ????????????????????????? }

????????????????????????????????else if (s ==COMPLETING)// 重點(diǎn),當(dāng)當(dāng)前執(zhí)行子線程還在運(yùn)行中的時(shí)候,則此時(shí)要讓主線程交出CPU執(zhí)行權(quán)。

? ? ? ? ? ? ????????????????????????Thread.yield();//交出CPU執(zhí)行權(quán),yield()方法雖然是交出執(zhí)行權(quán),但主線程還是可以和其他線程進(jìn)行公平競爭

? ? ? ????????????????????????? else if (q ==null)//重點(diǎn),以上的條件都不滿足,則說明該線程確實(shí)需要進(jìn)入等待隊(duì)列進(jìn)行等待

????????????????????????????????????q =new WaitNode();//構(gòu)造一個(gè)等待節(jié)點(diǎn),該構(gòu)造方法中將當(dāng)前線程作為參數(shù)傳遞給節(jié)點(diǎn)中的局部變量進(jìn)行保存

? ? ? ? ????????????????????????else if (!queued)//注意,此處較難理解,這是入隊(duì)列的操作,q.next=waiters ,如果是首節(jié)點(diǎn),waiters一定是null的,則q.next=null,waitersOffset偏移量指向的地址上的初始值也是null,則期望值與內(nèi)存地址值都為null,則會將q的值設(shè)置到waitersOffset指向的地址,同時(shí)返回true,等待節(jié)點(diǎn)進(jìn)入隊(duì)列就成功。原子操作比較抽象,建議去深入理解CAS操作

????????????????????????????????????queued =UNSAFE.compareAndSwapObject(this, waitersOffset, q.next =waiters, q);

? ? ? ? ????????????????????????else if (timed) {

????????????????????????????????????nanos = deadline - System.nanoTime();

? ? ? ? ? ? ????????????????????????if (nanos <=0L) {

????????????????????????????????????????removeWaiter(q);

? ? ? ? ? ? ? ? ????????????????????????return state;

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}

????????????????????????????????????LockSupport.parkNanos(this, nanos);//這是帶有超時(shí)設(shè)置的阻塞方法

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? ? ? ? ? ? ? else

? ? ? ? ? ? ????????????????LockSupport.park(this);//自旋到這里的時(shí)候,說明已經(jīng)加入等待隊(duì)列,阻塞當(dāng)前線程,讓其等待被喚醒。

? ? ????????????????????}

????????????????????}

? ? ? ? ? ? ? ? 另外最后獲取返回值的方法report()比較簡單就不多做解釋。內(nèi)部就是返回outcome的值

????????????????????private V report(int s)throws ExecutionException {

????????????????????????Object x =outcome;

? ????????????????????? if (s ==NORMAL)

????????????????????????????return (V)x;

? ????????????????????? if (s >=CANCELLED)

????????????????????????????throw new CancellationException();

? ????????????????????????? throw new ExecutionException((Throwable)x);

????????????????????}

? ? ? ? ? ? ? ? 到此,F(xiàn)utureTask源碼就分析完畢,總結(jié)一下:Callable實(shí)現(xiàn)的線程內(nèi)部使用state的轉(zhuǎn)換,這種轉(zhuǎn)換是基于原子操作來保證線程安全(多線程環(huán)境下對state進(jìn)行競爭),同時(shí)其他非Callable的外部線程調(diào)用FutureTask中的方法(主要是get()),則讓這些線程進(jìn)入等待隊(duì)列,當(dāng)Callable的線程執(zhí)行完畢會使用自旋對等待中的線程進(jìn)行喚醒。

更多深度技術(shù)文章,在這里。?java開發(fā)工具下載地址及安裝教程大全,點(diǎn)這里。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容